@@ -587,6 +587,59 @@ public void clientCancel() throws Exception {
587587 verify (mockClientStreamListener , never ()).closed (any (Status .class ), any (Metadata .class ));
588588 }
589589
590+ @ Test (timeout = 5000 )
591+ public void clientCancelFromWithinMessageRead () throws Exception {
592+ server .start (serverListener );
593+ client .start (mockClientTransportListener );
594+ MockServerTransportListener serverTransportListener
595+ = serverListener .takeListenerOrFail (TIMEOUT_MS , TimeUnit .MILLISECONDS );
596+ serverTransport = serverTransportListener .transport ;
597+
598+ final SettableFuture <Boolean > closedCalled = SettableFuture .create ();
599+ final ClientStream clientStream = client .newStream (methodDescriptor , new Metadata ());
600+ clientStream .start (new ClientStreamListener () {
601+ @ Override
602+ public void headersRead (Metadata headers ) {
603+ }
604+
605+ @ Override
606+ public void closed (Status status , Metadata trailers ) {
607+ assertEquals (Status .CANCELLED .getCode (), status .getCode ());
608+ assertEquals ("nevermind" , status .getDescription ());
609+ closedCalled .set (true );
610+ }
611+
612+ @ Override
613+ public void messageRead (InputStream message ) {
614+ assertEquals ("foo" , methodDescriptor .parseResponse (message ));
615+ clientStream .cancel (Status .CANCELLED .withDescription ("nevermind" ));
616+ }
617+
618+ @ Override
619+ public void onReady () {
620+ }
621+ });
622+ clientStream .halfClose ();
623+ clientStream .request (1 );
624+
625+ StreamCreation serverStreamCreation
626+ = serverTransportListener .takeStreamOrFail (TIMEOUT_MS , TimeUnit .MILLISECONDS );
627+ assertEquals (methodDescriptor .getFullMethodName (), serverStreamCreation .method );
628+ ServerStream serverStream = serverStreamCreation .stream ;
629+ ServerStreamListener mockServerStreamListener = serverStreamCreation .listener ;
630+ verify (mockServerStreamListener , timeout (TIMEOUT_MS )).onReady ();
631+
632+ assertTrue (serverStream .isReady ());
633+ serverStream .writeHeaders (new Metadata ());
634+ serverStream .writeMessage (methodDescriptor .streamRequest ("foo" ));
635+ serverStream .flush ();
636+
637+ // Block until closedCalled was set.
638+ closedCalled .get ();
639+
640+ serverStream .close (Status .OK , new Metadata ());
641+ }
642+
590643 @ Test
591644 public void serverCancel () throws Exception {
592645 server .start (serverListener );
0 commit comments