@@ -250,6 +250,7 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
250250 @ Nullable private Boolean pendingMessageCompression ;
251251 @ Nullable private io .grpc .Status cancelledStatus ;
252252 @ Nullable private Metadata cancelledTrailers ;
253+ private final Object lock = new Object ();
253254
254255 KeyAwareClientCall (
255256 KeyAwareChannel parentChannel ,
@@ -262,158 +263,214 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
262263
263264 @ Override
264265 protected ClientCall <RequestT , ResponseT > delegate () {
265- if (delegate == null ) {
266- throw new IllegalStateException (
267- "Delegate call not initialized before use. sendMessage was likely not called." );
266+ synchronized (lock ) {
267+ if (delegate == null ) {
268+ throw new IllegalStateException (
269+ "Delegate call not initialized before use. sendMessage was likely not called." );
270+ }
271+ return delegate ;
268272 }
269- return delegate ;
270273 }
271274
272275 @ Override
273276 public void start (Listener <ResponseT > responseListener , Metadata headers ) {
274- this .responseListener = new KeyAwareClientCallListener <>(responseListener , this );
275- this .headers = headers ;
276- if (this .cancelledStatus != null ) {
277- this .responseListener .onClose (
278- this .cancelledStatus ,
279- this .cancelledTrailers == null ? new Metadata () : this .cancelledTrailers );
277+ Listener <ResponseT > listenerToClose = null ;
278+ io .grpc .Status statusToClose = null ;
279+ Metadata trailersToClose = null ;
280+ synchronized (lock ) {
281+ this .responseListener = new KeyAwareClientCallListener <>(responseListener , this );
282+ this .headers = headers ;
283+ if (this .cancelledStatus != null ) {
284+ listenerToClose = this .responseListener ;
285+ statusToClose = this .cancelledStatus ;
286+ trailersToClose =
287+ this .cancelledTrailers == null ? new Metadata () : this .cancelledTrailers ;
288+ }
289+ }
290+ if (listenerToClose != null ) {
291+ listenerToClose .onClose (statusToClose , trailersToClose );
280292 }
281293 }
282294
283295 @ Override
284296 @ SuppressWarnings ("unchecked" )
285297 public void sendMessage (RequestT message ) {
286- if (this .cancelledStatus != null ) {
287- return ;
288- }
289- if (responseListener == null || headers == null ) {
290- throw new IllegalStateException ("start must be called before sendMessage" );
291- }
292- ChannelEndpoint endpoint = null ;
293- ChannelFinder finder = null ;
294-
295- if (message instanceof ReadRequest ) {
296- ReadRequest .Builder reqBuilder = ((ReadRequest ) message ).toBuilder ();
297- RoutingDecision routing = routeFromRequest (reqBuilder );
298- finder = routing .finder ;
299- endpoint = routing .endpoint ;
300- message = (RequestT ) reqBuilder .build ();
301- } else if (message instanceof ExecuteSqlRequest ) {
302- ExecuteSqlRequest .Builder reqBuilder = ((ExecuteSqlRequest ) message ).toBuilder ();
303- RoutingDecision routing = routeFromRequest (reqBuilder );
304- finder = routing .finder ;
305- endpoint = routing .endpoint ;
306- message = (RequestT ) reqBuilder .build ();
307- } else if (message instanceof BeginTransactionRequest ) {
308- BeginTransactionRequest .Builder reqBuilder =
309- ((BeginTransactionRequest ) message ).toBuilder ();
310- String databaseId = parentChannel .extractDatabaseIdFromSession (reqBuilder .getSession ());
311- if (databaseId != null && reqBuilder .hasMutationKey ()) {
312- finder = parentChannel .getOrCreateChannelFinder (databaseId );
313- endpoint = finder .findServer (reqBuilder );
298+ synchronized (lock ) {
299+ if (this .cancelledStatus != null ) {
300+ return ;
314301 }
315- allowDefaultAffinity = true ;
316- message = (RequestT ) reqBuilder .build ();
317- } else if (message instanceof CommitRequest ) {
318- CommitRequest request = (CommitRequest ) message ;
319- if (!request .getTransactionId ().isEmpty ()) {
320- endpoint = parentChannel .affinityEndpoint (request .getTransactionId ());
321- transactionIdToClear = request .getTransactionId ();
302+ if (responseListener == null || headers == null ) {
303+ throw new IllegalStateException ("start must be called before sendMessage" );
322304 }
323- } else if (message instanceof RollbackRequest ) {
324- RollbackRequest request = (RollbackRequest ) message ;
325- if (!request .getTransactionId ().isEmpty ()) {
326- endpoint = parentChannel .affinityEndpoint (request .getTransactionId ());
327- transactionIdToClear = request .getTransactionId ();
305+ ChannelEndpoint endpoint = null ;
306+ ChannelFinder finder = null ;
307+
308+ if (message instanceof ReadRequest ) {
309+ ReadRequest .Builder reqBuilder = ((ReadRequest ) message ).toBuilder ();
310+ RoutingDecision routing = routeFromRequest (reqBuilder );
311+ finder = routing .finder ;
312+ endpoint = routing .endpoint ;
313+ message = (RequestT ) reqBuilder .build ();
314+ } else if (message instanceof ExecuteSqlRequest ) {
315+ ExecuteSqlRequest .Builder reqBuilder = ((ExecuteSqlRequest ) message ).toBuilder ();
316+ RoutingDecision routing = routeFromRequest (reqBuilder );
317+ finder = routing .finder ;
318+ endpoint = routing .endpoint ;
319+ message = (RequestT ) reqBuilder .build ();
320+ } else if (message instanceof BeginTransactionRequest ) {
321+ BeginTransactionRequest .Builder reqBuilder =
322+ ((BeginTransactionRequest ) message ).toBuilder ();
323+ String databaseId = parentChannel .extractDatabaseIdFromSession (reqBuilder .getSession ());
324+ if (databaseId != null && reqBuilder .hasMutationKey ()) {
325+ finder = parentChannel .getOrCreateChannelFinder (databaseId );
326+ endpoint = finder .findServer (reqBuilder );
327+ }
328+ allowDefaultAffinity = true ;
329+ message = (RequestT ) reqBuilder .build ();
330+ } else if (message instanceof CommitRequest ) {
331+ CommitRequest request = (CommitRequest ) message ;
332+ if (!request .getTransactionId ().isEmpty ()) {
333+ endpoint = parentChannel .affinityEndpoint (request .getTransactionId ());
334+ transactionIdToClear = request .getTransactionId ();
335+ }
336+ } else if (message instanceof RollbackRequest ) {
337+ RollbackRequest request = (RollbackRequest ) message ;
338+ if (!request .getTransactionId ().isEmpty ()) {
339+ endpoint = parentChannel .affinityEndpoint (request .getTransactionId ());
340+ transactionIdToClear = request .getTransactionId ();
341+ }
342+ } else {
343+ throw new IllegalStateException (
344+ "Only read, query, begin transaction, commit, and rollback requests are supported for"
345+ + " key-aware calls." );
328346 }
329- } else {
330- throw new IllegalStateException (
331- "Only read, query, begin transaction, commit, and rollback requests are supported for"
332- + " key-aware calls." );
333- }
334347
335- if (endpoint == null ) {
336- endpoint = parentChannel .endpointCache .defaultChannel ();
337- }
338- selectedEndpoint = endpoint ;
339- this .channelFinder = finder ;
348+ if (endpoint == null ) {
349+ endpoint = parentChannel .endpointCache .defaultChannel ();
350+ }
351+ selectedEndpoint = endpoint ;
352+ this .channelFinder = finder ;
340353
341- delegate = endpoint .getChannel ().newCall (methodDescriptor , callOptions );
342- if (pendingMessageCompression != null ) {
343- delegate .setMessageCompression (pendingMessageCompression );
344- pendingMessageCompression = null ;
345- }
346- delegate .start (responseListener , headers );
347- drainPendingRequests ();
348- delegate .sendMessage (message );
349- if (pendingHalfClose ) {
350- delegate .halfClose ();
354+ delegate = endpoint .getChannel ().newCall (methodDescriptor , callOptions );
355+ if (pendingMessageCompression != null ) {
356+ delegate .setMessageCompression (pendingMessageCompression );
357+ pendingMessageCompression = null ;
358+ }
359+ delegate .start (responseListener , headers );
360+ drainPendingRequests ();
361+ delegate .sendMessage (message );
362+ if (pendingHalfClose ) {
363+ delegate .halfClose ();
364+ }
351365 }
352366 }
353367
354368 @ Override
355369 public void halfClose () {
356- if (delegate != null ) {
357- delegate .halfClose ();
358- } else {
359- pendingHalfClose = true ;
370+ ClientCall <RequestT , ResponseT > currentDelegate ;
371+ synchronized (lock ) {
372+ if (this .cancelledStatus != null ) {
373+ return ;
374+ }
375+ if (delegate == null ) {
376+ pendingHalfClose = true ;
377+ return ;
378+ }
379+ currentDelegate = delegate ;
360380 }
381+ currentDelegate .halfClose ();
361382 }
362383
363384 @ Override
364385 public void cancel (@ Nullable String message , @ Nullable Throwable cause ) {
365- if (delegate != null ) {
366- delegate .cancel (message , cause );
367- } else {
368- cancelledStatus = io .grpc .Status .CANCELLED .withDescription (message ).withCause (cause );
369- Metadata trailers =
370- cause == null ? new Metadata () : io .grpc .Status .trailersFromThrowable (cause );
371- cancelledTrailers = trailers == null ? new Metadata () : trailers ;
372- if (responseListener != null ) {
373- responseListener .onClose (cancelledStatus , cancelledTrailers );
386+ ClientCall <RequestT , ResponseT > currentDelegate ;
387+ Listener <ResponseT > listenerToClose = null ;
388+ io .grpc .Status statusToClose = null ;
389+ Metadata trailersToClose = null ;
390+ synchronized (lock ) {
391+ currentDelegate = delegate ;
392+ if (currentDelegate == null ) {
393+ cancelledStatus = io .grpc .Status .CANCELLED .withDescription (message ).withCause (cause );
394+ Metadata trailers =
395+ cause == null ? new Metadata () : io .grpc .Status .trailersFromThrowable (cause );
396+ cancelledTrailers = trailers == null ? new Metadata () : trailers ;
397+ if (responseListener != null ) {
398+ listenerToClose = responseListener ;
399+ statusToClose = cancelledStatus ;
400+ trailersToClose = cancelledTrailers ;
401+ }
374402 }
375403 }
404+ if (currentDelegate != null ) {
405+ currentDelegate .cancel (message , cause );
406+ } else if (listenerToClose != null ) {
407+ listenerToClose .onClose (statusToClose , trailersToClose );
408+ }
376409 }
377410
378411 @ Override
379412 public void request (int numMessages ) {
380- if (delegate != null ) {
381- delegate .request (numMessages );
382- return ;
383- }
384- if (numMessages <= 0 ) {
385- return ;
386- }
387- long updated = pendingRequests + numMessages ;
388- if (updated < 0L ) {
389- updated = Long .MAX_VALUE ;
413+ ClientCall <RequestT , ResponseT > currentDelegate ;
414+ synchronized (lock ) {
415+ if (cancelledStatus != null ) {
416+ return ;
417+ }
418+ if (delegate != null ) {
419+ currentDelegate = delegate ;
420+ } else {
421+ if (numMessages <= 0 ) {
422+ return ;
423+ }
424+ long updated = pendingRequests + numMessages ;
425+ if (updated < 0L ) {
426+ updated = Long .MAX_VALUE ;
427+ }
428+ pendingRequests = updated ;
429+ return ;
430+ }
390431 }
391- pendingRequests = updated ;
432+ currentDelegate . request ( numMessages ) ;
392433 }
393434
394435 @ Override
395436 public boolean isReady () {
396- if (delegate == null ) {
437+ ClientCall <RequestT , ResponseT > currentDelegate ;
438+ synchronized (lock ) {
439+ currentDelegate = delegate ;
440+ }
441+ if (currentDelegate == null ) {
397442 return false ;
398443 }
399- return delegate .isReady ();
444+ return currentDelegate .isReady ();
400445 }
401446
402447 @ Override
403448 public void setMessageCompression (boolean enabled ) {
404- if (delegate != null ) {
405- delegate .setMessageCompression (enabled );
406- } else {
407- pendingMessageCompression = enabled ;
449+ ClientCall <RequestT , ResponseT > currentDelegate ;
450+ synchronized (lock ) {
451+ if (cancelledStatus != null ) {
452+ return ;
453+ }
454+ if (delegate != null ) {
455+ currentDelegate = delegate ;
456+ } else {
457+ pendingMessageCompression = enabled ;
458+ return ;
459+ }
408460 }
461+ currentDelegate .setMessageCompression (enabled );
409462 }
410463
411464 private void drainPendingRequests () {
465+ ClientCall <RequestT , ResponseT > currentDelegate = delegate ;
466+ if (currentDelegate == null ) {
467+ return ;
468+ }
412469 long requests = pendingRequests ;
413470 pendingRequests = 0L ;
414471 while (requests > 0 ) {
415472 int batch = requests > Integer .MAX_VALUE ? Integer .MAX_VALUE : (int ) requests ;
416- delegate .request (batch );
473+ currentDelegate .request (batch );
417474 requests -= batch ;
418475 }
419476 }
0 commit comments