From 7bf247dd29e168b55cfd7c623261ccf8fa0a283d Mon Sep 17 00:00:00 2001 From: zhangliang Date: Mon, 19 Aug 2024 21:43:58 +0800 Subject: [PATCH 1/2] Refactor InventoryDumperContextSplitter --- .../splitter/InventoryDumperContextSplitter.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java index 6a68d80d70e3c..7b64da57f4d41 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java @@ -39,7 +39,6 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -124,7 +123,7 @@ private Collection getInventoryPositions(final InventoryDumperCo if (1 == uniqueKeyColumns.size()) { int firstColumnDataType = uniqueKeyColumns.get(0).getDataType(); if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) { - return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, sourceDataSource); + return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext); } if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) { // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases. @@ -134,13 +133,12 @@ private Collection getInventoryPositions(final InventoryDumperCo return Collections.singleton(new UnsupportedKeyIngestPosition()); } - private Collection getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, - final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { + private Collection getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, final TransmissionJobItemContext jobItemContext) { if (0L == tableRecordsCount) { return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0L, 0L)); } Collection result = new LinkedList<>(); - Range uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext); + Range uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dumperContext); int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize(); long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0L ? 1 : 0); long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount; @@ -152,13 +150,13 @@ private Collection getPositionByIntegerUniqueKeyRange(final Inve return result; } - private Range getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) { + private Range getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final InventoryDumperContext dumperContext) { String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL( dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey); try ( - Connection connection = dataSource.getConnection(); + Connection connection = sourceDataSource.getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { resultSet.next(); From 3bdad8534c4b5225692f1d82500b72ff4906f430 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 20 Aug 2024 10:46:40 +0800 Subject: [PATCH 2/2] Remove useless comment on InventoryDumperContextSplitter --- .../inventory/splitter/InventoryDumperContextSplitter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java index 7b64da57f4d41..0560697edc2af 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java @@ -76,7 +76,6 @@ private Collection splitByTable() { private InventoryDumperContext createTableSpLitDumperContext(final CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier logicTableName) { InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext()); - // use original table name, for metadata loader, since some database table name case-sensitive result.setActualTableName(actualTableName.toString()); result.setLogicTableName(logicTableName.toString()); result.getCommonContext().setPosition(new IngestPlaceholderPosition());