File tree Expand file tree Collapse file tree 1 file changed +9
-0
lines changed
core/src/main/java/kafka/automq/zerozone Expand file tree Collapse file tree 1 file changed +9
-0
lines changed Original file line number Diff line number Diff line change 3333import org .apache .kafka .common .message .AutomqZoneRouterRequestData ;
3434import org .apache .kafka .common .message .MetadataResponseData ;
3535import org .apache .kafka .common .message .ProduceRequestData ;
36+ import org .apache .kafka .common .protocol .Errors ;
3637import org .apache .kafka .common .record .MemoryRecords ;
38+ import org .apache .kafka .common .requests .ProduceResponse ;
3739import org .apache .kafka .common .requests .s3 .AutomqZoneRouterResponse ;
3840import org .apache .kafka .common .utils .LogContext ;
3941import org .apache .kafka .common .utils .Time ;
@@ -154,6 +156,13 @@ public void close() {
154156
155157 @ Override
156158 public void handleProduceRequest (ProduceRequestArgs args ) {
159+ if (closed .get ()) {
160+ Map <TopicPartition , ProduceResponse .PartitionResponse > responseMap = new HashMap <>(args .entriesPerPartition ().size ());
161+ args .entriesPerPartition ().forEach ((tp , records ) ->
162+ responseMap .put (tp , new ProduceResponse .PartitionResponse (Errors .NOT_LEADER_OR_FOLLOWER )));
163+ args .responseCallback ().accept (responseMap );
164+ return ;
165+ }
157166 ClientIdMetadata clientId = args .clientId ();
158167 fillRackIfMissing (clientId );
159168 if (version .isZeroZoneV2Supported ()) {
You can’t perform that action at this time.
0 commit comments