Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove useless comment on InventoryDumperContextSplitter #32600

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -77,7 +76,6 @@ private Collection<InventoryDumperContext> splitByTable() {

private InventoryDumperContext createTableSpLitDumperContext(final CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier logicTableName) {
InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
// use original table name, for metadata loader, since some database table name case-sensitive
result.setActualTableName(actualTableName.toString());
result.setLogicTableName(logicTableName.toString());
result.getCommonContext().setPosition(new IngestPlaceholderPosition());
Expand Down Expand Up @@ -124,7 +122,7 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
if (1 == uniqueKeyColumns.size()) {
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, sourceDataSource);
return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext);
}
if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
// TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
Expand All @@ -134,13 +132,12 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
return Collections.singleton(new UnsupportedKeyIngestPosition());
}

private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount,
final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) {
private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, final TransmissionJobItemContext jobItemContext) {
if (0L == tableRecordsCount) {
return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0L, 0L));
}
Collection<IngestPosition> result = new LinkedList<>();
Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext);
Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dumperContext);
int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0L ? 1 : 0);
long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
Expand All @@ -152,13 +149,13 @@ private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final Inve
return result;
}

private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) {
private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final InventoryDumperContext dumperContext) {
String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Connection connection = sourceDataSource.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
resultSet.next();
Expand Down