File tree 2 files changed +28
-1
lines changed
main/java/org/apache/kafka/clients/consumer/internals
test/java/org/apache/kafka/clients/consumer/internals
2 files changed +28
-1
lines changed Original file line number Diff line number Diff line change @@ -59,6 +59,7 @@ public class CoordinatorRequestManager implements RequestManager {
59
59
private final RequestState coordinatorRequestState ;
60
60
private long timeMarkedUnknownMs = -1L ; // starting logging a warning only after unable to connect for a while
61
61
private long totalDisconnectedMin = 0 ;
62
+ private boolean closing = false ;
62
63
private Node coordinator ;
63
64
64
65
public CoordinatorRequestManager (
@@ -92,7 +93,7 @@ public CoordinatorRequestManager(
92
93
*/
93
94
@ Override
94
95
public NetworkClientDelegate .PollResult poll (final long currentTimeMs ) {
95
- if (this .coordinator != null )
96
+ if (closing || this .coordinator != null )
96
97
return EMPTY ;
97
98
98
99
if (coordinatorRequestState .canSendRequest (currentTimeMs )) {
@@ -103,6 +104,12 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
103
104
return new NetworkClientDelegate .PollResult (coordinatorRequestState .remainingBackoffMs (currentTimeMs ));
104
105
}
105
106
107
+ @ Override
108
+ public NetworkClientDelegate .PollResult pollOnClose (long currentTimeMs ) {
109
+ closing = true ;
110
+ return EMPTY ;
111
+ }
112
+
106
113
NetworkClientDelegate .UnsentRequest makeFindCoordinatorRequest (final long currentTimeMs ) {
107
114
coordinatorRequestState .onSendAttempt (currentTimeMs );
108
115
FindCoordinatorRequestData data = new FindCoordinatorRequestData ()
Original file line number Diff line number Diff line change @@ -254,6 +254,26 @@ public void testNetworkTimeout() {
254
254
assertEquals (1 , res2 .unsentRequests .size ());
255
255
}
256
256
257
+ @ Test
258
+ public void testSignalOnClose () {
259
+ CoordinatorRequestManager coordinatorManager = setupCoordinatorManager (GROUP_ID );
260
+
261
+ expectFindCoordinatorRequest (coordinatorManager , Errors .NONE );
262
+ assertTrue (coordinatorManager .coordinator ().isPresent ());
263
+
264
+ coordinatorManager .markCoordinatorUnknown ("coordinator changed" , time .milliseconds ());
265
+ assertEquals (Collections .emptyList (), coordinatorManager .poll (time .milliseconds ()).unsentRequests );
266
+
267
+ coordinatorManager .pollOnClose (time .milliseconds ());
268
+
269
+ time .sleep (RETRY_BACKOFF_MS - 1 );
270
+ assertEquals (Collections .emptyList (), coordinatorManager .poll (time .milliseconds ()).unsentRequests );
271
+
272
+ time .sleep (RETRY_BACKOFF_MS );
273
+ assertEquals (Collections .emptyList (), coordinatorManager .poll (time .milliseconds ()).unsentRequests ,
274
+ "Should not generate find coordinator request during close" );
275
+ }
276
+
257
277
private void expectFindCoordinatorRequest (
258
278
CoordinatorRequestManager coordinatorManager ,
259
279
Errors error
You can’t perform that action at this time.
0 commit comments