@@ -195,8 +195,11 @@ private void observeSyncStream() {
195195 observer .metadata = getMetadata ();
196196 } catch (StatusRuntimeException metaEx ) {
197197 if (fatalStatusCodes .contains (metaEx .getStatus ().getCode ().name ()) && !successfulSync .get ()) {
198- log .debug ("Fatal status code for metadata request: {}, not retrying" , metaEx .getStatus ().getCode ());
199- enqueueFatal (String .format ("Fatal: Failed to connect for metadata request, not retrying for error %s" , metaEx .getStatus ().getCode ()));
198+ log .debug ("Fatal status code for metadata request: {}, not retrying" ,
199+ metaEx .getStatus ().getCode ());
200+ enqueueFatal (String .format (
201+ "Fatal: Failed to connect for metadata request, not retrying for error %s" ,
202+ metaEx .getStatus ().getCode ()));
200203 return ;
201204 } else {
202205 // retry for other status codes
@@ -214,7 +217,9 @@ private void observeSyncStream() {
214217 } catch (StatusRuntimeException ex ) {
215218 if (fatalStatusCodes .contains (ex .getStatus ().getCode ().toString ()) && !successfulSync .get ()) {
216219 log .debug ("Fatal status code during sync stream: {}, not retrying" , ex .getStatus ().getCode ());
217- enqueueFatal (String .format ("Fatal: Failed to connect for metadata request, not retrying for error %s" , ex .getStatus ().getCode ()));
220+ enqueueFatal (String .format (
221+ "Fatal: Failed to connect for metadata request, not retrying for error %s" ,
222+ ex .getStatus ().getCode ()));
218223 return ;
219224 } else {
220225 // retry for other status codes
@@ -289,16 +294,16 @@ private void enqueueError(String message) {
289294 enqueueError (outgoingQueue , message );
290295 }
291296
292- private void enqueueFatal (String message ) {
293- enqueueFatal (outgoingQueue , message );
294- }
295-
296297 private static void enqueueError (BlockingQueue <QueuePayload > queue , String message ) {
297298 if (!queue .offer (new QueuePayload (QueuePayloadType .ERROR , message , null ))) {
298299 log .error ("Failed to convey ERROR status, queue is full" );
299300 }
300301 }
301302
303+ private void enqueueFatal (String message ) {
304+ enqueueFatal (outgoingQueue , message );
305+ }
306+
302307 private static void enqueueFatal (BlockingQueue <QueuePayload > queue , String message ) {
303308 if (!queue .offer (new QueuePayload (QueuePayloadType .FATAL , message , null ))) {
304309 log .error ("Failed to convey FATAL status, queue is full" );
@@ -313,7 +318,8 @@ private static class SyncStreamObserver implements StreamObserver<SyncFlagsRespo
313318
314319 private Struct metadata ;
315320
316- public SyncStreamObserver (BlockingQueue <QueuePayload > outgoingQueue , AtomicBoolean shouldThrottle , List <String > fatalStatusCodes ) {
321+ public SyncStreamObserver (BlockingQueue <QueuePayload > outgoingQueue , AtomicBoolean shouldThrottle ,
322+ List <String > fatalStatusCodes ) {
317323 this .outgoingQueue = outgoingQueue ;
318324 this .shouldThrottle = shouldThrottle ;
319325 this .fatalStatusCodes = fatalStatusCodes ;
0 commit comments