Skip to content

Commit 8fe192e

Browse files
committed
[FIP-12] Add dynamic config and enable lakehouse dynamically.
1 parent 8a93663 commit 8fe192e

File tree

38 files changed

+1768
-82
lines changed

38 files changed

+1768
-82
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.alibaba.fluss.client.metadata.LakeSnapshot;
2424
import com.alibaba.fluss.cluster.ServerNode;
2525
import com.alibaba.fluss.config.ConfigOptions;
26+
import com.alibaba.fluss.config.dynamic.AlterConfigOp;
27+
import com.alibaba.fluss.config.dynamic.ConfigEntry;
2628
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
2729
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
2830
import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -450,4 +452,19 @@ ListOffsetsResult listOffsets(
450452
* @return A CompletableFuture indicating completion of the operation.
451453
*/
452454
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
455+
456+
/**
457+
* Describe the configs of the cluster.
458+
*
459+
* @return A CompletableFuture containing the configs of the cluster.
460+
*/
461+
CompletableFuture<Collection<ConfigEntry>> describeConfigs();
462+
463+
/**
464+
* Alter the configs of the cluster.
465+
*
466+
* @param configs List of configs to alter.
467+
* @return A CompletableFuture indicating completion of the operation.
468+
*/
469+
CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);
453470
}

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
2525
import com.alibaba.fluss.cluster.Cluster;
2626
import com.alibaba.fluss.cluster.ServerNode;
27+
import com.alibaba.fluss.config.dynamic.AlterConfigOp;
28+
import com.alibaba.fluss.config.dynamic.ConfigEntry;
2729
import com.alibaba.fluss.exception.LeaderNotAvailableException;
2830
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2931
import com.alibaba.fluss.metadata.DatabaseInfo;
@@ -41,11 +43,13 @@
4143
import com.alibaba.fluss.rpc.gateway.AdminGateway;
4244
import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
4345
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
46+
import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
4447
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
4548
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
4649
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
4750
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
4851
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
52+
import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
4953
import com.alibaba.fluss.rpc.messages.DropAclsRequest;
5054
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
5155
import com.alibaba.fluss.rpc.messages.DropTableRequest;
@@ -62,6 +66,8 @@
6266
import com.alibaba.fluss.rpc.messages.ListPartitionInfosRequest;
6367
import com.alibaba.fluss.rpc.messages.ListTablesRequest;
6468
import com.alibaba.fluss.rpc.messages.ListTablesResponse;
69+
import com.alibaba.fluss.rpc.messages.PbAlterConfigsRequestInfo;
70+
import com.alibaba.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
6571
import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket;
6672
import com.alibaba.fluss.rpc.messages.PbPartitionSpec;
6773
import com.alibaba.fluss.rpc.messages.PbTablePath;
@@ -81,6 +87,7 @@
8187
import java.util.List;
8288
import java.util.Map;
8389
import java.util.concurrent.CompletableFuture;
90+
import java.util.stream.Collectors;
8491

8592
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
8693
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -464,6 +471,65 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
464471
return result;
465472
}
466473

474+
@Override
475+
public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
476+
CompletableFuture<Collection<ConfigEntry>> future = new CompletableFuture<>();
477+
DescribeConfigsRequest request = new DescribeConfigsRequest();
478+
gateway.describeConfigs(request)
479+
.whenComplete(
480+
(r, t) -> {
481+
if (t != null) {
482+
future.completeExceptionally(t);
483+
}
484+
485+
List<PbDescribeConfigsResponseInfo> responseInfos = r.getInfosList();
486+
List<ConfigEntry> configEntries =
487+
responseInfos.stream()
488+
.map(
489+
responseInfo ->
490+
new ConfigEntry(
491+
responseInfo.getConfigKey(),
492+
responseInfo.hasConfigValue()
493+
? responseInfo
494+
.getConfigValue()
495+
: null,
496+
ConfigEntry.ConfigSource
497+
.valueOf(
498+
responseInfo
499+
.getConfigSource())))
500+
.collect(Collectors.toList());
501+
future.complete(configEntries);
502+
});
503+
return future;
504+
}
505+
506+
@Override
507+
public CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs) {
508+
CompletableFuture<Void> future = new CompletableFuture<>();
509+
510+
AlterConfigsRequest request = new AlterConfigsRequest();
511+
for (AlterConfigOp alterConfigOp : configs) {
512+
PbAlterConfigsRequestInfo requestInfo =
513+
request.addInfo()
514+
.setConfigKey(alterConfigOp.key())
515+
.setOpType(alterConfigOp.opType().id());
516+
if (alterConfigOp.value() != null) {
517+
requestInfo.setConfigValue(alterConfigOp.value());
518+
}
519+
}
520+
gateway.alterConfigs(request)
521+
.whenComplete(
522+
(r, t) -> {
523+
if (t != null) {
524+
future.completeExceptionally(t);
525+
}
526+
527+
future.complete(null);
528+
});
529+
530+
return future;
531+
}
532+
467533
@Override
468534
public void close() {
469535
// nothing to do yet

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
2828
import com.alibaba.fluss.config.ConfigOptions;
2929
import com.alibaba.fluss.config.Configuration;
30+
import com.alibaba.fluss.config.dynamic.AlterConfigOp;
31+
import com.alibaba.fluss.config.dynamic.ConfigEntry;
3032
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
3133
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
3234
import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -45,7 +47,6 @@
4547
import com.alibaba.fluss.exception.TooManyPartitionsException;
4648
import com.alibaba.fluss.fs.FsPath;
4749
import com.alibaba.fluss.fs.FsPathAndFileName;
48-
import com.alibaba.fluss.metadata.DataLakeFormat;
4950
import com.alibaba.fluss.metadata.DatabaseDescriptor;
5051
import com.alibaba.fluss.metadata.DatabaseInfo;
5152
import com.alibaba.fluss.metadata.KvFormat;
@@ -69,6 +70,7 @@
6970
import java.time.LocalDate;
7071
import java.util.ArrayList;
7172
import java.util.Arrays;
73+
import java.util.Collection;
7274
import java.util.Collections;
7375
import java.util.HashMap;
7476
import java.util.List;
@@ -78,6 +80,8 @@
7880
import java.util.stream.Collectors;
7981
import java.util.stream.Stream;
8082

83+
import static com.alibaba.fluss.config.ConfigOptions.DATALAKE_FORMAT;
84+
import static com.alibaba.fluss.metadata.DataLakeFormat.PAIMON;
8185
import static com.alibaba.fluss.testutils.DataTestUtils.row;
8286
import static org.assertj.core.api.Assertions.assertThat;
8387
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -165,7 +169,7 @@ void testGetTableInfoAndSchema() throws Exception {
165169
.isEqualTo(
166170
DEFAULT_TABLE_DESCRIPTOR
167171
.withReplicationFactor(3)
168-
.withDataLakeFormat(DataLakeFormat.PAIMON));
172+
.withDataLakeFormat(PAIMON));
169173
assertThat(schemaInfo2).isEqualTo(schemaInfo);
170174
assertThat(tableInfo.getCreatedTime()).isEqualTo(tableInfo.getModifiedTime());
171175
assertThat(tableInfo.getCreatedTime()).isLessThan(timestampAfterCreate);
@@ -190,7 +194,7 @@ void testGetTableInfoAndSchema() throws Exception {
190194
.isEqualTo(
191195
DEFAULT_TABLE_DESCRIPTOR
192196
.withReplicationFactor(3)
193-
.withDataLakeFormat(DataLakeFormat.PAIMON));
197+
.withDataLakeFormat(PAIMON));
194198
assertThat(schemaInfo2).isEqualTo(schemaInfo);
195199
// assert created time
196200
assertThat(tableInfo.getCreatedTime())
@@ -390,7 +394,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
390394
.isEqualTo(
391395
DEFAULT_TABLE_DESCRIPTOR
392396
.withReplicationFactor(3)
393-
.withDataLakeFormat(DataLakeFormat.PAIMON));
397+
.withDataLakeFormat(PAIMON));
394398
}
395399
}
396400

@@ -886,6 +890,55 @@ tablePath, newPartitionSpec("age", "11"), false)
886890
.isInstanceOf(TooManyPartitionsException.class);
887891
}
888892

893+
@Test
894+
void testDynamicConfigs() throws ExecutionException, InterruptedException {
895+
assertThat(
896+
FLUSS_CLUSTER_EXTENSION
897+
.getCoordinatorServer()
898+
.getCoordinatorService()
899+
.getDataLakeFormat())
900+
.isEqualTo(PAIMON);
901+
902+
admin.alterConfigs(
903+
Collections.singletonList(
904+
new AlterConfigOp(
905+
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET)))
906+
.get();
907+
assertThat(
908+
FLUSS_CLUSTER_EXTENSION
909+
.getCoordinatorServer()
910+
.getCoordinatorService()
911+
.getDataLakeFormat())
912+
.isNull();
913+
Collection<ConfigEntry> configToResourceConfigs = admin.describeConfigs().get();
914+
assertThat(configToResourceConfigs)
915+
.contains(
916+
new ConfigEntry(
917+
DATALAKE_FORMAT.key(),
918+
null,
919+
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG));
920+
921+
// Delete dynamic configs to use the initial value(from server.yaml)
922+
admin.alterConfigs(
923+
Collections.singletonList(
924+
new AlterConfigOp(
925+
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.DELETE)))
926+
.get();
927+
assertThat(
928+
FLUSS_CLUSTER_EXTENSION
929+
.getCoordinatorServer()
930+
.getCoordinatorService()
931+
.getDataLakeFormat())
932+
.isEqualTo(PAIMON);
933+
configToResourceConfigs = admin.describeConfigs().get();
934+
assertThat(configToResourceConfigs)
935+
.contains(
936+
new ConfigEntry(
937+
DATALAKE_FORMAT.key(),
938+
"paimon",
939+
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
940+
}
941+
889942
private void assertNoBucketSnapshot(KvSnapshots snapshots, int expectBucketNum) {
890943
assertThat(snapshots.getBucketIds()).hasSize(expectBucketNum);
891944
for (int i = 0; i < expectBucketNum; i++) {

fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import com.alibaba.fluss.config.ConfigOptions;
2929
import com.alibaba.fluss.config.Configuration;
3030
import com.alibaba.fluss.config.MemorySize;
31+
import com.alibaba.fluss.config.dynamic.AlterConfigOp;
32+
import com.alibaba.fluss.config.dynamic.ConfigEntry;
3133
import com.alibaba.fluss.exception.AuthorizationException;
3234
import com.alibaba.fluss.metadata.DataLakeFormat;
3335
import com.alibaba.fluss.metadata.DatabaseDescriptor;
@@ -69,6 +71,7 @@
6971
import java.util.List;
7072
import java.util.concurrent.ExecutionException;
7173

74+
import static com.alibaba.fluss.config.ConfigOptions.DATALAKE_FORMAT;
7275
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
7376
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
7477
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
@@ -653,6 +656,70 @@ void testProduceAndConsumer() throws Exception {
653656
}
654657
}
655658

659+
@Test
660+
void testDynamicConfigs() throws ExecutionException, InterruptedException {
661+
assertThatThrownBy(
662+
() ->
663+
guestAdmin
664+
.alterConfigs(
665+
Collections.singletonList(
666+
new AlterConfigOp(
667+
DATALAKE_FORMAT.key(),
668+
null,
669+
AlterConfigOp.OpType.SET)))
670+
.get())
671+
.rootCause()
672+
.hasMessageContaining(
673+
String.format(
674+
"Principal %s have no authorization to operate ALTER_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}",
675+
guestPrincipal));
676+
677+
rootAdmin
678+
.createAcls(
679+
Collections.singletonList(
680+
new AclBinding(
681+
Resource.cluster(),
682+
new AccessControlEntry(
683+
guestPrincipal,
684+
"*",
685+
OperationType.ALTER_CONFIGS,
686+
PermissionType.ALLOW))))
687+
.all()
688+
.get();
689+
guestAdmin
690+
.alterConfigs(
691+
Collections.singletonList(
692+
new AlterConfigOp(
693+
DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET)))
694+
.get();
695+
696+
assertThatThrownBy(() -> guestAdmin.describeConfigs().get())
697+
.rootCause()
698+
.hasMessageContaining(
699+
String.format(
700+
"Principal %s have no authorization to operate DESCRIBE_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}",
701+
guestPrincipal));
702+
rootAdmin
703+
.createAcls(
704+
Collections.singletonList(
705+
new AclBinding(
706+
Resource.cluster(),
707+
new AccessControlEntry(
708+
guestPrincipal,
709+
"*",
710+
OperationType.DESCRIBE_CONFIGS,
711+
PermissionType.ALLOW))))
712+
.all()
713+
.get();
714+
Collection<ConfigEntry> configToResourceConfigs = guestAdmin.describeConfigs().get();
715+
assertThat(configToResourceConfigs)
716+
.contains(
717+
new ConfigEntry(
718+
DATALAKE_FORMAT.key(),
719+
null,
720+
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG));
721+
}
722+
656723
private static Configuration initConfig() {
657724
Configuration conf = new Configuration();
658725
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);

0 commit comments

Comments
 (0)