@@ -643,7 +643,7 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
             // now that we have valid records, offsets assigned, we need to validate the idempotent
             // state of the writers and collect some metadata.
             Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>> validateResult =
-                    analyzeAndValidateWriterState(validRecords);
+                    analyzeAndValidateWriterState(validRecords, appendAsLeader);
 
             if (validateResult.isLeft()) {
                 // have duplicated batch metadata, skip the append and update append info.
@@ -1006,7 +1006,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
 
     /** Returns either the duplicated batch metadata (left) or the updated writers (right). */
     private Either<WriterStateEntry.BatchMetadata, Collection<WriterAppendInfo>>
-            analyzeAndValidateWriterState(MemoryLogRecords records) {
+            analyzeAndValidateWriterState(MemoryLogRecords records, boolean isAppendAsLeader) {
         Map<Long, WriterAppendInfo> updatedWriters = new HashMap<>();
 
         for (LogRecordBatch batch : records.batches()) {
@@ -1023,14 +1023,15 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryLogRecords records) {
                 }
 
                 // update write append info.
-                updateWriterAppendInfo(writerStateManager, batch, updatedWriters);
+                updateWriterAppendInfo(writerStateManager, batch, updatedWriters, isAppendAsLeader);
             }
         }
 
         return Either.right(updatedWriters.values());
     }
 
-    void removeExpiredWriter(long currentTimeMs) {
+    @VisibleForTesting
+    public void removeExpiredWriter(long currentTimeMs) {
         synchronized (lock) {
             writerStateManager.removeExpiredWriters(currentTimeMs);
         }
@@ -1110,14 +1111,16 @@ private void deleteSegments(List<LogSegment> deletableSegments, SegmentDeletionR
     private static void updateWriterAppendInfo(
             WriterStateManager writerStateManager,
             LogRecordBatch batch,
-            Map<Long, WriterAppendInfo> writers) {
+            Map<Long, WriterAppendInfo> writers,
+            boolean isAppendAsLeader) {
         long writerId = batch.writerId();
         // update writers.
         WriterAppendInfo appendInfo =
                 writers.computeIfAbsent(writerId, id -> writerStateManager.prepareUpdate(writerId));
         appendInfo.append(
                 batch,
-                writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
+                writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch),
+                isAppendAsLeader);
     }
 
     static void rebuildWriterState(
@@ -1230,7 +1233,7 @@ private static void loadWritersFromRecords(
         Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
         for (LogRecordBatch batch : records.batches()) {
             if (batch.hasWriterId()) {
-                updateWriterAppendInfo(writerStateManager, batch, loadedWriters);
+                updateWriterAppendInfo(writerStateManager, batch, loadedWriters, true);
             }
         }
         loadedWriters.values().forEach(writerStateManager::update);
@@ -44,13 +44,15 @@ public long writerId() {
         return writerId;
     }
 
-    public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
+    public void append(
+            LogRecordBatch batch, boolean isWriterInBatchExpired, boolean isAppendAsLeader) {
         LogOffsetMetadata firstOffsetMetadata = new LogOffsetMetadata(batch.baseLogOffset());
         appendDataBatch(
                 batch.batchSequence(),
                 firstOffsetMetadata,
                 batch.lastLogOffset(),
                 isWriterInBatchExpired,
+                isAppendAsLeader,
                 batch.commitTimestamp());
     }
@@ -59,8 +61,9 @@ public void appendDataBatch(
             LogOffsetMetadata firstOffsetMetadata,
             long lastOffset,
             boolean isWriterInBatchExpired,
+            boolean isAppendAsLeader,
             long batchTimestamp) {
-        maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset);
+        maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, lastOffset, isAppendAsLeader);
         updatedEntry.addBath(
                 batchSequence,
                 lastOffset,
@@ -69,13 +72,16 @@ public void appendDataBatch(
     }
 
     private void maybeValidateDataBatch(
-            int appendFirstSeq, boolean isWriterInBatchExpired, long lastOffset) {
+            int appendFirstSeq,
+            boolean isWriterInBatchExpired,
+            long lastOffset,
+            boolean isAppendAsLeader) {
         int currentLastSeq =
                 !updatedEntry.isEmpty()
                         ? updatedEntry.lastBatchSequence()
                         : currentEntry.lastBatchSequence();
         // must be in sequence, even for the first batch should start from 0
-        if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired)) {
+        if (!inSequence(currentLastSeq, appendFirstSeq, isWriterInBatchExpired, isAppendAsLeader)) {
             throw new OutOfOrderSequenceException(
                     String.format(
                             "Out of order batch sequence for writer %s at offset %s in "
@@ -93,16 +99,52 @@ public WriterStateEntry toEntry() {
      * three scenarios will be judged as in sequence:
      *
      * <ul>
-     *   <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check whether the committed
-     *       timestamp of the next batch under the current writerId has expired. If it has expired,
-     *       we consider this a special case caused by writerId expiration, for this case, to ensure
-     *       the correctness of follower sync, we still treat it as in sequence.
+     *   <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, the following two scenarios will be judged
+     *       as in sequence:
+     *       <ul>
+     *         <li>If the committed timestamp of the next batch under the current writerId has
+     *             expired, we consider this a special case caused by writerId expiration; for
+     *             this case, to ensure the correctness of follower sync, we still treat it as in
+     *             sequence.
+     *         <li>If the append request is from a follower, we consider this a special case
+     *             caused by inconsistent expiration of the writerId between the leader and its
+     *             followers. To prevent continuous fetch failures on the follower side, we still
+     *             treat it as in sequence. Here is a detailed example: the expiration of a writer
+     *             is triggered asynchronously by the {@code PeriodicWriterIdExpirationCheck}
+     *             thread at intervals defined by {@code server.writer-id.expiration-check-interval},
+     *             which can result in slight differences in the actual expiration times of the
+     *             same writer on the leader replica and the follower replicas. This slight
+     *             difference leads to a nasty corner case. Imagine the following scenario (with
+     *             {@code server.writer-id.expiration-check-interval} = 10min and {@code
+     *             server.writer-id.expiration-time} = 12h):
+     *             <pre>{@code
+     *             Step  Time      Action of Leader               Action of Follower
+     *             1     00:03:38  receive batch 0 of writer 101
+     *             2     00:03:38                                 fetch batch 0 of writer 101
+     *             3     12:05:00                                 remove state of writer 101
+     *             4     12:10:02  receive batch 1 of writer 101
+     *             5     12:10:02                                 fetch batch 1 of writer 101
+     *             6     12:11:00  remove state of writer 101
+     *             }</pre>
+     *             In step 3, the follower removes the state of writer 101 first, since more than
+     *             12 hours have passed since writer 101's last batch write, making it safe to
+     *             remove. However, writer 101 has not yet expired on the leader, so the new
+     *             batch 1 received at this time is written successfully on the leader. The
+     *             follower then fetches batch 1 from the leader, but since its local state of
+     *             writer 101 has already been cleaned up, an {@link OutOfOrderSequenceException}
+     *             would occur during the write if we did not treat it as in sequence.
+     *       </ul>
     *   <li>nextBatchSeq == lastBatchSeq + 1L
     *   <li>lastBatchSeq reaches its maximum value
     * </ul>
     */
-    private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean isWriterInBatchExpired) {
-        return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
+    private boolean inSequence(
+            int lastBatchSeq,
+            int nextBatchSeq,
+            boolean isWriterInBatchExpired,
+            boolean isAppendAsLeader) {
+        return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired || !isAppendAsLeader))
                 || nextBatchSeq == lastBatchSeq + 1L
                 || (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
     }
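
To make the new predicate easier to eyeball, here is a minimal, self-contained sketch of the same truth table. It is an illustration, not the Fluss class: NO_BATCH_SEQUENCE = -1 is an assumption (the constant's value does not appear in this diff), and the boolean expression is copied from the patched inSequence above. Run with java -ea so the asserts fire.

public class InSequenceSketch {
    // mirrors WriterStateEntry's NO_BATCH_SEQUENCE; the value -1 is assumed here
    static final int NO_BATCH_SEQUENCE = -1;

    static boolean inSequence(
            int lastBatchSeq,
            int nextBatchSeq,
            boolean isWriterInBatchExpired,
            boolean isAppendAsLeader) {
        // same expression as the patched WriterAppendInfo.inSequence
        return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired || !isAppendAsLeader))
                || nextBatchSeq == lastBatchSeq + 1L
                || (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        // local writer state is gone and the incoming batch is not expired:
        // a leader append is rejected (throws upstream), a follower append is now tolerated
        assert !inSequence(NO_BATCH_SEQUENCE, 2, false, true);
        assert inSequence(NO_BATCH_SEQUENCE, 2, false, false);
        // expired batches were already tolerated on both paths before this change
        assert inSequence(NO_BATCH_SEQUENCE, 2, true, true);
        // normal monotonic progression and wrap-around are unchanged
        assert inSequence(0, 1, false, true);
        assert inSequence(Integer.MAX_VALUE, 0, false, true);
        // a fresh writer must still start at batch sequence 0 on the leader
        assert inSequence(NO_BATCH_SEQUENCE, 0, false, true);
        System.out.println("all in-sequence checks passed");
    }
}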
@@ -126,14 +126,14 @@ void testValidationOnFirstEntryWhenLoadingLog() {
     void testPrepareUpdateDoesNotMutate() {
         WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
         appendInfo.appendDataBatch(
-                0, new LogOffsetMetadata(15L), 20L, false, System.currentTimeMillis());
+                0, new LogOffsetMetadata(15L), 20L, false, false, System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isNotPresent();
         stateManager.update(appendInfo);
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
         nextAppendInfo.appendDataBatch(
-                1, new LogOffsetMetadata(26L), 30L, false, System.currentTimeMillis());
+                1, new LogOffsetMetadata(26L), 30L, false, false, System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -521,6 +521,7 @@ private void append(
                 new LogOffsetMetadata(offset),
                 offset,
                 isWriterInBatchExpired,
+                true,
                 lastTimestamp);
         stateManager.update(appendInfo);
         stateManager.updateMapEndOffset(offset + 1);
@@ -18,6 +18,7 @@
 package org.apache.fluss.server.replica;
 
 import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.OutOfOrderSequenceException;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
@@ -48,6 +49,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -75,10 +77,12 @@
 import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch;
 import static org.apache.fluss.testutils.DataTestUtils.genKvRecords;
 import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
 import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs;
 import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link Replica}. */
 final class ReplicaTest extends ReplicaTestBase {
@@ -129,6 +133,40 @@ void testAppendRecordsToLeader() throws Exception {
         assertLogRecordsEquals(DATA1_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), DATA1);
     }
 
+    @Test
+    void testAppendRecordsWithOutOfOrderBatchSequence() throws Exception {
+        Replica logReplica =
+                makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new TableBucket(DATA1_TABLE_ID, 1));
+        makeLogReplicaAsLeader(logReplica);
+
+        long writerId = 101L;
+
+        // 1. append a batch with batchSequence = 0
+        logReplica.appendRecordsToLeader(genMemoryLogRecordsWithWriterId(DATA1, writerId, 0, 0), 0);
+
+        // manually advance time and remove the expired writer; the state of writer 101 is removed
+        manualClock.advanceTime(Duration.ofHours(12));
+        manualClock.advanceTime(Duration.ofSeconds(1));
+        assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
+                .isEqualTo(1);
+        logReplica.getLogTablet().removeExpiredWriter(manualClock.milliseconds());
+        assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
+                .isEqualTo(0);
+
+        // 2. trying to append an out-of-order batch as leader will throw
+        // OutOfOrderSequenceException
+        assertThatThrownBy(
+                        () ->
+                                logReplica.appendRecordsToLeader(
+                                        genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 10), 0))
+                .isInstanceOf(OutOfOrderSequenceException.class);
+        assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(10);
+
+        // 3. appending the same out-of-order batch as follower succeeds
+        logReplica.appendRecordsToFollower(genMemoryLogRecordsWithWriterId(DATA1, writerId, 2, 10));
+        assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(20);
+    }
+
     @Test
     void testPartialPutRecordsToLeader() throws Exception {
         Replica kvReplica =
@@ -174,6 +174,8 @@ public void setup() throws Exception {
         conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, MemorySize.parse("512b"));
         conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
 
+        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofHours(12));
+
         scheduler = new FlussScheduler(2);
         scheduler.startup();
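
For reference, the javadoc scenario above pins two server options, while the test base change sets only the expiration time. A hedged sketch of fixing both in a test configuration follows: ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL is an assumed constant name (only WRITER_ID_EXPIRATION_TIME appears in this diff; the key server.writer-id.expiration-check-interval is named in the javadoc), and Configuration is assumed to be org.apache.fluss.config.Configuration.

import java.time.Duration;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;

public class WriterExpirationConfSketch {
    public static Configuration writerExpirationConf() {
        Configuration conf = new Configuration();
        // server.writer-id.expiration-time: how long an idle writer's state is kept
        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofHours(12));
        // server.writer-id.expiration-check-interval: period of the asynchronous
        // PeriodicWriterIdExpirationCheck task (constant name assumed, see lead-in)
        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL, Duration.ofMinutes(10));
        return conf;
    }
}

Because the two replicas run this check independently, a smaller check interval narrows, but never closes, the leader/follower expiration gap that the javadoc's timeline illustrates; the follower-side tolerance in inSequence is what actually closes it.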