Skip to content

Commit

Permalink
Refactor rule item changed subscriber (#26763)
Browse files Browse the repository at this point in the history
* Refactor SingleTableSubscriber

* Refactor EncryptTableSubscriber

* Refactor EncryptTableSubscriber

* Refactor RuleItemChangedSubscribeEngine

* Refactor EncryptorSubscriber

* Refactor CompatibleEncryptorSubscriber and CompatibleEncryptTableSubscriber

* Refactor MaskAlgorithmSubscriber and MaskTableSubscriber

* Refactor ReadwriteSplittingLoadBalancerSubscriber

* Refactor ReadwriteSplittingDataSourceSubscriber

* Refactor BroadcastTableSubscriber

* Refactor DefaultStrategySubscribeEngine

* Refactor DefaultShardingAuditorStrategySubscriber

* Refactor DefaultShardingColumnSubscriber

* Refactor KeyGeneratorSubscriber

* Refactor ShardingAuditorSubscriber

* Refactor ShardingAutoTableSubscriber

* Refactor ShardingCacheSubscriber

* Refactor ShardingTableSubscriber

* Refactor ShardingTableReferenceSubscriber

* Refactor ShadowTableSubscriber

* Refactor ShadowDataSourceSubscriber

* Refactor ShadowAlgorithmSubscriber

* Refactor DefaultShadowAlgorithmNameSubscriber
  • Loading branch information
terrymanu committed Jul 4, 2023
1 parent 71c24ee commit e7284bf
Show file tree
Hide file tree
Showing 54 changed files with 1,706 additions and 834 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.broadcast.subscriber;

import org.apache.shardingsphere.broadcast.api.config.BroadcastRuleConfiguration;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.broadcast.yaml.config.YamlBroadcastRuleConfiguration;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.rule.event.rule.alter.AlterRuleItemEvent;
import org.apache.shardingsphere.infra.rule.event.rule.drop.DropRuleItemEvent;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscribeEngine;

import java.util.LinkedList;

/**
* Broadcast table subscribe engine.
*/
public final class BroadcastTableSubscribeEngine extends RuleItemChangedSubscribeEngine<BroadcastRuleConfiguration, BroadcastRuleConfiguration> {

public BroadcastTableSubscribeEngine(final ContextManager contextManager) {
super(contextManager);
}

@Override
protected BroadcastRuleConfiguration swapRuleItemConfigurationFromEvent(final AlterRuleItemEvent event, final String yamlContent) {
return new BroadcastRuleConfiguration(YamlEngine.unmarshal(yamlContent, YamlBroadcastRuleConfiguration.class).getTables());
}

@Override
protected BroadcastRuleConfiguration findRuleConfiguration(final ShardingSphereDatabase database) {
return database.getRuleMetaData().findSingleRule(BroadcastRule.class).map(BroadcastRule::getConfiguration).orElseGet(() -> new BroadcastRuleConfiguration(new LinkedList<>()));
}

@Override
protected void changeRuleItemConfiguration(final AlterRuleItemEvent event, final BroadcastRuleConfiguration currentRuleConfig, final BroadcastRuleConfiguration toBeChangedItemConfig) {
currentRuleConfig.getTables().clear();
currentRuleConfig.getTables().addAll(toBeChangedItemConfig.getTables());
}

@Override
protected void dropRuleItemConfiguration(final DropRuleItemEvent event, final BroadcastRuleConfiguration currentRuleConfig) {
currentRuleConfig.getTables().clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,33 @@
package org.apache.shardingsphere.broadcast.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.Setter;
import org.apache.shardingsphere.broadcast.api.config.BroadcastRuleConfiguration;
import org.apache.shardingsphere.broadcast.event.table.AlterBroadcastTableEvent;
import org.apache.shardingsphere.broadcast.event.table.DropBroadcastTableEvent;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.broadcast.yaml.config.YamlBroadcastRuleConfiguration;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.config.DatabaseRuleConfigurationChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.subsciber.RuleChangedSubscriber;

import java.util.Optional;

/**
* Broadcast table subscriber.
*/
@SuppressWarnings("UnstableApiUsage")
@Setter
public final class BroadcastTableSubscriber implements RuleChangedSubscriber<AlterBroadcastTableEvent, DropBroadcastTableEvent> {

private ContextManager contextManager;
private BroadcastTableSubscribeEngine engine;

@Override
public void setContextManager(final ContextManager contextManager) {
engine = new BroadcastTableSubscribeEngine(contextManager);
}

@Subscribe
@Override
public synchronized void renew(final AlterBroadcastTableEvent event) {
if (!event.getActiveVersion().equals(contextManager.getInstanceContext().getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))) {
return;
}
String yamlContent = contextManager.getInstanceContext().getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion());
BroadcastRuleConfiguration toBeChangedConfig = new BroadcastRuleConfiguration(YamlEngine.unmarshal(yamlContent, YamlBroadcastRuleConfiguration.class).getTables());
ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabases().get(event.getDatabaseName());
Optional<BroadcastRule> rule = database.getRuleMetaData().findSingleRule(BroadcastRule.class);
BroadcastRuleConfiguration config;
if (rule.isPresent()) {
config = rule.get().getConfiguration();
config.getTables().clear();
config.getTables().addAll(toBeChangedConfig.getTables());
} else {
config = new BroadcastRuleConfiguration(toBeChangedConfig.getTables());
}
contextManager.getInstanceContext().getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
engine.renew(event);
}

@Subscribe
@Override
public synchronized void renew(final DropBroadcastTableEvent event) {
if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getDatabaseName())) {
return;
}
ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabases().get(event.getDatabaseName());
BroadcastRuleConfiguration config = database.getRuleMetaData().getSingleRule(BroadcastRule.class).getConfiguration();
config.getTables().clear();
contextManager.getInstanceContext().getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
engine.renew(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public EncryptTableSubscribeEngine(final ContextManager contextManager) {
}

@Override
protected EncryptTableRuleConfiguration swapRuleItemConfigurationFromEvent(final String yamlContent) {
protected EncryptTableRuleConfiguration swapRuleItemConfigurationFromEvent(final AlterRuleItemEvent event, final String yamlContent) {
return new YamlEncryptTableRuleConfigurationSwapper().swapToObject(YamlEngine.unmarshal(yamlContent, YamlEncryptTableRuleConfiguration.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public EncryptorSubscribeEngine(final ContextManager contextManager) {
}

@Override
protected AlgorithmConfiguration swapRuleItemConfigurationFromEvent(final String yamlContent) {
protected AlgorithmConfiguration swapRuleItemConfigurationFromEvent(final AlterRuleItemEvent event, final String yamlContent) {
return new YamlAlgorithmConfigurationSwapper().swapToObject(YamlEngine.unmarshal(yamlContent, YamlAlgorithmConfiguration.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.encrypt.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.Setter;
import org.apache.shardingsphere.encrypt.event.encryptor.AlterEncryptorEvent;
import org.apache.shardingsphere.encrypt.event.encryptor.DropEncryptorEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
Expand All @@ -28,7 +27,6 @@
* Encryptor subscriber.
*/
@SuppressWarnings("UnstableApiUsage")
@Setter
public final class EncryptorSubscriber implements RuleChangedSubscriber<AlterEncryptorEvent, DropEncryptorEvent> {

private EncryptorSubscribeEngine engine;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.encrypt.subscriber.compatible;

import org.apache.shardingsphere.encrypt.api.config.CompatibleEncryptRuleConfiguration;
import org.apache.shardingsphere.encrypt.api.config.rule.EncryptTableRuleConfiguration;
import org.apache.shardingsphere.encrypt.rule.EncryptRule;
import org.apache.shardingsphere.encrypt.yaml.config.rule.YamlEncryptTableRuleConfiguration;
import org.apache.shardingsphere.encrypt.yaml.swapper.rule.YamlEncryptTableRuleConfigurationSwapper;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.rule.event.rule.alter.AlterRuleItemEvent;
import org.apache.shardingsphere.infra.rule.event.rule.drop.DropNamedRuleItemEvent;
import org.apache.shardingsphere.infra.rule.event.rule.drop.DropRuleItemEvent;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscribeEngine;

import java.util.LinkedHashMap;
import java.util.LinkedList;

/**
* Compatible encrypt table subscribe engine.
* @deprecated compatible support will remove in next version.
*/
@Deprecated
public final class CompatibleEncryptTableSubscribeEngine extends RuleItemChangedSubscribeEngine<CompatibleEncryptRuleConfiguration, EncryptTableRuleConfiguration> {

public CompatibleEncryptTableSubscribeEngine(final ContextManager contextManager) {
super(contextManager);
}

@Override
protected EncryptTableRuleConfiguration swapRuleItemConfigurationFromEvent(final AlterRuleItemEvent event, final String yamlContent) {
return new YamlEncryptTableRuleConfigurationSwapper().swapToObject(YamlEngine.unmarshal(yamlContent, YamlEncryptTableRuleConfiguration.class));
}

@Override
protected CompatibleEncryptRuleConfiguration findRuleConfiguration(final ShardingSphereDatabase database) {
return database.getRuleMetaData().findSingleRule(EncryptRule.class)
.map(optional -> getCompatibleEncryptRuleConfiguration((CompatibleEncryptRuleConfiguration) optional.getConfiguration()))
.orElseGet(() -> new CompatibleEncryptRuleConfiguration(new LinkedList<>(), new LinkedHashMap<>()));
}

private CompatibleEncryptRuleConfiguration getCompatibleEncryptRuleConfiguration(final CompatibleEncryptRuleConfiguration config) {
return null == config.getTables() ? new CompatibleEncryptRuleConfiguration(new LinkedList<>(), config.getEncryptors()) : config;
}

@Override
protected void changeRuleItemConfiguration(final AlterRuleItemEvent event, final CompatibleEncryptRuleConfiguration currentRuleConfig, final EncryptTableRuleConfiguration toBeChangedItemConfig) {
// TODO refactor DistSQL to only persist config
currentRuleConfig.getTables().removeIf(each -> each.getName().equals(toBeChangedItemConfig.getName()));
currentRuleConfig.getTables().add(toBeChangedItemConfig);
}

@Override
protected void dropRuleItemConfiguration(final DropRuleItemEvent event, final CompatibleEncryptRuleConfiguration currentRuleConfig) {
currentRuleConfig.getTables().removeIf(each -> each.getName().equals(((DropNamedRuleItemEvent) event).getItemName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,35 @@
package org.apache.shardingsphere.encrypt.subscriber.compatible;

import com.google.common.eventbus.Subscribe;
import lombok.Setter;
import org.apache.shardingsphere.encrypt.api.config.CompatibleEncryptRuleConfiguration;
import org.apache.shardingsphere.encrypt.api.config.rule.EncryptTableRuleConfiguration;
import org.apache.shardingsphere.encrypt.event.compatible.table.AlterCompatibleEncryptTableEvent;
import org.apache.shardingsphere.encrypt.event.compatible.table.DropCompatibleEncryptTableEvent;
import org.apache.shardingsphere.encrypt.rule.EncryptRule;
import org.apache.shardingsphere.encrypt.yaml.config.rule.YamlEncryptTableRuleConfiguration;
import org.apache.shardingsphere.encrypt.yaml.swapper.rule.YamlEncryptTableRuleConfigurationSwapper;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.config.DatabaseRuleConfigurationChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.subsciber.RuleChangedSubscriber;

import java.util.LinkedHashMap;
import java.util.LinkedList;

/**
* Compatible encrypt table subscriber.
* @deprecated compatible support will remove in next version.
*/
@Deprecated
@SuppressWarnings("UnstableApiUsage")
@Setter
public final class CompatibleEncryptTableSubscriber implements RuleChangedSubscriber<AlterCompatibleEncryptTableEvent, DropCompatibleEncryptTableEvent> {

private ContextManager contextManager;
private CompatibleEncryptTableSubscribeEngine engine;

@Override
public void setContextManager(final ContextManager contextManager) {
engine = new CompatibleEncryptTableSubscribeEngine(contextManager);
}

@Subscribe
@Override
public synchronized void renew(final AlterCompatibleEncryptTableEvent event) {
if (!event.getActiveVersion().equals(contextManager.getInstanceContext().getModeContextManager().getActiveVersionByKey(event.getActiveVersionKey()))) {
return;
}
String yamlContent = contextManager.getInstanceContext().getModeContextManager().getVersionPathByActiveVersionKey(event.getActiveVersionKey(), event.getActiveVersion());
EncryptTableRuleConfiguration toBeChangedConfig = new YamlEncryptTableRuleConfigurationSwapper().swapToObject(YamlEngine.unmarshal(yamlContent, YamlEncryptTableRuleConfiguration.class));
ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabases().get(event.getDatabaseName());
CompatibleEncryptRuleConfiguration config = database.getRuleMetaData().findSingleRule(EncryptRule.class)
.map(optional -> getCompatibleEncryptRuleConfiguration((CompatibleEncryptRuleConfiguration) optional.getConfiguration()))
.orElseGet(() -> new CompatibleEncryptRuleConfiguration(new LinkedList<>(), new LinkedHashMap<>()));
// TODO refactor DistSQL to only persist config
config.getTables().removeIf(each -> each.getName().equals(toBeChangedConfig.getName()));
config.getTables().add(toBeChangedConfig);
contextManager.getInstanceContext().getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
engine.renew(event);
}

@Subscribe
@Override
public synchronized void renew(final DropCompatibleEncryptTableEvent event) {
if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getDatabaseName())) {
return;
}
ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabases().get(event.getDatabaseName());
CompatibleEncryptRuleConfiguration config = (CompatibleEncryptRuleConfiguration) database.getRuleMetaData().getSingleRule(EncryptRule.class).getConfiguration();
config.getTables().removeIf(each -> each.getName().equals(event.getItemName()));
contextManager.getInstanceContext().getEventBusContext().post(new DatabaseRuleConfigurationChangedEvent(event.getDatabaseName(), config));
}

private CompatibleEncryptRuleConfiguration getCompatibleEncryptRuleConfiguration(final CompatibleEncryptRuleConfiguration config) {
return null == config.getTables() ? new CompatibleEncryptRuleConfiguration(new LinkedList<>(), config.getEncryptors()) : config;
engine.renew(event);
}
}
Loading

0 comments on commit e7284bf

Please sign in to comment.