storage/cacher: dispatchEvents use progressRequester #124754
base: master
Conversation
This issue is currently awaiting triage. If a SIG or subproject determines this is a relevant issue, they will accept it by applying the triage/accepted label.
// TODO(p0lyn0mial): adapt the following logic once
// https://github.com/kubernetes/kubernetes/pull/124612 merges
progressRequesterCleanUpOnceFn := func() { /*no-op*/ }
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
maybe we should actually gate the progressRequester only when the etcd version "matches"?
Yes - we should gate on DefaultFeatureSupportsChecker once that merges.
The problem is that it may not yet be initialized, and we need to handle that case too, so it's a bit trickier (because we cannot really differentiate between not-initialized and not-supported).
@serathius - FYI
On a related note, we should probably reject streaming list requests if progress-requester is not supported.
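A rough sketch of the gating being discussed, assuming the support checker from #124612 lands with a Supports method (the etcdfeature package name and the storage.RequestWatchProgress feature key are assumptions here, not confirmed API):

```go
// Sketch only: DefaultFeatureSupportChecker and storage.RequestWatchProgress
// are assumed to come from the not-yet-merged #124612.
progressRequesterCleanUpOnceFn := func() { /*no-op*/ }
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) &&
	utilfeature.DefaultFeatureGate.Enabled(features.WatchList) &&
	etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) {
	progressRequester.Add()
	progressRequesterCleanUpOnceFn = sync.OnceFunc(progressRequester.Remove)
}
```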
> because we are not able to differentiate between not-initialized and not-supported

why? It is not supported when the version of etcd doesn't match.

> On a related note, we should probably reject streaming list requests if progress-requester is not supported.

yeah, ideally we could add an etcdVersionChecker to ValidateListOptions (staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go, line 28 at 2ae115e):

func ValidateListOptions(options *internalversion.ListOptions, isWatchListFeatureEnabled bool) field.ErrorList {

Could the etcdVersionChecker gate the server readiness until it initializes?
> why? It is not supported when the version of etcd doesn't match.

etcd can be started after kube-apiserver, so we default to false and let initialization switch it.

> could the etcdVersionChecker gate the server readiness until it initializes?

no, because we don't really know when it is fully initialized...
> etcd can be started after kube-apiserver, so we default to false and let initialization switch it.

In that case the server won't be ready until newETCD3Check turns green. We could create something similar for the version checker.
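A rough sketch of what such a readiness gate could look like, modeled on the existing etcd health check (the versionChecker interface and its Initialized method are hypothetical stand-ins for the checker discussed above):

```go
import (
	"fmt"
	"net/http"

	"k8s.io/apiserver/pkg/server/healthz"
)

// versionChecker is a hypothetical stand-in for the etcd version/feature
// support checker discussed in this thread.
type versionChecker interface {
	// Initialized reports whether the checker has probed the etcd endpoints.
	Initialized() bool
}

// newEtcdVersionReadyzCheck returns a readyz check that stays red until the
// version checker has finished its first round of probes, similar to how
// newETCD3Check gates readiness on etcd connectivity.
func newEtcdVersionReadyzCheck(c versionChecker) healthz.HealthChecker {
	return healthz.NamedCheck("etcd-version-checker", func(_ *http.Request) error {
		if !c.Initialized() {
			return fmt.Errorf("etcd version checker has not been initialized yet")
		}
		return nil
	})
}
```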
@@ -2470,6 +2470,72 @@ func TestWatchStreamSeparation(t *testing.T) {
	}
}

func TestDispatchEventsUseProgressRequester(t *testing.T) {
The test works but can start flaking on "timing" issues. I like it because it tests the entire watch request.
require.NoError(t, err, "failed to create watch: %v")
testCheckNoEvents(t, w)
w.Stop()
storeWatchProgressCounterValueAfterFirstWatch := backingStorage.getRequestWatchProgressCounter()
maybe before stopping the watch we should loop until the counter is > 0? That could deflake the test.
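A sketch of that deflake, continuing the test above; backingStorage.getRequestWatchProgressCounter is from the diff, while the choice of polling helper and timeout is illustrative:

```go
// Poll until at least one progress request has reached the backing storage,
// then stop the watch; this removes the dependency on request timing.
err = wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, wait.ForeverTestTimeout, true,
	func(context.Context) (bool, error) {
		return backingStorage.getRequestWatchProgressCounter() > 0, nil
	})
require.NoError(t, err, "timed out waiting for a watch progress request")
w.Stop()
```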
/assign @wojtek-t
// https://github.com/kubernetes/kubernetes/pull/124612 merges
progressRequesterCleanUpOnceFn := func() { /*no-op*/ }
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
	progressRequester.Add()
I have just also realised that we need to slightly change the progressRequester so that it is able to send periodic progress updates. This will unblock watchers initialised from the global RV against resources that haven't received any changes/updates. I can try changing the progressRequester for that purpose.
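A minimal sketch of such a periodic variant, assuming the requester keeps wrapping the existing etcd progress call (all names below are illustrative, not the actual progressRequester API):

```go
import (
	"context"
	"time"
)

// periodicProgressRequester is a sketch: it asks etcd for a progress
// notification on a fixed period so that watchers initialized from the
// global RV advance even on resources with no writes.
type periodicProgressRequester struct {
	requestWatchProgress func(ctx context.Context) error
	period               time.Duration
}

func (p *periodicProgressRequester) run(ctx context.Context) {
	ticker := time.NewTicker(p.period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Best-effort: a failed request is simply retried on the next tick.
			_ = p.requestWatchProgress(ctx)
		}
	}
}
```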
The question is: what should we do when etcd was started with the progress notification flag?
actually, no, this will be handled by c.watchCache.waitingUntilFresh.Add()
Force-pushed from b0abfde to 5d2e035.
	progressRequester.Add()
	progressRequesterCleanUpOnceFn = sync.OnceFunc(progressRequester.Remove)
}
So with all the discussion on the other PRs, I think it will actually be much easier to proceed with a different approach (the one that I thought about originally).
Basically, instead of even requesting bookmarks, ensure that we initialize lastProcessedResourceVersion correctly from the beginning.
So the flow we want to achieve is that lastProcessedResourceVersion will actually be set when the first List call is done.
Now, synchronizing that correctly in an arbitrary way is a bit tricky, but we have a pretty simple path to achieve it.
What you need to do is a very simple thing, just add here (sketch, polling every 10ms):

lastProcessedResourceVersion := uint64(0)
if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true,
	func(context.Context) (bool, error) {
		if rv := c.watchCache.GetResourceVersion(); rv != 0 {
			lastProcessedResourceVersion = rv
			return true, nil
		}
		return false, nil
	}); err != nil {
	return
}

That solves the whole problem - you don't need any other changes in this PR.
ok, I like the idea, it will be detached from the progressRequester. thanks!
Force-pushed from fc80b5e to 064fe38.
/remove-sig api-machinery
// The cache must wait until this first sync is completed to be deemed ready.
// Since we cannot send a bookmark when the lastProcessedResourceVersion is 0,
// we poll aggressively for the first RV before entering the dispatch loop.
if err := c.ready.wait(wait.ContextForChannel(c.stopCh)); err != nil {
Why do we need it? I suggest removing it.
We have an extremely tight loop below.
Before hammering the CPU, we should check if the cacher has been synchronised so that we can read the current value of the resource version.
the calls in that function (GetResourceVersion) are pretty cheap, so I wouldn't unnecessarily complicate it
What is your concern here? Does calling a well-known function really make this code more complicated? :)
There is no point in calling getResourceVersion before the cacher synchronizes.
Not-readiness can happen later too: if the cache unsynchronizes later, it can block again.
But primarily, complexity is my concern. This code is already super complicated and we need to find ways of making it simpler.
Calling getResourceVersion() is really cheap and I don't mind calling it every 10ms until it initializes; the cost of doing that is negligible compared to the initialization itself anyway.
> Not-readiness can happen later too: if the cache unsynchronizes later, it can block again.

we call it only once and then we enter the for loop, from which we never exit (unless the stopCh was closed)
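For context, a condensed sketch of the dispatchEvents control flow being discussed (heavily simplified; the real loop also drives bookmark timers and event buffering, so treat the loop body below as illustrative):

```go
func (c *Cacher) dispatchEvents() {
	// Readiness is checked once; after that we stay in the loop until stopCh.
	if err := c.ready.wait(wait.ContextForChannel(c.stopCh)); err != nil {
		return
	}
	// Poll for the first resource version set by the initial List.
	lastProcessedResourceVersion := uint64(0)
	if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true,
		func(context.Context) (bool, error) {
			if rv := c.watchCache.GetResourceVersion(); rv != 0 {
				lastProcessedResourceVersion = rv
				return true, nil
			}
			return false, nil
		}); err != nil {
		return
	}
	for {
		select {
		case event, ok := <-c.incoming:
			if !ok {
				return
			}
			c.dispatchEvent(&event) // simplified
			lastProcessedResourceVersion = event.ResourceVersion
		case <-c.stopCh:
			klog.V(4).Infof("stopping dispatch at RV %d", lastProcessedResourceVersion)
			return
		}
	}
}
```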
> But primarily, complexity is my concern. This code is already super complicated and we need to find ways of making it simpler.

OK, pushed, PTAL.
@@ -641,6 +641,12 @@ func (w *watchCache) Resync() error {
	return nil
}

func (w *watchCache) GetResourceVersion() uint64 {
nit: make it a private function
	}
	return false, nil
}); err != nil {
	return /*since it can only happen when the stopCh is closed*/
nit:
// Given the function above never returns an error, this can happen only when stopCh is closed.
Force-pushed from 064fe38 to 783cf82, then from 783cf82 to 33f81ee.
/kind feature
/lgtm
LGTM label has been added. Git tree hash: a2c5781dceeeb6ff2dfb380c19bc8908d7fbf891
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: p0lyn0mial, wojtek-t.
@p0lyn0mial: The following test failed; say /retest to rerun all failed tests.
/test pull-kubernetes-e2e-kind-ipv6