KvManager.java
@@ -30,6 +30,7 @@
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.TabletManagerBase;
@@ -154,7 +155,8 @@ public KvTablet getOrCreateKv(
KvFormat kvFormat,
Schema schema,
TableConfig tableConfig,
ArrowCompressionInfo arrowCompressionInfo)
ArrowCompressionInfo arrowCompressionInfo,
Map<String, String> tableProperties)
throws Exception {
return inLock(
tabletCreationOrDeletionLock,
@@ -176,7 +178,8 @@ public KvTablet getOrCreateKv(
kvFormat,
schema,
merger,
arrowCompressionInfo);
arrowCompressionInfo,
tableProperties);
currentKvs.put(tableBucket, tablet);

LOG.info(
@@ -267,6 +270,7 @@ public KvTablet loadKv(File tabletDir) throws Exception {
// TODO: we should support recovering the schema from disk to decouple put and schema.
TablePath tablePath = physicalTablePath.getTablePath();
TableInfo tableInfo = getTableInfo(zkClient, tablePath);
TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
RowMerger rowMerger =
RowMerger.create(
tableInfo.getTableConfig(),
@@ -284,7 +288,8 @@ public KvTablet loadKv(File tabletDir) throws Exception {
tableInfo.getTableConfig().getKvFormat(),
tableInfo.getSchema(),
rowMerger,
tableInfo.getTableConfig().getArrowCompressionInfo());
tableInfo.getTableConfig().getArrowCompressionInfo(),
tableDescriptor.getProperties());
if (this.currentKvs.containsKey(tableBucket)) {
throw new IllegalStateException(
String.format(
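Where the new `tableProperties` argument comes from: `loadKv` above (and the `getOrCreateKv` caller in the replica code further down) resolves it from the table metadata stored in ZooKeeper. A minimal sketch of that lookup, using only calls that appear in this diff; `zkClient` and `tablePath` are assumed to be in scope:

```java
// Resolve the table metadata from ZooKeeper and extract its properties map.
// This map is what gets passed as the new trailing argument of
// KvManager#getOrCreateKv and, further down, KvTablet#create.
TableInfo tableInfo = getTableInfo(zkClient, tablePath);
TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
Map<String, String> tableProperties = tableDescriptor.getProperties();
```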
KvTablet.java
@@ -158,7 +158,8 @@ public static KvTablet create(
KvFormat kvFormat,
Schema schema,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo)
ArrowCompressionInfo arrowCompressionInfo,
Map<String, String> tableProperties)
throws IOException {
Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
FlussPaths.parseTabletDir(kvTabletDir);
@@ -173,7 +174,8 @@ public static KvTablet create(
kvFormat,
schema,
rowMerger,
arrowCompressionInfo);
arrowCompressionInfo,
tableProperties);
}

public static KvTablet create(
@@ -187,9 +189,10 @@ public static KvTablet create(
KvFormat kvFormat,
Schema schema,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo)
ArrowCompressionInfo arrowCompressionInfo,
Map<String, String> tableProperties)
throws IOException {
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
RocksDBKv kv = buildRocksDBKv(tableProperties, serverConf, kvTabletDir);
return new KvTablet(
tablePath,
tableBucket,
@@ -206,10 +209,16 @@ public static KvTablet create(
arrowCompressionInfo);
}

private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
private static RocksDBKv buildRocksDBKv(
Map<String, String> tableProperties, Configuration configuration, File kvDir)
throws IOException {
// merge tableProperties into the configuration so that table-level settings
// take precedence before falling back to the global RocksDB options
Configuration tableConf = Configuration.fromMap(tableProperties);
Configuration tablePropertiesWithGlobalFallback = new Configuration(configuration);
tablePropertiesWithGlobalFallback.addAll(tableConf);
RocksDBResourceContainer rocksDBResourceContainer =
new RocksDBResourceContainer(configuration, kvDir);
new RocksDBResourceContainer(tablePropertiesWithGlobalFallback, kvDir);
RocksDBKvBuilder rocksDBKvBuilder =
new RocksDBKvBuilder(
kvDir,
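The merge order in `buildRocksDBKv` is what gives per-table settings precedence over the server-wide configuration. Below is a minimal, self-contained sketch of that precedence; it uses only the `Configuration` calls visible in this diff (`fromMap`, the copy constructor, `addAll`), and the second option key is illustrative rather than taken from this change:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.fluss.config.Configuration;

public class TablePropertyPrecedenceSketch {
    public static void main(String[] args) {
        // Server-wide defaults (e.g. from the server configuration).
        Map<String, String> globalProps = new HashMap<>();
        globalProps.put("kv.rocksdb.thread.num", "4");
        globalProps.put("kv.rocksdb.compaction.style", "LEVEL"); // illustrative key
        Configuration globalConf = Configuration.fromMap(globalProps);

        // Per-table overrides, i.e. TableDescriptor#getProperties().
        Map<String, String> tableProperties = new HashMap<>();
        tableProperties.put("kv.rocksdb.thread.num", "2");

        // Same order as buildRocksDBKv: copy the global options first,
        // then let the table-level properties overwrite matching keys.
        Configuration merged = new Configuration(globalConf);
        merged.addAll(Configuration.fromMap(tableProperties));

        // merged now resolves kv.rocksdb.thread.num to "2" (the table override),
        // while kv.rocksdb.compaction.style still falls back to the global "LEVEL".
    }
}
```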
Replica.java
@@ -35,6 +35,7 @@
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.Counter;
@@ -124,6 +125,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.fluss.server.TabletManagerBase.getTableInfo;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
@@ -617,6 +619,10 @@ private Optional<CompletedSnapshot> initKvTablet() {
LOG.info("No snapshot found, restore from log.");
// actually, kv manager always creates a kv tablet since we drop the kv
// if it exists before initializing the kv tablet
TablePath tablePath = physicalPath.getTablePath();
TableInfo tableInfo = getTableInfo(snapshotContext.getZooKeeperClient(), tablePath);
TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();

kvTablet =
kvManager.getOrCreateKv(
physicalPath,
@@ -625,7 +631,8 @@ private Optional<CompletedSnapshot> initKvTablet() {
tableConfig.getKvFormat(),
schema,
tableConfig,
arrowCompressionInfo);
arrowCompressionInfo,
tableDescriptor.getProperties());
}

kvTablet.registerMetrics(bucketMetricGroup);
KvManagerTest.java
@@ -20,22 +20,28 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.KvRecord;
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.KvRecordTestUtils;
import org.apache.fluss.record.TestData;
import org.apache.fluss.row.encode.ValueEncoder;
import org.apache.fluss.server.coordinator.MetadataManager;
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperExtension;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.clock.SystemClock;
import org.apache.fluss.utils.concurrent.FlussScheduler;
@@ -53,14 +59,18 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
import static org.apache.fluss.server.TabletManagerBase.getTableInfo;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link KvManager}. */
@@ -86,9 +96,28 @@ final class KvManagerTest {
private TableBucket tableBucket1;
private TableBucket tableBucket2;

private MetadataManager metadataManager;
private LogManager logManager;
private KvManager kvManager;
private Configuration conf;
protected static final Schema DEFAULT_SCHEMA =
Schema.newBuilder()
.primaryKey("id")
.column("id", DataTypes.INT())
.withComment("person id")
.column("name", DataTypes.STRING())
.withComment("person name")
.column("age", DataTypes.INT())
.withComment("person age")
.build();
protected static final TableDescriptor DEFAULT_TABLE_DESCRIPTOR =
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA)
.comment("test table")
.distributedBy(3, "id")
.property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
.customProperty("connector", "fluss")
.build();

@BeforeAll
static void baseBeforeAll() {
@@ -103,12 +132,27 @@ void setup() throws Exception {
conf = new Configuration();
conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath());

String dbName = "db1";
tablePath1 = TablePath.of(dbName, "t1");
tablePath2 = TablePath.of(dbName, "t2");
String dbName1 = "db1";
String dbName2 = "db2";

tablePath1 = TablePath.of(dbName1, "t1");
tablePath2 = TablePath.of(dbName2, "t1");

// we need a log manager for kv manager

metadataManager = new MetadataManager(zkClient, conf);

Map<String, String> props = new HashMap<>();
props.put("kv.rocksdb.thread.num", "2");
// DEFAULT_TABLE_DESCRIPTOR
metadataManager.createDatabase(dbName1, DatabaseDescriptor.EMPTY, true);
metadataManager.createDatabase(dbName2, DatabaseDescriptor.EMPTY, true);

metadataManager.createTable(
tablePath1, DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(1), null, true);
metadataManager.createTable(
tablePath2, DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(1), null, true);

logManager =
LogManager.create(conf, zkClient, new FlussScheduler(1), SystemClock.getInstance());
kvManager = KvManager.create(conf, zkClient, logManager);
@@ -260,6 +304,11 @@ private KvTablet getOrCreateKv(
PhysicalTablePath physicalTablePath =
PhysicalTablePath.of(
tablePath.getDatabaseName(), tablePath.getTableName(), partitionName);

TableInfo tableInfo = getTableInfo(zkClient, physicalTablePath.getTablePath());
TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
Map<String, String> tableProperties = tableDescriptor.getProperties();

LogTablet logTablet =
logManager.getOrCreateLog(physicalTablePath, tableBucket, LogFormat.ARROW, 1, true);
return kvManager.getOrCreateKv(
@@ -269,7 +318,8 @@ private KvTablet getOrCreateKv(
KvFormat.COMPACTED,
DATA1_SCHEMA_PK,
new TableConfig(new Configuration()),
DEFAULT_COMPRESSION);
DEFAULT_COMPRESSION,
tableProperties);
}

private byte[] valueOf(KvRecord kvRecord) {
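The `props` map in `setup()` hints at, but does not yet exercise, a per-table RocksDB override. A hedged sketch of how such a table could be declared, reusing the builder calls from this test; the plain-string `property(key, value)` overload is an assumption, not something shown in this diff:

```java
// Hypothetical: a descriptor carrying a per-table RocksDB setting. Once created via
// MetadataManager, the setting surfaces in TableDescriptor#getProperties() and is
// merged ahead of the global options in KvTablet#buildRocksDBKv.
TableDescriptor descriptorWithRocksDbOverride =
        TableDescriptor.builder()
                .schema(DEFAULT_SCHEMA)
                .comment("table with per-table RocksDB tuning")
                .distributedBy(3, "id")
                .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
                // assumed String overload; the key matches the one in setup()
                .property("kv.rocksdb.thread.num", "2")
                .build();

metadataManager.createTable(
        tablePath1, descriptorWithRocksDbOverride.withReplicationFactor(1), null, true);
```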