Skip to content

Commit

Permalink
Add DataConsistencyCheckAlgorithmInfoRegistry (#29084)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 19, 2023
1 parent a9eadde commit a76d0d7
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.pojo;

import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;

import java.util.Collection;
import java.util.LinkedList;
import java.util.stream.Collectors;

/**
* Data consistency check algorithm info registry.
*/
@NoArgsConstructor
public final class DataConsistencyCheckAlgorithmInfoRegistry {

private static final Collection<DataConsistencyCheckAlgorithmInfo> ALGORITHM_INFOS = loadAllAlgorithms();

private static Collection<DataConsistencyCheckAlgorithmInfo> loadAllAlgorithms() {
Collection<DataConsistencyCheckAlgorithmInfo> result = new LinkedList<>();
for (TableDataConsistencyChecker each : ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class)) {
SPIDescription description = each.getClass().getAnnotation(SPIDescription.class);
String typeAliases = each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(","));
result.add(
new DataConsistencyCheckAlgorithmInfo(each.getType(), typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null == description ? "" : description.value()));
}
return result;
}

private static Collection<DatabaseType> getSupportedDatabaseTypes(final Collection<DatabaseType> supportedDatabaseTypes) {
return supportedDatabaseTypes.isEmpty() ? ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : supportedDatabaseTypes;
}

/**
* Get all data consistency check algorithm infos.
*
* @return all data consistency check algorithm infos
*/
public static Collection<DataConsistencyCheckAlgorithmInfo> getAllAlgorithmInfos() {
return ALGORITHM_INFOS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,20 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
Expand Down Expand Up @@ -89,11 +82,9 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
* @return job item infos
*/
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId);
PipelineJobConfiguration jobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(jobId);
long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig);
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
Expand Down Expand Up @@ -155,25 +146,6 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) {
return new YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new YamlJobOffsetInfo());
}

/**
* List all data consistency check algorithms from SPI.
*
* @return data consistency check algorithms
*/
public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
Collection<DataConsistencyCheckAlgorithmInfo> result = new LinkedList<>();
for (TableDataConsistencyChecker each : ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class)) {
SPIDescription description = each.getClass().getAnnotation(SPIDescription.class);
String typeAliases = each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(","));
result.add(new DataConsistencyCheckAlgorithmInfo(each.getType(), typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null == description ? "" : description.value()));
}
return result;
}

private Collection<DatabaseType> getSupportedDatabaseTypes(final Collection<DatabaseType> supportedDatabaseTypes) {
return supportedDatabaseTypes.isEmpty() ? ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : supportedDatabaseTypes;
}

/**
* Aggregate data consistency check results.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

package org.apache.shardingsphere.migration.distsql.handler.query;

import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfoRegistry;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;

import java.util.Arrays;
Expand All @@ -37,8 +34,7 @@ public final class ShowMigrationCheckAlgorithmsExecutor implements QueryableRALE

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationCheckAlgorithmsStatement sqlStatement) {
InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"));
return inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map(
return DataConsistencyCheckAlgorithmInfoRegistry.getAllAlgorithmInfos().stream().map(
each -> new LocalDataQueryResultRow(each.getType(), each.getTypeAliases(),
each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")), each.getDescription()))
.collect(Collectors.toList());
Expand Down

0 comments on commit a76d0d7

Please sign in to comment.