Skip to content

Commit cff28c1

Browse files
committed
[server] Fix ReplicaFetcherThread keeps throwing OutOfOrderSequenceException because of writer id expire
1 parent 3a48f6d commit cff28c1

File tree

6 files changed

+182
-23
lines changed

6 files changed

+182
-23
lines changed

fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
643643
// now that we have valid records, offsets assigned, we need to validate the idempotent
644644
// state of the writers and collect some metadata.
645645
Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>> validateResult =
646-
analyzeAndValidateWriterState(validRecords);
646+
analyzeAndValidateWriterState(validRecords, appendAsLeader);
647647

648648
if (validateResult.isLeft()) {
649649
// have duplicated batch metadata, skip the append and update append info.
@@ -1006,7 +1006,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
10061006

10071007
/** Returns either the duplicated batch metadata (left) or the updated writers (right). */
10081008
private Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>>
1009-
analyzeAndValidateWriterState(MemoryLogRecords records) {
1009+
analyzeAndValidateWriterState(MemoryLogRecords records, boolean isAppendAsLeader) {
10101010
Map<Long, WriterAppendInfo> updatedWriters = new HashMap<>();
10111011

10121012
for (LogRecordBatch batch : records.batches()) {
@@ -1023,14 +1023,15 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
10231023
}
10241024

10251025
// update write append info.
1026-
updateWriterAppendInfo(writerStateManager, batch, updatedWriters);
1026+
updateWriterAppendInfo(writerStateManager, batch, updatedWriters, isAppendAsLeader);
10271027
}
10281028
}
10291029

10301030
return Either.right(updatedWriters.values());
10311031
}
10321032

1033-
void removeExpiredWriter(long currentTimeMs) {
1033+
@VisibleForTesting
1034+
public void removeExpiredWriter(long currentTimeMs) {
10341035
synchronized (lock) {
10351036
writerStateManager.removeExpiredWriters(currentTimeMs);
10361037
}
@@ -1110,14 +1111,16 @@ private void deleteSegments(List<LogSegment> deletableSegments, SegmentDeletionR
11101111
private static void updateWriterAppendInfo(
11111112
WriterStateManager writerStateManager,
11121113
LogRecordBatch batch,
1113-
Map<Long, WriterAppendInfo> writers) {
1114+
Map<Long, WriterAppendInfo> writers,
1115+
boolean isAppendAsLeader) {
11141116
long writerId = batch.writerId();
11151117
// update writers.
11161118
WriterAppendInfo appendInfo =
11171119
writers.computeIfAbsent(writerId, id -> writerStateManager.prepareUpdate(writerId));
11181120
appendInfo.append(
11191121
batch,
1120-
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
1122+
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch),
1123+
isAppendAsLeader);
11211124
}
11221125

11231126
static void rebuildWriterState(
@@ -1230,7 +1233,7 @@ private static void loadWritersFromRecords(
12301233
Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
12311234
for (LogRecordBatch batch : records.batches()) {
12321235
if (batch.hasWriterId()) {
1233-
updateWriterAppendInfo(writerStateManager, batch, loadedWriters);
1236+
updateWriterAppendInfo(writerStateManager, batch, loadedWriters, true);
12341237
}
12351238
}
12361239
loadedWriters.values().forEach(writerStateManager::update);

fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ public long writerId() {
4444
return writerId;
4545
}
4646

47-
public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
47+
public void append(
48+
LogRecordBatch batch, boolean isWriterInBatchExpired, boolean isAppendAsLeader) {
4849
LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata(batch.baseLogOffset());
4950
appendDataBatch(
5051
batch.batchSequence(),
5152
firstOffsetMetadata,
5253
batch.lastLogOffset(),
5354
isWriterInBatchExpired,
55+
isAppendAsLeader,
5456
batch.commitTimestamp());
5557
}
5658

@@ -59,8 +61,9 @@ public void appendDataBatch(
5961
LogOffsetMetadata firstOffsetMetadata,
6062
long lastOffset,
6163
boolean isWriterInBatchExpired,
64+
boolean isAppendAsLeader,
6265
long batchTimestamp) {
63-
maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset);
66+
maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset, isAppendAsLeader);
6467
updatedEntry.addBath(
6568
batchSequence,
6669
lastOffset,
@@ -69,13 +72,16 @@ public void appendDataBatch(
6972
}
7073

7174
private void maybeValidateDataBatch(
72-
int appendFirstSeq, boolean isWriterInBatchExpired, long lastOffset) {
75+
int appendFirstSeq,
76+
boolean isWriterInBatchExpired,
77+
long lastOffset,
78+
boolean isAppendAsLeader) {
7379
int currentLastSeq =
7480
!updatedEntry.isEmpty()
7581
? updatedEntry.lastBatchSequence()
7682
: currentEntry.lastBatchSequence();
7783
// must be in sequence, even for the first batch should start from 0
78-
if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired)) {
84+
if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired, isAppendAsLeader)) {
7985
throw new OutOfOrderSequenceException(
8086
String.format(
8187
"Out of order batch sequence for writer %s at offset %s in "
@@ -93,16 +99,52 @@ public WriterStateEntry toEntry() {
9399
* three scenarios will be judged as in sequence:
94100
*
95101
* <ul>
96-
* <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check whether the committed
97-
* timestamp of the next batch under the current writerId has expired. If it has expired,
98-
* we consider this a special case caused by writerId expiration, for this case, to ensure
99-
* the correctness of follower sync, we still treat it as in sequence.
102+
* <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, the following two scenarios will be judged as
103+
* in sequence:
104+
* <ul>
105+
* <li>If the committed timestamp of the next batch under the current writerId has
106+
* expired, we consider this a special case caused by writerId expiration, for this
107+
* case, to ensure the correctness of follower sync, we still treat it as in
108+
* sequence.
109+
* <li>If the append request is from the follower, we consider this is a special case
110+
* caused by inconsistent expiration of writerId between the leader and follower. To
111+
* prevent continuous fetch failures on the follower side, we still treat it as in
112+
* sequence. Here is a detailed example: The expiration of a writer is triggered
113+
* asynchronously by the {@code PeriodicWriterIdExpirationCheck} thread at intervals
114+
* defined by {@code server.writer-id.expiration-check-interval}, which can result
115+
* in slight differences in the actual expiration times of the same writer on the
116+
* leader replica and follower replicas. This slight difference leads to a dreadful
117+
* corner case. Imagine the following scenario(set {@code
118+
* server.writer-id.expiration-check-interval}: 10min, {@code
119+
* server.writer-id.expiration-time}: 12h):
120+
* <pre>{@code
121+
* Step Time Action of Leader Action of Follower
122+
* 1 00:03:38 receive batch 0 of writer 101
123+
* 2 00:03:38 fetch batch 0 of writer 101
124+
* 3 12:05:00 remove state of writer 101
125+
* 4 12:10:02 receive batch 1 of writer 101
126+
* 5 12:10:02 fetch batch 0 of writer 101
127+
* 6 12:11:00 remove state of writer 101
128+
* }</pre>
129+
* In step 3, the follower removes the state of writer 101 first, since it has been
130+
* more than 12 hours since writer 101's last batch write, making it safe to remove.
131+
* However, since the expiration of writer 101 has not yet occurred on the leader,
132+
* and a new batch 1 is received at this time, it is successfully written on the
133+
* leader. At this point, the fetcher pulls batch 1 from the leader, but since the
134+
* state of writer 101 has already been cleaned up, an {@link
135+
* OutOfOrderSequenceException} will occur during to write if we don't treat it as
136+
* in sequence.
137+
* </ul>
100138
* <li>nextBatchSeq == lastBatchSeq + 1L
101139
* <li>lastBatchSeq reaches its maximum value
102140
* </ul>
103141
*/
104-
private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean isWriterInBatchExpired) {
105-
return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
142+
private boolean inSequence(
143+
int lastBatchSeq,
144+
int nextBatchSeq,
145+
boolean isWriterInBatchExpired,
146+
boolean isAppendAsLeader) {
147+
return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired || !isAppendAsLeader))
106148
|| nextBatchSeq == lastBatchSeq + 1L
107149
|| (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
108150
}

fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,14 @@ void testValidationOnFirstEntryWhenLoadingLog() {
126126
void testPrepareUpdateDoesNotMutate() {
127127
WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
128128
appendInfo.appendDataBatch(
129-
0, new LogOffsetMetadata(15L), 20L, false, System.currentTimeMillis());
129+
0, new LogOffsetMetadata(15L), 20L, false, false, System.currentTimeMillis());
130130
assertThat(stateManager.lastEntry(writerId)).isNotPresent();
131131
stateManager.update(appendInfo);
132132
assertThat(stateManager.lastEntry(writerId)).isPresent();
133133

134134
WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
135135
nextAppendInfo.appendDataBatch(
136-
1, new LogOffsetMetadata(26L), 30L, false, System.currentTimeMillis());
136+
1, new LogOffsetMetadata(26L), 30L, false, false, System.currentTimeMillis());
137137
assertThat(stateManager.lastEntry(writerId)).isPresent();
138138

139139
WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -521,6 +521,7 @@ private void append(
521521
new LogOffsetMetadata(offset),
522522
offset,
523523
isWriterInBatchExpired,
524+
true,
524525
lastTimestamp);
525526
stateManager.update(appendInfo);
526527
stateManager.updateMapEndOffset(offset + 1);

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.server.replica;
1919

2020
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.exception.OutOfOrderSequenceException;
2122
import org.apache.fluss.metadata.LogFormat;
2223
import org.apache.fluss.metadata.PhysicalTablePath;
2324
import org.apache.fluss.metadata.TableBucket;
@@ -48,6 +49,7 @@
4849
import java.io.File;
4950
import java.io.IOException;
5051
import java.nio.file.Path;
52+
import java.time.Duration;
5153
import java.util.ArrayList;
5254
import java.util.Arrays;
5355
import java.util.Collections;
@@ -75,10 +77,12 @@
7577
import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch;
7678
import static org.apache.fluss.testutils.DataTestUtils.genKvRecords;
7779
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
80+
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
7881
import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs;
7982
import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
8083
import static org.apache.fluss.utils.Preconditions.checkNotNull;
8184
import static org.assertj.core.api.Assertions.assertThat;
85+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
8286

8387
/** Test for {@link Replica}. */
8488
final class ReplicaTest extends ReplicaTestBase {
@@ -129,6 +133,40 @@ void testAppendRecordsToLeader() throws Exception {
129133
assertLogRecordsEquals(DATA1_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), DATA1);
130134
}
131135

136+
@Test
137+
void testAppendRecordsWithOutOfOrderBatchSequence() throws Exception {
138+
Replica logReplica =
139+
makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new TableBucket(DATA1_TABLE_ID, 1));
140+
makeLogReplicaAsLeader(logReplica);
141+
142+
long writerId = 101L;
143+
144+
// 1. append a batch with batchSequence = 0
145+
logReplica.appendRecordsToLeader(genMemoryLogRecordsWithWriterId(DATA1, writerId, 0, 0), 0);
146+
147+
// manual advance time and remove expired writer, the state of writer 101 will be removed
148+
manualClock.advanceTime(Duration.ofHours(12));
149+
manualClock.advanceTime(Duration.ofSeconds(1));
150+
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
151+
.isEqualTo(1);
152+
logReplica.getLogTablet().removeExpiredWriter(manualClock.milliseconds());
153+
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
154+
.isEqualTo(0);
155+
156+
// 2. try to append an out of ordered batch as leader, will throw
157+
// OutOfOrderSequenceException
158+
assertThatThrownBy(
159+
() ->
160+
logReplica.appendRecordsToLeader(
161+
genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 10), 0))
162+
.isInstanceOf(OutOfOrderSequenceException.class);
163+
assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(10);
164+
165+
// 3. try to append an out of ordered batch as follower
166+
logReplica.appendRecordsToFollower(genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 10));
167+
assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(20);
168+
}
169+
132170
@Test
133171
void testPartialPutRecordsToLeader() throws Exception {
134172
Replica kvReplica =

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ public void setup() throws Exception {
174174
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, MemorySize.parse("512b"));
175175
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
176176

177+
conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofHours(12));
178+
177179
scheduler = new FlussScheduler(2);
178180
scheduler.startup();
179181

0 commit comments

Comments
 (0)