@@ -553,6 +553,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
553553 }
554554
555555 listRetry .Steps = int (c .listRetryLimit )
556+ opts .ResourceVersion = "0"
556557 err := retry .OnError (listRetry , c .listRetryFunc , func () error {
557558 var ierr error
558559 res , ierr = resClient .List (ctx , opts )
@@ -605,6 +606,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
605606}
606607
607608func (c * clusterCache ) watchEvents (ctx context.Context , api kube.APIResourceInfo , resClient dynamic.ResourceInterface , ns string , resourceVersion string ) {
609+ timeoutSeconds := int64 (c .watchResyncTimeout .Seconds ())
608610 kube .RetryUntilSucceed (ctx , watchResourcesRetryTimeout , fmt .Sprintf ("watch %s on %s" , api .GroupKind , c .config .Host ), c .log , func () (err error ) {
609611 defer func () {
610612 if r := recover (); r != nil {
@@ -622,6 +624,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
622624
623625 w , err := watchutil .NewRetryWatcher (resourceVersion , & cache.ListWatch {
624626 WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
627+ options .TimeoutSeconds = & timeoutSeconds
625628 res , err := resClient .Watch (ctx , options )
626629 if errors .IsNotFound (err ) {
627630 c .stopWatching (api .GroupKind , ns )
@@ -633,30 +636,17 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
633636 return err
634637 }
635638
636- defer func () {
637- w .Stop ()
638- resourceVersion = ""
639- }()
640-
641- var watchResyncTimeoutCh <- chan time.Time
642- if c .watchResyncTimeout > 0 {
643- shouldResync := time .NewTimer (c .watchResyncTimeout )
644- defer shouldResync .Stop ()
645- watchResyncTimeoutCh = shouldResync .C
646- }
639+ defer w .Stop ()
647640
648641 for {
649642 select {
650643 // stop watching when parent context got cancelled
651644 case <- ctx .Done ():
652645 return nil
653646
654- // re-synchronize API state and restart watch periodically
655- case <- watchResyncTimeoutCh :
656- return fmt .Errorf ("Resyncing %s on %s due to timeout" , api .GroupKind , c .config .Host )
657-
658647 // re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
659648 case <- w .Done ():
649+ resourceVersion = ""
660650 return fmt .Errorf ("Watch %s on %s has closed" , api .GroupKind , c .config .Host )
661651
662652 case event , ok := <- w .ResultChan ():
@@ -666,8 +656,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
666656
667657 obj , ok := event .Object .(* unstructured.Unstructured )
668658 if ! ok {
659+ resourceVersion = ""
669660 return fmt .Errorf ("Failed to convert to *unstructured.Unstructured: %v" , event .Object )
670661 }
662+ resourceVersion = obj .GetResourceVersion ()
671663
672664 c .processEvent (event .Type , obj )
673665 if kube .IsCRD (obj ) {
0 commit comments