@@ -309,7 +309,12 @@ class StreamingSyncImplementation implements StreamingSync {
309309 }
310310
311311 Future <void > _rustStreamingSyncIteration () async {
312- await _ActiveRustStreamingIteration (this ).syncIteration ();
312+ logger.info ('Starting Rust sync iteration' );
313+ final response = await _ActiveRustStreamingIteration (this ).syncIteration ();
314+ logger.info (
315+ 'Ending Rust sync iteration. Immediate restart: ${response .immediateRestart }' );
316+ // Note: With the current loop in streamingSync(), any return value that
317+ // isn't an exception triggers an immediate restart.
313318 }
314319
315320 Future <(List <BucketRequest >, Map <String , BucketDescription ?>)>
@@ -610,7 +615,7 @@ final class _ActiveRustStreamingIteration {
610615 var _hadSyncLine = false ;
611616
612617 StreamSubscription <void >? _completedUploads;
613- final Completer <void > _completedStream = Completer ();
618+ final Completer <RustSyncIterationResult > _completedStream = Completer ();
614619
615620 _ActiveRustStreamingIteration (this .sync );
616621
@@ -620,7 +625,7 @@ final class _ActiveRustStreamingIteration {
620625 .toList ();
621626 }
622627
623- Future <void > syncIteration () async {
628+ Future <RustSyncIterationResult > syncIteration () async {
624629 try {
625630 await _control (
626631 'start' ,
@@ -632,7 +637,7 @@ final class _ActiveRustStreamingIteration {
632637 }),
633638 );
634639 assert (_completedStream.isCompleted, 'Should have started streaming' );
635- await _completedStream.future;
640+ return await _completedStream.future;
636641 } finally {
637642 _isActive = false ;
638643 _completedUploads? .cancel ();
@@ -655,10 +660,12 @@ final class _ActiveRustStreamingIteration {
655660 }).map (ReceivedLine .new );
656661 }
657662
658- Future <void > _handleLines (EstablishSyncStream request) async {
663+ Future <RustSyncIterationResult > _handleLines (
664+ EstablishSyncStream request) async {
659665 final events = addBroadcast (
660666 _receiveLines (request.request), sync ._nonLineSyncEvents.stream);
661667
668+ var needsImmediateRestart = false ;
662669 loop:
663670 await for (final event in events) {
664671 if (! _isActive || sync .aborted) {
@@ -674,7 +681,8 @@ final class _ActiveRustStreamingIteration {
674681 await _control ('line_text' , line);
675682 case UploadCompleted ():
676683 await _control ('completed_upload' );
677- case AbortCurrentIteration ():
684+ case AbortCurrentIteration (: final hideDisconnectState):
685+ needsImmediateRestart = hideDisconnectState;
678686 break loop;
679687 case TokenRefreshComplete ():
680688 await _control ('refreshed_token' );
@@ -683,6 +691,8 @@ final class _ActiveRustStreamingIteration {
683691 convert.json.encode (_encodeSubscriptions (currentSubscriptions)));
684692 }
685693 }
694+
695+ return (immediateRestart: needsImmediateRestart);
686696 }
687697
688698 /// Triggers a local CRUD upload when the first sync line has been received.
@@ -736,10 +746,11 @@ final class _ActiveRustStreamingIteration {
736746 sync .logger.warning ('Could not prefetch credentials' , e, s);
737747 });
738748 }
739- case CloseSyncStream ():
749+ case CloseSyncStream (: final hideDisconnect ):
740750 if (! sync .aborted) {
741751 _isActive = false ;
742- sync ._nonLineSyncEvents.add (const AbortCurrentIteration ());
752+ sync ._nonLineSyncEvents
753+ .add (AbortCurrentIteration (hideDisconnectState: hideDisconnect));
743754 }
744755 case FlushFileSystem ():
745756 await sync .adapter.flushFileSystem ();
@@ -751,6 +762,8 @@ final class _ActiveRustStreamingIteration {
751762 }
752763}
753764
765+ typedef RustSyncIterationResult = ({bool immediateRestart});
766+
754767sealed class SyncEvent {}
755768
756769final class ReceivedLine implements SyncEvent {
@@ -768,7 +781,14 @@ final class TokenRefreshComplete implements SyncEvent {
768781}
769782
770783final class AbortCurrentIteration implements SyncEvent {
771- const AbortCurrentIteration ();
784+ /// Whether we should immediately disconnect and hide the `disconnected`
785+ /// state.
786+ ///
787+ /// This is used when we're changing subscription, to hide the brief downtime
788+ /// we have while reconnecting.
789+ final bool hideDisconnectState;
790+
791+ const AbortCurrentIteration ({this .hideDisconnectState = false });
772792}
773793
774794final class HandleChangedSubscriptions implements SyncEvent {
0 commit comments