From 61bd64c51221cd817227d865cd017254c4c1815d Mon Sep 17 00:00:00 2001 From: Maggie Cao Date: Tue, 2 Sep 2025 22:39:32 -0700 Subject: [PATCH 1/2] implement tableDescriptor based RocksDB conf --- .../alibaba/fluss/server/kv/KvManager.java | 11 +- .../com/alibaba/fluss/server/kv/KvTablet.java | 23 +- .../alibaba/fluss/server/replica/Replica.java | 9 +- .../fluss/server/kv/KvManagerTest.java | 58 ++++- .../alibaba/fluss/server/kv/KvTabletTest.java | 200 +++++++++++++++++- 5 files changed, 281 insertions(+), 20 deletions(-) diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java index 5cc2c7253c..b89b6730a1 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java @@ -30,6 +30,7 @@ import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.TabletManagerBase; @@ -154,7 +155,8 @@ public KvTablet getOrCreateKv( KvFormat kvFormat, Schema schema, TableConfig tableConfig, - ArrowCompressionInfo arrowCompressionInfo) + ArrowCompressionInfo arrowCompressionInfo, + Map tableProperties) throws Exception { return inLock( tabletCreationOrDeletionLock, @@ -176,7 +178,8 @@ public KvTablet getOrCreateKv( kvFormat, schema, merger, - arrowCompressionInfo); + arrowCompressionInfo, + tableProperties); currentKvs.put(tableBucket, tablet); LOG.info( @@ -267,6 +270,7 @@ public KvTablet loadKv(File tabletDir) throws Exception { // TODO: we should support recover schema from disk to decouple put and schema. TablePath tablePath = physicalTablePath.getTablePath(); TableInfo tableInfo = getTableInfo(zkClient, tablePath); + TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); RowMerger rowMerger = RowMerger.create( tableInfo.getTableConfig(), @@ -284,7 +288,8 @@ public KvTablet loadKv(File tabletDir) throws Exception { tableInfo.getTableConfig().getKvFormat(), tableInfo.getSchema(), rowMerger, - tableInfo.getTableConfig().getArrowCompressionInfo()); + tableInfo.getTableConfig().getArrowCompressionInfo(), + tableDescriptor.getProperties()); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( String.format( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 5cdbba3a25..5c58223204 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -64,14 +64,12 @@ import com.alibaba.fluss.utils.FileUtils; import com.alibaba.fluss.utils.FlussPaths; import com.alibaba.fluss.utils.types.Tuple2; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; - import java.io.File; import java.io.IOException; import java.util.Collection; @@ -158,7 +156,8 @@ public static KvTablet create( KvFormat kvFormat, Schema schema, RowMerger rowMerger, - ArrowCompressionInfo arrowCompressionInfo) + ArrowCompressionInfo arrowCompressionInfo, + Map tableProperties) throws IOException { Tuple2 tablePathAndBucket = FlussPaths.parseTabletDir(kvTabletDir); @@ -173,7 +172,8 @@ public static KvTablet create( kvFormat, schema, rowMerger, - arrowCompressionInfo); + arrowCompressionInfo, + tableProperties); } public static KvTablet create( @@ -187,9 +187,10 @@ public static KvTablet create( KvFormat kvFormat, Schema schema, RowMerger rowMerger, - ArrowCompressionInfo arrowCompressionInfo) + ArrowCompressionInfo arrowCompressionInfo, + Map tableProperties) throws IOException { - RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir); + RocksDBKv kv = buildRocksDBKv(tableProperties, serverConf, kvTabletDir); return new KvTablet( tablePath, tableBucket, @@ -206,10 +207,16 @@ public static KvTablet create( arrowCompressionInfo); } - private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir) + private static RocksDBKv buildRocksDBKv( + Map tableProperties, Configuration configuration, File kvDir) throws IOException { + // include tableProperties in configuration and set tableProperties + // before it falls back to global options for RocksDB + Configuration tableConf = Configuration.fromMap(tableProperties); + Configuration tablePropertiesWithGlobalFallback = new Configuration(configuration); + tablePropertiesWithGlobalFallback.addAll(tableConf); RocksDBResourceContainer rocksDBResourceContainer = - new RocksDBResourceContainer(configuration, kvDir); + new RocksDBResourceContainer(tablePropertiesWithGlobalFallback, kvDir); RocksDBKvBuilder rocksDBKvBuilder = new RocksDBKvBuilder( kvDir, diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 0e8a6d8952..e8397d54b0 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -35,6 +35,7 @@ import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.metrics.Counter; @@ -124,6 +125,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static com.alibaba.fluss.server.TabletManagerBase.getTableInfo; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock; import static com.alibaba.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -617,6 +619,10 @@ private Optional initKvTablet() { LOG.info("No snapshot found, restore from log."); // actually, kv manager always create a kv tablet since we will drop the kv // if it exists before init kv tablet + TablePath tablePath = physicalPath.getTablePath(); + TableInfo tableInfo = getTableInfo(snapshotContext.getZooKeeperClient(), tablePath); + TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); + kvTablet = kvManager.getOrCreateKv( physicalPath, @@ -625,7 +631,8 @@ private Optional initKvTablet() { tableConfig.getKvFormat(), schema, tableConfig, - arrowCompressionInfo); + arrowCompressionInfo, + tableDescriptor.getProperties()); } kvTablet.registerMetrics(bucketMetricGroup); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java index 2de1dc851a..d7c180a651 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java @@ -20,22 +20,28 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.TableConfig; +import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.KvFormat; import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.record.KvRecord; import com.alibaba.fluss.record.KvRecordBatch; import com.alibaba.fluss.record.KvRecordTestUtils; import com.alibaba.fluss.record.TestData; import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.server.coordinator.MetadataManager; import com.alibaba.fluss.server.log.LogManager; import com.alibaba.fluss.server.log.LogTablet; import com.alibaba.fluss.server.zk.NOPErrorHandler; import com.alibaba.fluss.server.zk.ZooKeeperClient; import com.alibaba.fluss.server.zk.ZooKeeperExtension; import com.alibaba.fluss.testutils.common.AllCallbackWrapper; +import com.alibaba.fluss.types.DataTypes; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.clock.SystemClock; import com.alibaba.fluss.utils.concurrent.FlussScheduler; @@ -53,14 +59,18 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK; +import static com.alibaba.fluss.server.TabletManagerBase.getTableInfo; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link KvManager} . */ @@ -86,9 +96,28 @@ final class KvManagerTest { private TableBucket tableBucket1; private TableBucket tableBucket2; + private MetadataManager metadataManager; private LogManager logManager; private KvManager kvManager; private Configuration conf; + protected static final Schema DEFAULT_SCHEMA = + Schema.newBuilder() + .primaryKey("id") + .column("id", DataTypes.INT()) + .withComment("person id") + .column("name", DataTypes.STRING()) + .withComment("person name") + .column("age", DataTypes.INT()) + .withComment("person age") + .build(); + protected static final TableDescriptor DEFAULT_TABLE_DESCRIPTOR = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test table") + .distributedBy(3, "id") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); @BeforeAll static void baseBeforeAll() { @@ -103,12 +132,27 @@ void setup() throws Exception { conf = new Configuration(); conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath()); - String dbName = "db1"; - tablePath1 = TablePath.of(dbName, "t1"); - tablePath2 = TablePath.of(dbName, "t2"); + String dbName1 = "db1"; + String dbName2 = "db2"; + + tablePath1 = TablePath.of(dbName1, "t1"); + tablePath2 = TablePath.of(dbName2, "t1"); // we need a log manager for kv manager + metadataManager = new MetadataManager(zkClient, conf); + + Map props = new HashMap<>(); + props.put("kv.rocksdb.thread.num", "2"); + // DEFAULT_TABLE_DESCRIPTOR + metadataManager.createDatabase(dbName1, DatabaseDescriptor.EMPTY, true); + metadataManager.createDatabase(dbName2, DatabaseDescriptor.EMPTY, true); + + metadataManager.createTable( + tablePath1, DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(1), null, true); + metadataManager.createTable( + tablePath2, DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(1), null, true); + logManager = LogManager.create(conf, zkClient, new FlussScheduler(1), SystemClock.getInstance()); kvManager = KvManager.create(conf, zkClient, logManager); @@ -260,6 +304,11 @@ private KvTablet getOrCreateKv( PhysicalTablePath physicalTablePath = PhysicalTablePath.of( tablePath.getDatabaseName(), tablePath.getTableName(), partitionName); + + TableInfo tableInfo = getTableInfo(zkClient, physicalTablePath.getTablePath()); + TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); + Map tableProperties = tableDescriptor.getProperties(); + LogTablet logTablet = logManager.getOrCreateLog(physicalTablePath, tableBucket, LogFormat.ARROW, 1, true); return kvManager.getOrCreateKv( @@ -269,7 +318,8 @@ private KvTablet getOrCreateKv( KvFormat.COMPACTED, DATA1_SCHEMA_PK, new TableConfig(new Configuration()), - DEFAULT_COMPRESSION); + DEFAULT_COMPRESSION, + tableProperties); } private byte[] valueOf(KvRecord kvRecord) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index d4e74bdc32..700dc5571b 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -17,16 +17,20 @@ package com.alibaba.fluss.server.kv; +import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.TableConfig; import com.alibaba.fluss.exception.InvalidTargetColumnException; import com.alibaba.fluss.exception.OutOfOrderSequenceException; import com.alibaba.fluss.memory.TestingMemorySegmentPool; +import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.KvFormat; import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.record.ChangeType; import com.alibaba.fluss.record.FileLogProjection; @@ -40,15 +44,21 @@ import com.alibaba.fluss.record.bytesview.MultiBytesView; import com.alibaba.fluss.row.BinaryRow; import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.server.coordinator.MetadataManager; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer.Key; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; import com.alibaba.fluss.server.kv.rowmerger.RowMerger; import com.alibaba.fluss.server.log.FetchIsolation; import com.alibaba.fluss.server.log.LogAppendInfo; +import com.alibaba.fluss.server.log.LogManager; import com.alibaba.fluss.server.log.LogTablet; import com.alibaba.fluss.server.log.LogTestUtils; +import com.alibaba.fluss.server.zk.NOPErrorHandler; +import com.alibaba.fluss.server.zk.ZooKeeperClient; +import com.alibaba.fluss.server.zk.ZooKeeperExtension; import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; +import com.alibaba.fluss.testutils.common.AllCallbackWrapper; import com.alibaba.fluss.types.DataTypes; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.types.StringType; @@ -56,8 +66,10 @@ import com.alibaba.fluss.utils.concurrent.FlussScheduler; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -65,6 +77,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -83,6 +96,7 @@ import static com.alibaba.fluss.record.TestData.DATA2_SCHEMA; import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; +import static com.alibaba.fluss.server.TabletManagerBase.getTableInfo; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static com.alibaba.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords; import static com.alibaba.fluss.testutils.LogRecordsAssert.assertThatLogRecords; @@ -93,8 +107,14 @@ /** Test for {@link KvTablet}. */ class KvTabletTest { + @RegisterExtension + public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + private LogManager logManager; + private KvManager kvManager; private static final short schemaId = 1; - private final Configuration conf = new Configuration(); + private Configuration conf = new Configuration(); private final RowType baseRowType = TestData.DATA1_ROW_TYPE; private final KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory = KvRecordTestUtils.KvRecordBatchFactory.of(schemaId); @@ -107,12 +127,47 @@ class KvTabletTest { private LogTablet logTablet; private KvTablet kvTablet; private ExecutorService executor; + private static ZooKeeperClient zkClient; + private TablePath tablePath; + + @BeforeAll + static void baseBeforeAll() { + zkClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + } @BeforeEach void beforeEach() { executor = Executors.newFixedThreadPool(2); } + @BeforeEach + void setup() throws Exception { + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + Map props = new HashMap<>(); + + props.put("kv.rocksdb.thread.num", "2"); + metadataManager.createDatabase("testDb", DatabaseDescriptor.EMPTY, true); + + logManager = + LogManager.create(conf, zkClient, new FlussScheduler(1), SystemClock.getInstance()); + kvManager = KvManager.create(conf, zkClient, logManager); + kvManager.startup(); + } + + @AfterEach + void tearDown() throws Exception { + if (kvManager != null) { + kvManager.shutdown(); + } + if (logManager != null) { + logManager.shutdown(); + } + } + @AfterEach void afterEach() { if (executor != null) { @@ -122,7 +177,7 @@ void afterEach() { private void initLogTabletAndKvTablet(Schema schema, Map tableConfig) throws Exception { - initLogTabletAndKvTablet(TablePath.of("testDb", "t1"), schema, tableConfig); + initLogTabletAndKvTablet(tablePath, schema, tableConfig); } private void initLogTabletAndKvTablet( @@ -161,6 +216,10 @@ private KvTablet createKvTablet( Schema schema, Map tableConfig) throws Exception { + + TableInfo tableInfo = getTableInfo(zkClient, tablePath.getTablePath()); + TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); + Map tableProperties = tableDescriptor.getProperties(); RowMerger rowMerger = RowMerger.create( new TableConfig(Configuration.fromMap(tableConfig)), @@ -177,13 +236,30 @@ private KvTablet createKvTablet( KvFormat.COMPACTED, schema, rowMerger, - DEFAULT_COMPRESSION); + DEFAULT_COMPRESSION, + tableProperties); } @Test void testInvalidPartialUpdate1() throws Exception { final Schema schema1 = DATA2_SCHEMA; - initLogTabletAndKvTablet(schema1, new HashMap<>()); + tablePath = TablePath.of("testDb", "t1"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA2_SCHEMA) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + + initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>()); + KvRecordTestUtils.KvRecordFactory data2kvRecordFactory = KvRecordTestUtils.KvRecordFactory.of(schema1.getRowType()); KvRecordBatch kvRecordBatch = @@ -200,6 +276,9 @@ void testInvalidPartialUpdate1() throws Exception { @Test void testInvalidPartialUpdate2() throws Exception { + + tablePath = TablePath.of("testDb", "t1"); + // the column not in target columns is not null final Schema schema2 = Schema.newBuilder() @@ -208,6 +287,21 @@ void testInvalidPartialUpdate2() throws Exception { .column("c", new StringType(false)) .primaryKey("a") .build(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema2) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + initLogTabletAndKvTablet(schema2, new HashMap<>()); KvRecordTestUtils.KvRecordFactory data2kvRecordFactory = KvRecordTestUtils.KvRecordFactory.of(schema2.getRowType()); @@ -228,7 +322,24 @@ void testInvalidPartialUpdate2() throws Exception { @Test void testPartialUpdateAndDelete() throws Exception { + tablePath = TablePath.of("testDb", "t1"); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA2_SCHEMA) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>()); + RowType rowType = DATA2_SCHEMA.getRowType(); KvRecordTestUtils.KvRecordFactory data2kvRecordFactory = KvRecordTestUtils.KvRecordFactory.of(rowType); @@ -403,6 +514,23 @@ void testPartialUpdateAndDelete() throws Exception { @Test void testPutWithMultiThread() throws Exception { + + tablePath = TablePath.of("testDb", "t1"); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA_PK) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); // create two kv batches KvRecordBatch kvRecordBatch1 = @@ -543,6 +671,22 @@ void testPutWithMultiThread() throws Exception { @Test void testPutAsLeaderWithOutOfOrderSequenceException() throws Exception { + tablePath = TablePath.of("testDb", "t1"); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA_PK) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); long writeId = 100L; List kvData1 = @@ -599,11 +743,28 @@ void testPutAsLeaderWithOutOfOrderSequenceException() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void testFirstRowMergeEngine(boolean doProjection) throws Exception { + Map config = new HashMap<>(); config.put("table.merge-engine", "first_row"); String tableName = "test_first_row_merge_engine_" + (doProjection ? "projection" : "no_projection"); TablePath tablePath = TablePath.of("testDb", tableName); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA_PK) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createDatabase("testDb", DatabaseDescriptor.EMPTY, true); + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + initLogTabletAndKvTablet(tablePath, DATA1_SCHEMA_PK, config); RowType rowType = DATA1_SCHEMA_PK.getRowType(); FileLogProjection logProjection = null; @@ -710,6 +871,21 @@ void testVersionRowMergeEngine(boolean doProjection) throws Exception { "test_versioned_row_merge_engine_" + (doProjection ? "projection" : "no_projection"); TablePath tablePath = TablePath.of("testDb", tableName); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA3_SCHEMA_PK) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + initLogTabletAndKvTablet(tablePath, DATA3_SCHEMA_PK, config); RowType rowType = DATA3_SCHEMA_PK.getRowType(); KvRecordTestUtils.KvRecordFactory kvRecordFactory = @@ -832,6 +1008,22 @@ void testVersionRowMergeEngine(boolean doProjection) throws Exception { @Test void testAppendDuplicatedKvBatch() throws Exception { + tablePath = TablePath.of("testDb", "t1"); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA_PK) + .comment("test table") + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1)) + .customProperty("connector", "fluss") + .build(); + + MetadataManager metadataManager = new MetadataManager(zkClient, conf); + + metadataManager.createTable( + tablePath, tableDescriptor.withReplicationFactor(1), null, true); + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); long writeId = 100L; List kvData1 = From 1cd98fdba3d20356d9315b81086aa033f8b0d634 Mon Sep 17 00:00:00 2001 From: Maggie Cao Date: Tue, 2 Sep 2025 23:30:07 -0700 Subject: [PATCH 2/2] fix license issues --- .../lake/paimon/utils/FlussToPaimonPredicateConverterTest.java | 2 +- .../src/main/java/com/alibaba/fluss/server/kv/KvTablet.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java index 5afec41775..a933fcb308 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025 Alibaba Group Holding Ltd. + * Copyright (c) 2025 . * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 5c58223204..f4756f8a6c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -64,12 +64,14 @@ import com.alibaba.fluss.utils.FileUtils; import com.alibaba.fluss.utils.FlussPaths; import com.alibaba.fluss.utils.types.Tuple2; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; + import java.io.File; import java.io.IOException; import java.util.Collection;