Skip to content

Commit 346f977

Browse files
authored
Make sure all existing objects are announced when a watch is created (#89)
#88 Signed-off-by: Karol Zadora-Przylecki <[email protected]>
1 parent 5e16fbe commit 346f977

File tree

3 files changed

+130
-23
lines changed

3 files changed

+130
-23
lines changed

pkg/storage/filepath/jsonfile_rest.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -549,33 +549,40 @@ func (f *filepathREST) Watch(ctx context.Context, options *metainternalversion.L
549549
p := newSelectionPredicate(options)
550550
jw := f.watchSet.newWatch()
551551

552-
// On initial watch, send all the existing objects
553-
list, err := f.List(ctx, options)
554-
if err != nil {
555-
return nil, err
556-
}
557-
558-
danger := reflect.ValueOf(list).Elem()
559-
items := danger.FieldByName("Items")
560-
561-
initEvents := []watch.Event{}
562-
for i := 0; i < items.Len(); i++ {
563-
obj := items.Index(i).Addr().Interface().(runtime.Object)
564-
ok, err := p.Matches(obj)
552+
getInitEvents := func() ([]watch.Event, error) {
553+
// On initial watch, send all the existing objects.
554+
// We may receive duplicated "Added" events for some objects via the watch updata channel,
555+
// and we may report them twice, but that is much better than not reporting them at all,
556+
// and having clients left unaware that an object they might be interested in was created.
557+
// See https://github.com/tilt-dev/tilt-apiserver/issues/88
558+
559+
list, err := f.List(ctx, options)
565560
if err != nil {
566561
return nil, err
567562
}
568-
if !ok {
569-
continue
563+
danger := reflect.ValueOf(list).Elem()
564+
items := danger.FieldByName("Items")
565+
566+
initEvents := []watch.Event{}
567+
for i := 0; i < items.Len(); i++ {
568+
obj := items.Index(i).Addr().Interface().(runtime.Object)
569+
ok, err := p.Matches(obj)
570+
if err != nil {
571+
return nil, err
572+
}
573+
if !ok {
574+
continue
575+
}
576+
initEvents = append(initEvents, watch.Event{
577+
Type: watch.Added,
578+
Object: obj,
579+
})
570580
}
571-
initEvents = append(initEvents, watch.Event{
572-
Type: watch.Added,
573-
Object: obj,
574-
})
581+
return initEvents, nil
575582
}
576-
jw.Start(p, initEvents)
577583

578-
return jw, nil
584+
startErr := jw.Start(p, getInitEvents)
585+
return jw, startErr
579586
}
580587

581588
func (f *filepathREST) conflictErr(name string) error {

pkg/storage/filepath/jsonfile_rest_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"k8s.io/apimachinery/pkg/runtime"
2020
"k8s.io/apimachinery/pkg/runtime/schema"
2121
"k8s.io/apimachinery/pkg/runtime/serializer"
22+
"k8s.io/apimachinery/pkg/util/wait"
2223
"k8s.io/apimachinery/pkg/watch"
2324
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
2425
"k8s.io/apiserver/pkg/registry/generic"
@@ -315,6 +316,99 @@ func TestFilepathREST_UpdateIdentical(t *testing.T) {
315316
require.Equal(t, "2", actual.ResourceVersion)
316317
}
317318

319+
func TestFilepathREST_ParallelCreateAndWatch(t *testing.T) {
320+
f := newRESTFixture(t)
321+
defer f.tearDown()
322+
323+
// Record unique object creation events
324+
created := sync.Map{}
325+
326+
eventRecorder := func() {
327+
sendInitialEvents := true
328+
w, werr := f.watcher().Watch(f.rootCtx, &metainternalversion.ListOptions{
329+
Watch: true,
330+
SendInitialEvents: &sendInitialEvents,
331+
})
332+
require.NoError(t, werr)
333+
defer w.Stop()
334+
335+
for {
336+
select {
337+
case <-f.rootCtx.Done():
338+
return
339+
case e, isOpen := <-w.ResultChan():
340+
if !isOpen {
341+
return
342+
}
343+
if e.Type == watch.Added {
344+
created.Store(e.Object.(metav1.Object).GetName(), true)
345+
}
346+
}
347+
}
348+
}
349+
350+
// Spawn a bunch of workers that create objects
351+
const TotalObjects = 500
352+
const Workers = 20
353+
const objNamePrefix = "parallel-create-and-watch-"
354+
var wready, wdone sync.WaitGroup
355+
wready.Add(Workers)
356+
wdone.Add(Workers)
357+
objCreator := f.creater()
358+
start := make(chan struct{})
359+
360+
for i := 0; i < Workers; i++ {
361+
go func(worker int) {
362+
wready.Done()
363+
<-start // Start all workers at the same time
364+
365+
objectsToCreate := TotalObjects / Workers
366+
367+
for j := 0; j < objectsToCreate; j++ {
368+
// When the first worker is in the middle of creating their objects,
369+
// start the watcher/event counter.
370+
if worker == 0 && j == objectsToCreate/2 {
371+
go eventRecorder()
372+
}
373+
374+
objName := fmt.Sprintf("%s-%d-%d", objNamePrefix, worker, j)
375+
obj := &v1alpha1.Manifest{
376+
ObjectMeta: metav1.ObjectMeta{
377+
Name: objName,
378+
},
379+
Spec: v1alpha1.ManifestSpec{
380+
Message: objName,
381+
},
382+
}
383+
_, err := objCreator.Create(f.rootCtx, obj, nil, nil)
384+
require.NoError(t, err)
385+
}
386+
387+
wdone.Done()
388+
}(i)
389+
}
390+
391+
wready.Wait()
392+
close(start)
393+
wdone.Wait()
394+
395+
// Wait for all Added events for the objects we created
396+
count := 0
397+
waitErr := wait.PollUntilContextTimeout(f.rootCtx, 200*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
398+
count = 0
399+
created.Range(func(key, value interface{}) bool {
400+
count++
401+
return true
402+
})
403+
404+
if count == TotalObjects {
405+
return true, nil
406+
}
407+
return false, nil
408+
})
409+
require.NoError(t, waitErr, "Did not receive expected number of Added events (received %d, expected %d)", count, TotalObjects)
410+
}
411+
318412
type restOptionsGetter struct {
319413
codec runtime.Codec
320414
}

pkg/storage/filepath/watchset.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,14 @@ type watchNode struct {
5353
}
5454

5555
// Start sending events to this watch.
56-
func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event) {
56+
func (w *watchNode) Start(p storage.SelectionPredicate, initEventFactory func() ([]watch.Event, error)) error {
5757
w.s.mu.Lock()
58+
defer w.s.mu.Unlock()
5859
w.s.nodes[w.id] = w
59-
w.s.mu.Unlock()
60+
initEvents, err := initEventFactory()
61+
if err != nil {
62+
return err
63+
}
6064

6165
go func() {
6266
for _, e := range initEvents {
@@ -76,6 +80,8 @@ func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event
7680
}
7781
close(w.outCh)
7882
}()
83+
84+
return nil
7985
}
8086

8187
func (w *watchNode) Stop() {

0 commit comments

Comments
 (0)