Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/cacher: dispatchEvents use progressRequester #124754

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 16 additions & 23 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,23 @@ func (c *Cacher) dispatchEvents() {
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
defer bookmarkTimer.Stop()

// The internal informer populates the RV as soon as it conducts
// The first successful sync with the underlying store.
// The cache must wait until this first sync is completed to be deemed ready.
// Since we cannot send a bookmark when the lastProcessedResourceVersion is 0,
// we poll aggressively for the first RV before entering the dispatch loop.
lastProcessedResourceVersion := uint64(0)
if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true, func(_ context.Context) (bool, error) {
if rv := c.watchCache.getResourceVersion(); rv != 0 {
lastProcessedResourceVersion = rv
return true, nil
}
return false, nil
}); err != nil {
// given the function above never returns error,
// the non-empty error means that the stopCh was closed
return
}
for {
select {
case event, ok := <-c.incoming:
Expand All @@ -929,29 +945,6 @@ func (c *Cacher) dispatchEvents() {
metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
case <-bookmarkTimer.C():
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
//
// Just pop closed watchers and requeue others if needed.
//
// TODO(#115478): rework the following logic
// in a way that would allow more
// efficient cleanup of closed watchers
if lastProcessedResourceVersion == 0 {
func() {
c.Lock()
defer c.Unlock()
for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
for _, watcher := range watchers {
if watcher.stopped {
continue
}
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
}
}
}()
continue
}
bookmarkEvent := &watchCacheEvent{
Type: watch.Bookmark,
Object: c.newFunc(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1902,16 +1902,13 @@ func BenchmarkCacher_GetList(b *testing.B) {
}
}

// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that
// a bookmark event will be delivered after the cacher has seen an event.
// Previously the watchers have been removed from the "want bookmark" queue.
func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
// TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived makes sure that
// a bookmark event will be delivered even if the cacher has not received an event.
func TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
require.NoError(t, err, "failed to create cacher")
defer cacher.Stop()

// wait until cacher is initialized.
Expand All @@ -1929,29 +1926,10 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()

// Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers.
// We do that every ~1s, so waiting 2 seconds seems enough.
time.Sleep(2 * time.Second)

// Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value.
makePod := func(rv uint64) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
err = cacher.watchCache.Add(makePod(102))
require.NoError(t, err)

verifyEvents(t, w, []watch.Event{
{Type: watch.Added, Object: makePod(102)},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
ResourceVersion: "100",
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
},
}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,12 @@ func (w *watchCache) Resync() error {
return nil
}

func (w *watchCache) getResourceVersion() uint64 {
w.RLock()
defer w.RUnlock()
return w.resourceVersion
}

func (w *watchCache) currentCapacity() int {
w.RLock()
defer w.RUnlock()
Expand Down