Skip to content

Commit b28807d

Browse files
committed
[FIP-12] Add dynamic config and enable lakehouse dynamically.
1 parent 1f9de70 commit b28807d

File tree

35 files changed

+1763
-82
lines changed

35 files changed

+1763
-82
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
2525
import org.apache.fluss.config.ConfigOptions;
26+
import org.apache.fluss.config.dynamic.AlterConfigOp;
27+
import org.apache.fluss.config.dynamic.ConfigEntry;
2628
import org.apache.fluss.exception.DatabaseAlreadyExistException;
2729
import org.apache.fluss.exception.DatabaseNotEmptyException;
2830
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -452,4 +454,19 @@ ListOffsetsResult listOffsets(
452454
* @return A CompletableFuture indicating completion of the operation.
453455
*/
454456
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
457+
458+
/**
459+
* Describe the configs of the cluster.
460+
*
461+
* @return A CompletableFuture containing the configs of the cluster.
462+
*/
463+
CompletableFuture<Collection<ConfigEntry>> describeConfigs();
464+
465+
/**
466+
* Alter the configs of the cluster.
467+
*
468+
* @param configs List of configs to alter.
469+
* @return A CompletableFuture indicating completion of the operation.
470+
*/
471+
CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);
455472
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.config.dynamic.AlterConfigOp;
28+
import org.apache.fluss.config.dynamic.ConfigEntry;
2729
import org.apache.fluss.exception.LeaderNotAvailableException;
2830
import org.apache.fluss.metadata.DatabaseDescriptor;
2931
import org.apache.fluss.metadata.DatabaseInfo;
@@ -41,11 +43,13 @@
4143
import org.apache.fluss.rpc.gateway.AdminGateway;
4244
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4345
import org.apache.fluss.rpc.gateway.TabletServerGateway;
46+
import org.apache.fluss.rpc.messages.AlterConfigsRequest;
4447
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4548
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
4649
import org.apache.fluss.rpc.messages.CreateTableRequest;
4750
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
4851
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
52+
import org.apache.fluss.rpc.messages.DescribeConfigsRequest;
4953
import org.apache.fluss.rpc.messages.DropAclsRequest;
5054
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
5155
import org.apache.fluss.rpc.messages.DropTableRequest;
@@ -62,6 +66,8 @@
6266
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
6367
import org.apache.fluss.rpc.messages.ListTablesRequest;
6468
import org.apache.fluss.rpc.messages.ListTablesResponse;
69+
import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
70+
import org.apache.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
6571
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
6672
import org.apache.fluss.rpc.messages.PbPartitionSpec;
6773
import org.apache.fluss.rpc.messages.PbTablePath;
@@ -81,6 +87,7 @@
8187
import java.util.List;
8288
import java.util.Map;
8389
import java.util.concurrent.CompletableFuture;
90+
import java.util.stream.Collectors;
8491

8592
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
8693
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -464,6 +471,65 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
464471
return result;
465472
}
466473

474+
@Override
475+
public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
476+
CompletableFuture<Collection<ConfigEntry>> future = new CompletableFuture<>();
477+
DescribeConfigsRequest request = new DescribeConfigsRequest();
478+
gateway.describeConfigs(request)
479+
.whenComplete(
480+
(r, t) -> {
481+
if (t != null) {
482+
future.completeExceptionally(t);
483+
}
484+
485+
List<PbDescribeConfigsResponseInfo> responseInfos = r.getInfosList();
486+
List<ConfigEntry> configEntries =
487+
responseInfos.stream()
488+
.map(
489+
responseInfo ->
490+
new ConfigEntry(
491+
responseInfo.getConfigKey(),
492+
responseInfo.hasConfigValue()
493+
? responseInfo
494+
.getConfigValue()
495+
: null,
496+
ConfigEntry.ConfigSource
497+
.valueOf(
498+
responseInfo
499+
.getConfigSource())))
500+
.collect(Collectors.toList());
501+
future.complete(configEntries);
502+
});
503+
return future;
504+
}
505+
506+
@Override
507+
public CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs) {
508+
CompletableFuture<Void> future = new CompletableFuture<>();
509+
510+
AlterConfigsRequest request = new AlterConfigsRequest();
511+
for (AlterConfigOp alterConfigOp : configs) {
512+
PbAlterConfigsRequestInfo requestInfo =
513+
request.addInfo()
514+
.setConfigKey(alterConfigOp.key())
515+
.setOpType(alterConfigOp.opType().id());
516+
if (alterConfigOp.value() != null) {
517+
requestInfo.setConfigValue(alterConfigOp.value());
518+
}
519+
}
520+
gateway.alterConfigs(request)
521+
.whenComplete(
522+
(r, t) -> {
523+
if (t != null) {
524+
future.completeExceptionally(t);
525+
}
526+
527+
future.complete(null);
528+
});
529+
530+
return future;
531+
}
532+
467533
@Override
468534
public void close() {
469535
// nothing to do yet

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.fluss.config.AutoPartitionTimeUnit;
2828
import org.apache.fluss.config.ConfigOptions;
2929
import org.apache.fluss.config.Configuration;
30+
import org.apache.fluss.config.dynamic.AlterConfigOp;
31+
import org.apache.fluss.config.dynamic.ConfigEntry;
3032
import org.apache.fluss.exception.DatabaseAlreadyExistException;
3133
import org.apache.fluss.exception.DatabaseNotEmptyException;
3234
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -45,7 +47,6 @@
4547
import org.apache.fluss.exception.TooManyPartitionsException;
4648
import org.apache.fluss.fs.FsPath;
4749
import org.apache.fluss.fs.FsPathAndFileName;
48-
import org.apache.fluss.metadata.DataLakeFormat;
4950
import org.apache.fluss.metadata.DatabaseDescriptor;
5051
import org.apache.fluss.metadata.DatabaseInfo;
5152
import org.apache.fluss.metadata.KvFormat;
@@ -78,6 +79,8 @@
7879
import java.util.stream.Collectors;
7980
import java.util.stream.Stream;
8081

82+
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
83+
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
8184
import static org.apache.fluss.testutils.DataTestUtils.row;
8285
import static org.assertj.core.api.Assertions.assertThat;
8386
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -165,7 +168,7 @@ void testGetTableInfoAndSchema() throws Exception {
165168
.isEqualTo(
166169
DEFAULT_TABLE_DESCRIPTOR
167170
.withReplicationFactor(3)
168-
.withDataLakeFormat(DataLakeFormat.PAIMON));
171+
.withDataLakeFormat(PAIMON));
169172
assertThat(schemaInfo2).isEqualTo(schemaInfo);
170173
assertThat(tableInfo.getCreatedTime()).isEqualTo(tableInfo.getModifiedTime());
171174
assertThat(tableInfo.getCreatedTime()).isLessThan(timestampAfterCreate);
@@ -190,7 +193,7 @@ void testGetTableInfoAndSchema() throws Exception {
190193
.isEqualTo(
191194
DEFAULT_TABLE_DESCRIPTOR
192195
.withReplicationFactor(3)
193-
.withDataLakeFormat(DataLakeFormat.PAIMON));
196+
.withDataLakeFormat(PAIMON));
194197
assertThat(schemaInfo2).isEqualTo(schemaInfo);
195198
// assert created time
196199
assertThat(tableInfo.getCreatedTime())
@@ -390,7 +393,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
390393
.isEqualTo(
391394
DEFAULT_TABLE_DESCRIPTOR
392395
.withReplicationFactor(3)
393-
.withDataLakeFormat(DataLakeFormat.PAIMON));
396+
.withDataLakeFormat(PAIMON));
394397
}
395398
}
396399

@@ -886,6 +889,53 @@ tablePath, newPartitionSpec("age", "11"), false)
886889
.isInstanceOf(TooManyPartitionsException.class);
887890
}
888891

892+
@Test
893+
void testDynamicConfigs() throws ExecutionException, InterruptedException {
894+
assertThat(
895+
FLUSS_CLUSTER_EXTENSION
896+
.getCoordinatorServer()
897+
.getCoordinatorService()
898+
.getDataLakeFormat())
899+
.isEqualTo(PAIMON);
900+
901+
admin.alterConfigs(
902+
Collections.singletonList(
903+
new AlterConfigOp(
904+
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET)))
905+
.get();
906+
assertThat(
907+
FLUSS_CLUSTER_EXTENSION
908+
.getCoordinatorServer()
909+
.getCoordinatorService()
910+
.getDataLakeFormat())
911+
.isNull();
912+
assertThat(admin.describeConfigs().get())
913+
.contains(
914+
new ConfigEntry(
915+
DATALAKE_FORMAT.key(),
916+
null,
917+
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG));
918+
919+
// Delete dynamic configs to use the initial value(from server.yaml)
920+
admin.alterConfigs(
921+
Collections.singletonList(
922+
new AlterConfigOp(
923+
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.DELETE)))
924+
.get();
925+
assertThat(
926+
FLUSS_CLUSTER_EXTENSION
927+
.getCoordinatorServer()
928+
.getCoordinatorService()
929+
.getDataLakeFormat())
930+
.isEqualTo(PAIMON);
931+
assertThat(admin.describeConfigs().get())
932+
.contains(
933+
new ConfigEntry(
934+
DATALAKE_FORMAT.key(),
935+
"paimon",
936+
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
937+
}
938+
889939
private void assertNoBucketSnapshot(KvSnapshots snapshots, int expectBucketNum) {
890940
assertThat(snapshots.getBucketIds()).hasSize(expectBucketNum);
891941
for (int i = 0; i < expectBucketNum; i++) {

fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.fluss.config.ConfigOptions;
2929
import org.apache.fluss.config.Configuration;
3030
import org.apache.fluss.config.MemorySize;
31+
import org.apache.fluss.config.dynamic.AlterConfigOp;
32+
import org.apache.fluss.config.dynamic.ConfigEntry;
3133
import org.apache.fluss.exception.AuthorizationException;
3234
import org.apache.fluss.metadata.DataLakeFormat;
3335
import org.apache.fluss.metadata.DatabaseDescriptor;
@@ -69,6 +71,7 @@
6971
import java.util.List;
7072
import java.util.concurrent.ExecutionException;
7173

74+
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
7275
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
7376
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
7477
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
@@ -653,6 +656,70 @@ void testProduceAndConsumer() throws Exception {
653656
}
654657
}
655658

659+
@Test
660+
void testDynamicConfigs() throws ExecutionException, InterruptedException {
661+
assertThatThrownBy(
662+
() ->
663+
guestAdmin
664+
.alterConfigs(
665+
Collections.singletonList(
666+
new AlterConfigOp(
667+
DATALAKE_FORMAT.key(),
668+
null,
669+
AlterConfigOp.OpType.SET)))
670+
.get())
671+
.rootCause()
672+
.hasMessageContaining(
673+
String.format(
674+
"Principal %s have no authorization to operate ALTER_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}",
675+
guestPrincipal));
676+
677+
rootAdmin
678+
.createAcls(
679+
Collections.singletonList(
680+
new AclBinding(
681+
Resource.cluster(),
682+
new AccessControlEntry(
683+
guestPrincipal,
684+
"*",
685+
OperationType.ALTER_CONFIGS,
686+
PermissionType.ALLOW))))
687+
.all()
688+
.get();
689+
guestAdmin
690+
.alterConfigs(
691+
Collections.singletonList(
692+
new AlterConfigOp(
693+
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET)))
694+
.get();
695+
696+
assertThatThrownBy(() -> guestAdmin.describeConfigs().get())
697+
.rootCause()
698+
.hasMessageContaining(
699+
String.format(
700+
"Principal %s have no authorization to operate DESCRIBE_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}",
701+
guestPrincipal));
702+
rootAdmin
703+
.createAcls(
704+
Collections.singletonList(
705+
new AclBinding(
706+
Resource.cluster(),
707+
new AccessControlEntry(
708+
guestPrincipal,
709+
"*",
710+
OperationType.DESCRIBE_CONFIGS,
711+
PermissionType.ALLOW))))
712+
.all()
713+
.get();
714+
Collection<ConfigEntry> configToResourceConfigs = guestAdmin.describeConfigs().get();
715+
assertThat(configToResourceConfigs)
716+
.contains(
717+
new ConfigEntry(
718+
DATALAKE_FORMAT.key(),
719+
null,
720+
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG));
721+
}
722+
656723
private static Configuration initConfig() {
657724
Configuration conf = new Configuration();
658725
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
@@ -661,7 +728,7 @@ private static Configuration initConfig() {
661728
// set a shorter max lag time to make tests in FlussFailServerTableITCase faster
662729
conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, Duration.ofSeconds(10));
663730
// set default datalake format for the cluster and enable datalake tables
664-
conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);
731+
conf.set(DATALAKE_FORMAT, DataLakeFormat.PAIMON);
665732

666733
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
667734
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));

0 commit comments

Comments
 (0)