diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java index 6a68d80d70e3c..0560697edc2af 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java @@ -39,7 +39,6 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -77,7 +76,6 @@ private Collection splitByTable() { private InventoryDumperContext createTableSpLitDumperContext(final CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier logicTableName) { InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext()); - // use original table name, for metadata loader, since some database table name case-sensitive result.setActualTableName(actualTableName.toString()); result.setLogicTableName(logicTableName.toString()); result.getCommonContext().setPosition(new IngestPlaceholderPosition()); @@ -124,7 +122,7 @@ private Collection getInventoryPositions(final InventoryDumperCo if (1 == uniqueKeyColumns.size()) { int firstColumnDataType = uniqueKeyColumns.get(0).getDataType(); if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) { - return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, sourceDataSource); + return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext); } if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) { // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases. @@ -134,13 +132,12 @@ private Collection getInventoryPositions(final InventoryDumperCo return Collections.singleton(new UnsupportedKeyIngestPosition()); } - private Collection getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, - final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { + private Collection getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, final TransmissionJobItemContext jobItemContext) { if (0L == tableRecordsCount) { return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0L, 0L)); } Collection result = new LinkedList<>(); - Range uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext); + Range uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dumperContext); int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize(); long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0L ? 1 : 0); long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount; @@ -152,13 +149,13 @@ private Collection getPositionByIntegerUniqueKeyRange(final Inve return result; } - private Range getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) { + private Range getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final InventoryDumperContext dumperContext) { String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL( dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey); try ( - Connection connection = dataSource.getConnection(); + Connection connection = sourceDataSource.getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { resultSet.next();