diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeITFramework.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeITFramework.java index 8ae9cdba714a..533be4255cb6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeITFramework.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeITFramework.java @@ -29,6 +29,8 @@ import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.itbase.exception.InconsistentDataException; +import org.apache.iotdb.jdbc.IoTDBSQLException; +import org.apache.iotdb.relational.it.query.old.aligned.TableUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -42,6 +44,7 @@ import java.sql.Connection; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.HashSet; import java.util.List; @@ -57,7 +60,7 @@ public class IoTDBRemoveDataNodeITFramework { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBRemoveDataNodeITFramework.class); - private static final String INSERTION1 = + private static final String TREE_MODEL_INSERTION = "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)"; private static final String SHOW_REGIONS = "show regions"; @@ -66,6 +69,10 @@ public class IoTDBRemoveDataNodeITFramework { private static final String defaultSchemaRegionGroupExtensionPolicy = "CUSTOM"; private static final String defaultDataRegionGroupExtensionPolicy = "CUSTOM"; + public static final int NOT_USE_SQL = 0; + public static final int TREE_MODEL_SQL = 1; + public static final int TABLE_MODEL_SQL = 3; + @Before public void setUp() throws Exception { EnvFactory.getEnv() @@ -90,7 +97,8 @@ public void successTest( final int dataNodeNum, final int removeDataNodeNum, final int dataRegionPerDataNode, - final boolean rejoinRemovedDataNode) + final boolean rejoinRemovedDataNode, + final int SQLType) throws Exception { testRemoveDataNode( dataReplicateFactor, @@ -100,7 +108,8 @@ public void successTest( removeDataNodeNum, dataRegionPerDataNode, true, - rejoinRemovedDataNode); + rejoinRemovedDataNode, + SQLType); } public void failTest( @@ -110,7 +119,8 @@ public void failTest( final int dataNodeNum, final int removeDataNodeNum, final int dataRegionPerDataNode, - final boolean rejoinRemovedDataNode) + final boolean rejoinRemovedDataNode, + final int SQLType) throws Exception { testRemoveDataNode( dataReplicateFactor, @@ -120,7 +130,8 @@ public void failTest( removeDataNodeNum, dataRegionPerDataNode, false, - rejoinRemovedDataNode); + rejoinRemovedDataNode, + SQLType); } public void testRemoveDataNode( @@ -131,7 +142,8 @@ public void testRemoveDataNode( final int removeDataNodeNum, final int dataRegionPerDataNode, final boolean expectRemoveSuccess, - final boolean rejoinRemovedDataNode) + final boolean rejoinRemovedDataNode, + final int SQLType) throws Exception { // Set up the environment EnvFactory.getEnv() @@ -143,13 +155,18 @@ public void testRemoveDataNode( dataRegionPerDataNode * dataNodeNum / dataReplicateFactor); EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum); - try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + try (final Connection connection = makeItCloseQuietly(getConnectionWithSQLType(SQLType)); final Statement statement = makeItCloseQuietly(connection.createStatement()); SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - // Insert data - statement.execute(INSERTION1); + if (SQLType == TABLE_MODEL_SQL) { + // Insert data in table model + TableUtils.insertData(); + } else { + // Insert data in tree model + statement.execute(TREE_MODEL_INSERTION); + } Map> regionMap = getDataRegionMap(statement); regionMap.forEach( @@ -187,21 +204,38 @@ public void testRemoveDataNode( .map(TDataNodeConfiguration::getLocation) .filter(location -> removeDataNodes.contains(location.getDataNodeId())) .collect(Collectors.toList()); - TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(removeDataNodeLocations); - - // Remove data nodes - TDataNodeRemoveResp removeResp = clientRef.get().removeDataNode(removeReq); - LOGGER.info("Submit Remove DataNodes result {} ", removeResp); - if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (expectRemoveSuccess) { - LOGGER.error("Submit Remove DataNodes fail"); - Assert.fail(); - } else { - LOGGER.info("Submit Remove DataNodes fail, as expected"); - return; + if (SQLType != NOT_USE_SQL) { + String removeDataNodeSQL = generateRemoveString(removeDataNodes); + LOGGER.info("Remove DataNodes SQL: {}", removeDataNodeSQL); + try { + statement.execute(removeDataNodeSQL); + } catch (IoTDBSQLException e) { + if (expectRemoveSuccess) { + LOGGER.error("Remove DataNodes SQL execute fail: {}", e.getMessage()); + Assert.fail(); + } else { + LOGGER.info("Submit Remove DataNodes fail, as expected"); + return; + } + } + LOGGER.info("Remove DataNodes SQL submit successfully."); + } else { + TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(removeDataNodeLocations); + + // Remove data nodes + TDataNodeRemoveResp removeResp = clientRef.get().removeDataNode(removeReq); + LOGGER.info("Submit Remove DataNodes result {} ", removeResp); + if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + if (expectRemoveSuccess) { + LOGGER.error("Submit Remove DataNodes fail"); + Assert.fail(); + } else { + LOGGER.info("Submit Remove DataNodes fail, as expected."); + return; + } } + LOGGER.info("Submit Remove DataNodes request: {}", removeReq); } - LOGGER.info("Submit Remove DataNodes request: {}", removeReq); // Wait until success boolean removeSuccess = false; @@ -363,4 +397,24 @@ public void restartDataNodes(List dataNodeWrappers) { LOGGER.info("Node {} restarted.", nodeWrapper.getId()); }); } + + public static String generateRemoveString(Set dataNodes) { + StringBuilder sb = new StringBuilder("remove datanode "); + + for (Integer node : dataNodes) { + sb.append(node).append(", "); + } + + sb.setLength(sb.length() - 2); + + return sb.toString(); + } + + public Connection getConnectionWithSQLType(int SQLType) throws SQLException { + if (SQLType == TABLE_MODEL_SQL) { + return EnvFactory.getEnv().getTableConnection(); + } else { + return EnvFactory.getEnv().getConnection(); + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java index f091499bf02f..48a147245b5d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java @@ -29,13 +29,34 @@ @Category({ClusterIT.class}) @RunWith(IoTDBTestRunner.class) public class IoTDBRemoveDataNodeNormalIT extends IoTDBRemoveDataNodeITFramework { + @Test public void success1C4DTest() throws Exception { - successTest(2, 3, 1, 4, 1, 2, true); + successTest(2, 3, 1, 4, 1, 2, true, IoTDBRemoveDataNodeITFramework.NOT_USE_SQL); } @Test public void fail1C3DTest() throws Exception { - failTest(2, 3, 1, 3, 1, 2, false); + failTest(2, 3, 1, 3, 1, 2, false, IoTDBRemoveDataNodeITFramework.NOT_USE_SQL); + } + + @Test + public void success1C4DTestUseSQL() throws Exception { + successTest(2, 3, 1, 4, 1, 2, true, IoTDBRemoveDataNodeITFramework.TREE_MODEL_SQL); + } + + @Test + public void fail1C3DTestUseSQL() throws Exception { + failTest(2, 3, 1, 3, 1, 2, false, IoTDBRemoveDataNodeITFramework.TREE_MODEL_SQL); + } + + @Test + public void success1C4DTestUseTableSQL() throws Exception { + successTest(2, 3, 1, 4, 1, 2, true, IoTDBRemoveDataNodeITFramework.TABLE_MODEL_SQL); + } + + @Test + public void fail1C3DTestUseTableSQL() throws Exception { + failTest(2, 3, 1, 3, 1, 2, false, IoTDBRemoveDataNodeITFramework.TABLE_MODEL_SQL); } } diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index bb528c001e26..63ff0c7afa49 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -78,6 +78,7 @@ keyWords | DATA_REGION_GROUP_NUM | DATABASE | DATABASES + | DATANODE | DATANODEID | DATANODES | DATASET diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 13d9cc6708dc..5e7b05378256 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -63,7 +63,7 @@ ddlStatement // Cluster | showVariables | showCluster | showRegions | showDataNodes | showConfigNodes | showClusterId | getRegionId | getTimeSlotList | countTimeSlotList | getSeriesSlotList - | migrateRegion | reconstructRegion | extendRegion | removeRegion + | migrateRegion | reconstructRegion | extendRegion | removeRegion | removeDataNode | verifyConnection // AINode | showAINodes | createModel | dropModel | showModels | callInference @@ -550,6 +550,11 @@ verifyConnection : VERIFY CONNECTION (DETAILS)? ; +// ---- Remove DataNode +removeDataNode + : REMOVE DATANODE dataNodeId=INTEGER_LITERAL (COMMA dataNodeId=INTEGER_LITERAL)* + ; + // Pipe Task ========================================================================================= createPipe : CREATE PIPE (IF NOT EXISTS)? pipeName=identifier diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 50f957c7fcc2..0a3453cfc541 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -217,6 +217,10 @@ DATABASES : D A T A B A S E S ; +DATANODE + : D A T A N O D E + ; + DATANODEID : D A T A N O D E I D ; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 5934f25aec89..9356acc3c47c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -72,6 +72,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.KillQuery; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetConfiguration; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetProperties; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowAINodes; @@ -407,6 +408,7 @@ private IQueryExecution createQueryExecutionForTableModel( || statement instanceof StartRepairData || statement instanceof StopRepairData || statement instanceof PipeStatement + || statement instanceof RemoveDataNode || statement instanceof SubscriptionStatement || statement instanceof ShowCurrentSqlDialect || statement instanceof ShowCurrentUser diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 64935c4878a8..86b355f929b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreatePipePluginTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropFunctionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropPipePluginTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask; @@ -120,6 +121,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetConfiguration; @@ -151,6 +153,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewrite; import org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement; @@ -351,6 +354,18 @@ protected IConfigTask visitShowRegions( return new ShowRegionTask(treeStatement, true); } + @Override + protected IConfigTask visitRemoveDataNode( + final RemoveDataNode removeDataNode, final MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + accessControl.checkUserHasMaintainPrivilege(context.getSession().getUserName()); + // As the implementation is identical, we'll simply translate to the + // corresponding tree-model variant and execute that. + final RemoveDataNodeStatement treeStatement = + new RemoveDataNodeStatement(removeDataNode.getNodeIds()); + return new RemoveDataNodeTask(treeStatement); + } + @Override protected IConfigTask visitShowDataNodes( final ShowDataNodes showDataNodesStatement, final MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 4af3e519d8ca..7ec00aacf58d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -40,6 +40,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetRegionIdTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetSeriesSlotListTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetTimeSlotListTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.SetTTLTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowAINodesTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterDetailsTask; @@ -117,6 +118,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement; @@ -652,6 +654,12 @@ public IConfigTask visitRemoveRegion( return new RemoveRegionTask(removeRegionStatement); } + @Override + public IConfigTask visitRemoveDataNode( + RemoveDataNodeStatement removeDataNodeStatement, MPPQueryContext context) { + return new RemoveDataNodeTask(removeDataNodeStatement); + } + @Override public IConfigTask visitCreateContinuousQuery( CreateContinuousQueryStatement createContinuousQueryStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 5ca87f6b1cbb..4a70c5991c98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -21,6 +21,8 @@ import org.apache.iotdb.common.rpc.thrift.FunctionType; import org.apache.iotdb.common.rpc.thrift.Model; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; @@ -80,6 +82,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq; @@ -221,6 +225,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement; @@ -316,6 +321,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -2797,6 +2803,70 @@ public SettableFuture migrateRegion( return future; } + @Override + public SettableFuture removeDataNode( + final RemoveDataNodeStatement removeDataNodeStatement) { + final SettableFuture future = SettableFuture.create(); + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + Set nodeIds = removeDataNodeStatement.getNodeIds(); + + Set validNodeIds = + configNodeClient.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().keySet(); + + Set invalidNodeIds = new HashSet<>(nodeIds); + invalidNodeIds.removeAll(validNodeIds); + + if (!invalidNodeIds.isEmpty()) { + LOGGER.info("Cannot remove invalid nodeIds:{}", invalidNodeIds); + nodeIds.removeAll(invalidNodeIds); + } + + LOGGER.info("Starting to remove DataNode with nodeIds: {}", nodeIds); + + final Set finalNodeIds = nodeIds; + List removeDataNodeLocations = + configNodeClient + .getDataNodeConfiguration(-1) + .getDataNodeConfigurationMap() + .values() + .stream() + .map(TDataNodeConfiguration::getLocation) + .filter(location -> finalNodeIds.contains(location.getDataNodeId())) + .collect(Collectors.toList()); + + List simplifiedLocations = new ArrayList<>(); + for (TDataNodeLocation dataNodeLocation : removeDataNodeLocations) { + simplifiedLocations.add( + dataNodeLocation.getDataNodeId() + + "@" + + dataNodeLocation.getInternalEndPoint().getIp()); + } + + LOGGER.info("Start to remove datanode, removed DataNodes endpoint: {}", simplifiedLocations); + TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(removeDataNodeLocations); + TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq); + LOGGER.info("Submit Remove DataNodes result {} ", removeResp); + if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + removeResp.getStatus().toString(), removeResp.getStatus().getCode())); + return future; + } else { + LOGGER.info( + "Submit remove-datanode request successfully, but the process may fail. " + + "more details are shown in the logs of confignode-leader and removed-datanode, " + + "and after the process of removing datanode ends successfully, " + + "you are supposed to delete directory and data of the removed-datanode manually"); + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (Exception e) { + future.setException(e); + } + + return future; + } + @Override public SettableFuture reconstructRegion( ReconstructRegionStatement reconstructRegionStatement) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index d3f3955d0049..d34ae66ac16e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -51,6 +51,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement; @@ -261,6 +262,8 @@ SettableFuture reconstructRegion( SettableFuture removeRegion(RemoveRegionStatement removeRegionStatement); + SettableFuture removeDataNode(RemoveDataNodeStatement removeDataNodeStatement); + SettableFuture createContinuousQuery( CreateContinuousQueryStatement createContinuousQueryStatement, MPPQueryContext context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveDataNodeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveDataNodeTask.java new file mode 100644 index 000000000000..5a91da21f628 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveDataNodeTask.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; + +import com.google.common.util.concurrent.ListenableFuture; + +public class RemoveDataNodeTask implements IConfigTask { + + protected final RemoveDataNodeStatement statement; + + public RemoveDataNodeTask(RemoveDataNodeStatement removeDataNodeStatement) { + this.statement = removeDataNodeStatement; + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) { + // If the action is executed successfully, return the Future. + // If your operation is async, you can return the corresponding future directly. + return configTaskExecutor.removeDataNode(statement); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 7ff1df840483..d60faf12d3f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -150,6 +150,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildNodesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildPathsStatement; @@ -233,6 +234,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.io.BaseEncoding; import org.antlr.v4.runtime.Token; +import org.antlr.v4.runtime.tree.ParseTree; import org.antlr.v4.runtime.tree.TerminalNode; import org.apache.commons.lang3.StringUtils; import org.apache.tsfile.common.conf.TSFileDescriptor; @@ -4211,6 +4213,16 @@ public Statement visitRemoveRegion(IoTDBSqlParser.RemoveRegionContext ctx) { Integer.parseInt(ctx.regionId.getText()), Integer.parseInt(ctx.targetDataNodeId.getText())); } + @Override + public Statement visitRemoveDataNode(IoTDBSqlParser.RemoveDataNodeContext ctx) { + List dataNodeIDs = + ctx.INTEGER_LITERAL().stream() + .map(ParseTree::getText) + .map(Integer::parseInt) + .collect(Collectors.toList()); + return new RemoveDataNodeStatement(dataNodeIDs); + } + @Override public Statement visitVerifyConnection(IoTDBSqlParser.VerifyConnectionContext ctx) { return new TestConnectionStatement(ctx.DETAILS() != null); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 8d3e8aa6f2dd..3d52bb87d163 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -373,6 +373,10 @@ protected R visitRenameTable(RenameTable node, C context) { return visitStatement(node, context); } + protected R visitRemoveDataNode(RemoveDataNode node, C context) { + return visitStatement(node, context); + } + protected R visitDescribeTable(DescribeTable node, C context) { return visitStatement(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveDataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveDataNode.java new file mode 100644 index 000000000000..7e0140137a5b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveDataNode.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class RemoveDataNode extends Statement { + final List nodeIds; + + public RemoveDataNode(List dataNodeIDs) { + super(null); + this.nodeIds = dataNodeIDs; + } + + public List getNodeIds() { + return nodeIds; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitRemoveDataNode(this, context); + } + + @Override + public List getChildren() { + return ImmutableList.of(); + } + + @Override + public int hashCode() { + return Objects.hash(nodeIds); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + RemoveDataNode other = (RemoveDataNode) obj; + return Objects.equals(nodeIds, other.nodeIds); + } + + @Override + public String toString() { + return toStringHelper(this).add("nodeIds", nodeIds).toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index ad666e18800b..1cca371cc2c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -124,6 +124,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QueryBody; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuerySpecification; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Relation; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row; @@ -224,6 +225,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; @@ -1139,6 +1141,16 @@ public Node visitMigrateRegionStatement(RelationalSqlParser.MigrateRegionStateme return super.visitMigrateRegionStatement(ctx); } + @Override + public Node visitRemoveDataNodeStatement(RelationalSqlParser.RemoveDataNodeStatementContext ctx) { + List nodeIds = + ctx.INTEGER_VALUE().stream() + .map(TerminalNode::getText) + .map(Integer::valueOf) + .collect(Collectors.toList()); + return new RemoveDataNode(nodeIds); + } + @Override public Node visitFlushStatement(final RelationalSqlParser.FlushStatementContext ctx) { final FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 7f4181be8b35..333d21aeccd4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildNodesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowChildPathsStatement; @@ -593,6 +594,10 @@ public R visitRemoveRegion(RemoveRegionStatement removeRegionStatement, C contex return visitStatement(removeRegionStatement, context); } + public R visitRemoveDataNode(RemoveDataNodeStatement removeDataNodeStatement, C context) { + return visitStatement(removeDataNodeStatement, context); + } + public R visitDeactivateTemplate( DeactivateTemplateStatement deactivateTemplateStatement, C context) { return visitStatement(deactivateTemplateStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveDataNodeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveDataNodeStatement.java new file mode 100644 index 000000000000..44a6ddc6907d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveDataNodeStatement.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.metadata; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class RemoveDataNodeStatement extends Statement implements IConfigStatement { + + final Set nodeIds; + + public RemoveDataNodeStatement(List dataNodeIDs) { + super(); + this.nodeIds = new HashSet<>(dataNodeIDs); + } + + public Set getNodeIds() { + return nodeIds; + } + + @Override + public R accept(StatementVisitor visitor, C context) { + return visitor.visitRemoveDataNode(this, context); + } + + @Override + public QueryType getQueryType() { + return QueryType.WRITE; + } + + @Override + public List getPaths() { + return Collections.emptyList(); + } +} diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 8a9a76e59f82..16f0ab9956c9 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -108,6 +108,7 @@ statement | countTimeSlotListStatement | showSeriesSlotListStatement | migrateRegionStatement + | removeDataNodeStatement // Admin Statement | showVariablesStatement @@ -465,7 +466,9 @@ migrateRegionStatement : MIGRATE REGION regionId=INTEGER_VALUE FROM fromId=INTEGER_VALUE TO toId=INTEGER_VALUE ; - +removeDataNodeStatement + : REMOVE DATANODE dataNodeId=INTEGER_VALUE (',' dataNodeId=INTEGER_VALUE)* + ; // ------------------------------------------- Admin Statement --------------------------------------------------------- showVariablesStatement @@ -939,7 +942,7 @@ nonReserved : ABSENT | ADD | ADMIN | AFTER | ALL | ANALYZE | ANY | ARRAY | ASC | AT | ATTRIBUTE | AUTHORIZATION | BEGIN | BERNOULLI | BOTH | CACHE | CALL | CALLED | CASCADE | CATALOG | CATALOGS | CHAR | CHARACTER | CHARSET | CLEAR | CLUSTER | CLUSTERID | COLUMN | COLUMNS | COMMENT | COMMIT | COMMITTED | CONDITION | CONDITIONAL | CONFIGNODES | CONFIGURATION | CONNECTOR | CONSTANT | COPARTITION | COUNT | CURRENT - | DATA | DATABASE | DATABASES | DATANODES | DATE | DAY | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | DESCRIPTOR | DETAILS| DETERMINISTIC | DEVICES | DISTRIBUTED | DO | DOUBLE + | DATA | DATABASE | DATABASES | DATANODE | DATANODES | DATE | DAY | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | DESCRIPTOR | DETAILS| DETERMINISTIC | DEVICES | DISTRIBUTED | DO | DOUBLE | ELSEIF | EMPTY | ENCODING | ERROR | EXCLUDING | EXPLAIN | EXTRACTOR | FETCH | FIELD | FILTER | FINAL | FIRST | FLUSH | FOLLOWING | FORMAT | FUNCTION | FUNCTIONS | GRACE | GRANT | GRANTED | GRANTS | GRAPHVIZ | GROUPS @@ -953,7 +956,7 @@ nonReserved | OBJECT | OF | OFFSET | OMIT | ONE | ONLY | OPTION | ORDINALITY | OUTPUT | OVER | OVERFLOW | PARTITION | PARTITIONS | PASSING | PAST | PATH | PATTERN | PER | PERIOD | PERMUTE | PIPE | PIPEPLUGIN | PIPEPLUGINS | PIPES | PLAN | POSITION | PRECEDING | PRECISION | PRIVILEGES | PREVIOUS | PROCESSLIST | PROCESSOR | PROPERTIES | PRUNE | QUERIES | QUERY | QUOTES - | RANGE | READ | READONLY | REFRESH | REGION | REGIONID | REGIONS | RENAME | REPAIR | REPEAT | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROW | ROWS | RUNNING + | RANGE | READ | READONLY | REFRESH | REGION | REGIONID | REGIONS | REMOVE | RENAME | REPAIR | REPEAT | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROW | ROWS | RUNNING | SERIESSLOTID | SCALAR | SCHEMA | SCHEMAS | SECOND | SECURITY | SEEK | SERIALIZABLE | SESSION | SET | SETS | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSCRIPTIONS | SUBSET | SUBSTRING | SYSTEM | TABLES | TABLESAMPLE | TAG | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION | TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TOPIC | TOPICS | TRAILING | TRANSACTION | TRUNCATE | TRY_CAST | TYPE @@ -1031,6 +1034,7 @@ CURRENT_USER: 'CURRENT_USER'; DATA: 'DATA'; DATABASE: 'DATABASE'; DATABASES: 'DATABASES'; +DATANODE: 'DATANODE'; DATANODES: 'DATANODES'; DATE: 'DATE'; DATE_BIN: 'DATE_BIN'; @@ -1224,6 +1228,7 @@ REFRESH: 'REFRESH'; REGION: 'REGION'; REGIONID: 'REGIONID'; REGIONS: 'REGIONS'; +REMOVE: 'REMOVE'; RENAME: 'RENAME'; REPAIR: 'REPAIR'; REPEAT: 'REPEAT';