Refactor PipelineJobManager.getJobConfiguration() (#29083)
terrymanu authored Nov 19, 2023
1 parent 4ee38f5 commit a9eadde
Showing 10 changed files with 23 additions and 30 deletions.
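In substance, the change moves the ElasticJob POJO lookup inside PipelineJobManager.getJobConfiguration(), so callers pass a job ID instead of resolving a JobConfigurationPOJO themselves. A minimal before/after sketch of a typical call site, assembled from the changed lines below (jobAPI stands in for the concrete PipelineJobAPI instance):

    // Before: the caller resolved the ElasticJob POJO and handed it to the manager.
    MigrationJobConfiguration jobConfig = new PipelineJobManager(jobAPI)
            .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));

    // After: the manager resolves the POJO internally from the job ID.
    MigrationJobConfiguration jobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(jobId);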
@@ -90,9 +90,8 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
*/
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO);
- long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
+ PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId);
+ long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
@@ -57,13 +57,13 @@ public final class PipelineJobManager {
/**
* Get job configuration.
*
- * @param jobConfigPOJO job configuration POJO
+ * @param jobId job ID
* @param <T> type of pipeline job configuration
* @return pipeline job configuration
*/
@SuppressWarnings("unchecked")
- public <T extends PipelineJobConfiguration> T getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
-     return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+ public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
+     return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
}

/**
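Since the method keeps its unchecked cast, the concrete configuration type is still bound at the call site, via the assignment target or an explicit type witness as in CDCBackendHandler further down; a mismatched type argument would only surface as a ClassCastException where the result is used. A condensed illustration of the type-witness form, taken from the call sites in this diff:

    // The type witness fixes T when the assignment context alone cannot.
    String databaseName = jobManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();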
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
- import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -49,7 +48,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
- MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId);
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
@@ -289,9 +289,8 @@ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {

@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
- CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
+ PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}

@@ -305,9 +304,8 @@ public void commit(final String jobId) {
* @param jobId job id
*/
public void dropStreaming(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
- ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
+ CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
+ ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
new PipelineJobManager(this).drop(jobId);
cleanup(jobConfig);
}
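A consequence visible in getJobItemInfos and dropStreaming above: call sites that still need POJO-level metadata (the start_time_millis property, the disabled flag) now fetch the POJO separately, on top of the lookup getJobConfiguration() performs internally, so these paths resolve the job registry entry twice. A condensed sketch of the resulting pattern, assuming the double lookup is acceptable for such infrequent management operations:

    CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
    // Separate lookup for POJO metadata that the typed configuration does not carry.
    boolean disabled = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled();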
@@ -47,7 +47,6 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
- import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -82,7 +81,7 @@ public final class CDCBackendHandler {
* @return database
*/
public String getDatabaseNameByJobId(final String jobId) {
- return jobManager.<CDCJobConfiguration>getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
+ return jobManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
}

/**
@@ -130,7 +129,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
- CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
@@ -268,7 +268,7 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result
}

private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
- ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+ ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(checkJobId);
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
@@ -103,7 +103,7 @@ protected void runBlocking() {
jobItemManager.persistProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
- PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
+ PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
try {
PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
@@ -67,7 +67,6 @@
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
- import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
@@ -211,11 +210,10 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat

@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
+ PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
List<String> sourceTables = new LinkedList<>();
- new PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
-     .forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
+ new PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+     .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables));
}

@@ -324,7 +322,7 @@ private void dropCheckJobs(final String jobId) {
}

private void cleanTempTableOnRollback(final String jobId) throws SQLException {
- MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
@@ -348,7 +346,7 @@ public void commit(final String jobId) {
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
- MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
@@ -69,7 +69,7 @@ void assertCreateJobConfig() {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+ ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
@@ -150,7 +150,7 @@ void assertStartOrStopById() {
void assertRollback() throws SQLException {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+ MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -162,7 +162,7 @@ void assertRollback() throws SQLException {
void assertCommit() {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+ MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -285,7 +285,7 @@ void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
- MigrationJobConfiguration actual = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration actual = jobManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
assertThat(dataNodeLines.size(), is(1));