Skip to content

Commit 669053e

Browse files
authored
Merge branch 'AutoMQ:main' into main
2 parents d864004 + c3dde6e commit 669053e

File tree

5 files changed

+22
-3
lines changed

5 files changed

+22
-3
lines changed

core/src/main/java/kafka/automq/partition/snapshot/PartitionSnapshotsManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public void onPartitionClose(Partition partition) {
101101

102102
public CompletableFuture<AutomqGetPartitionSnapshotResponse> handle(AutomqGetPartitionSnapshotRequest request) {
103103
Session session;
104+
boolean newSession = false;
104105
synchronized (this) {
105106
AutomqGetPartitionSnapshotRequestData requestData = request.data();
106107
int sessionId = requestData.sessionId();
@@ -115,10 +116,11 @@ public CompletableFuture<AutomqGetPartitionSnapshotResponse> handle(AutomqGetPar
115116
sessionId = nextSessionId();
116117
session = new Session(sessionId);
117118
sessions.put(sessionId, session);
119+
newSession = true;
118120
}
119121
}
120122
CompletableFuture<AutomqGetPartitionSnapshotResponse> resp = session.snapshotsDelta(request.data().version());
121-
CompletableFuture<Void> commitCf = request.data().requestCommit() ? confirmWAL.commit(0, false) : CompletableFuture.completedFuture(null);
123+
CompletableFuture<Void> commitCf = (request.data().requestCommit() || newSession) ? confirmWAL.commit(0, false) : CompletableFuture.completedFuture(null);
122124
return commitCf.exceptionally(nil -> null).thenCompose(nil -> resp);
123125
}
124126

core/src/main/java/kafka/automq/zerozone/SnapshotReadPartitionsManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ void reset(String reason) {
360360
snapshotWithOperations.clear();
361361
waitingDataLoadedQueue.clear();
362362
requester.reset();
363+
replayer.reset();
363364
}
364365

365366
void onNewWalEndOffset(String walConfig, RecordOffset endOffset) {

core/src/main/java/kafka/automq/zerozone/SubscriberReplayer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ public void close() {
111111
}
112112
}
113113

114+
public void reset() {
115+
this.loadedObjectOrderId = -1L;
116+
this.loadedEndOffset = null;
117+
}
118+
114119
private List<S3ObjectMetadata> nextObjects() {
115120
return nextObjects0(metadataCache, node.id(), loadedObjectOrderId, value -> loadedObjectOrderId = value);
116121
}

core/src/main/java/kafka/automq/zerozone/SubscriberRequester.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ private void handleResponse(ClientResponse clientResponse, CompletableFuture<Voi
170170
if (sessionId != 0 && resp.sessionId() != sessionId) {
171171
// switch to a new session
172172
subscriber.reset(String.format("switch sessionId from %s to %s", sessionId, resp.sessionId()));
173+
// reset immediately to the new session.
174+
tryReset0();
175+
sessionId = resp.sessionId();
176+
sessionEpoch = resp.sessionEpoch();
173177
}
174178
SnapshotReadPartitionsManager.OperationBatch batch = new SnapshotReadPartitionsManager.OperationBatch();
175179
resp.topics().forEach(topic -> topic.partitions().forEach(partition -> {
@@ -190,8 +194,6 @@ private void handleResponse(ClientResponse clientResponse, CompletableFuture<Voi
190194
subscriber.onNewWalEndOffset(resp.confirmWalConfig(), DefaultRecordOffset.of(Unpooled.wrappedBuffer(resp.confirmWalEndOffset())));
191195
batch.operations.add(SnapshotWithOperation.snapshotMark(snapshotCf));
192196
subscriber.onNewOperationBatch(batch);
193-
sessionId = resp.sessionId();
194-
sessionEpoch = resp.sessionEpoch();
195197
}
196198

197199
private boolean tryReset0() {

core/src/main/java/kafka/automq/zerozone/ZeroZoneTrafficInterceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.apache.kafka.common.message.AutomqZoneRouterRequestData;
3434
import org.apache.kafka.common.message.MetadataResponseData;
3535
import org.apache.kafka.common.message.ProduceRequestData;
36+
import org.apache.kafka.common.protocol.Errors;
3637
import org.apache.kafka.common.record.MemoryRecords;
38+
import org.apache.kafka.common.requests.ProduceResponse;
3739
import org.apache.kafka.common.requests.s3.AutomqZoneRouterResponse;
3840
import org.apache.kafka.common.utils.LogContext;
3941
import org.apache.kafka.common.utils.Time;
@@ -154,6 +156,13 @@ public void close() {
154156

155157
@Override
156158
public void handleProduceRequest(ProduceRequestArgs args) {
159+
if (closed.get()) {
160+
Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(args.entriesPerPartition().size());
161+
args.entriesPerPartition().forEach((tp, records) ->
162+
responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)));
163+
args.responseCallback().accept(responseMap);
164+
return;
165+
}
157166
ClientIdMetadata clientId = args.clientId();
158167
fillRackIfMissing(clientId);
159168
if (version.isZeroZoneV2Supported()) {

0 commit comments

Comments
 (0)