Skip to content

Commit

Permalink
Add BroadcastImportRuleConfigurationProvider (#30460)
Browse files Browse the repository at this point in the history
  • Loading branch information
yx9o authored Mar 12, 2024
1 parent 319e3bb commit 9462a44
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.broadcast.distsql.handler.provider;

import org.apache.shardingsphere.broadcast.api.config.BroadcastRuleConfiguration;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.distsql.handler.engine.update.ral.rule.spi.database.ImportRuleConfigurationProvider;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;

import java.util.LinkedHashMap;
import java.util.Map.Entry;
import java.util.stream.Collectors;

/**
* Broadcast import rule configuration provider.
*/
public final class BroadcastImportRuleConfigurationProvider implements ImportRuleConfigurationProvider {

@Override
public void check(final ShardingSphereDatabase database, final RuleConfiguration ruleConfig) {
}

@Override
public DatabaseRule build(final ShardingSphereDatabase database, final RuleConfiguration ruleConfig, final InstanceContext instanceContext) {
return new BroadcastRule((BroadcastRuleConfiguration) ruleConfig, database.getName(),
database.getResourceMetaData().getStorageUnits().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
database.getRuleMetaData().getRules());
}

@Override
public Class<? extends RuleConfiguration> getType() {
return BroadcastRuleConfiguration.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.broadcast.distsql.handler.provider.BroadcastImportRuleConfigurationProvider
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.shardingsphere.proxy.backend.util;

import org.apache.shardingsphere.broadcast.api.config.BroadcastRuleConfiguration;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.distsql.handler.engine.update.ral.rule.spi.database.ImportRuleConfigurationProvider;
import org.apache.shardingsphere.distsql.handler.exception.datasource.MissingRequiredDataSourcesException;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.InvalidStorageUnitsException;
Expand Down Expand Up @@ -141,9 +139,7 @@ private void addRules(final String databaseName, final Collection<YamlRuleConfig
}

private void addRule(final Collection<RuleConfiguration> ruleConfigs, final RuleConfiguration ruleConfig, final ShardingSphereDatabase database) {
if (ruleConfig instanceof BroadcastRuleConfiguration) {
addBroadcastRuleConfiguration((BroadcastRuleConfiguration) ruleConfig, ruleConfigs, database);
} else if (ruleConfig instanceof SingleRuleConfiguration) {
if (ruleConfig instanceof SingleRuleConfiguration) {
addSingleRuleConfiguration((SingleRuleConfiguration) ruleConfig, ruleConfigs, database);
} else {
ImportRuleConfigurationProvider provider = TypedSPILoader.getService(ImportRuleConfigurationProvider.class, ruleConfig.getClass());
Expand All @@ -164,14 +160,6 @@ private Map<Integer, RuleConfiguration> swapToRuleConfigs(final Collection<YamlR
return result;
}

private void addBroadcastRuleConfiguration(final BroadcastRuleConfiguration ruleConfig, final Collection<RuleConfiguration> ruleConfigs, final ShardingSphereDatabase database) {
ruleConfigs.add(ruleConfig);
database.getRuleMetaData().getRules().add(new BroadcastRule(ruleConfig, database.getName(),
database.getResourceMetaData().getStorageUnits().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
database.getRuleMetaData().getRules()));
}

private void addSingleRuleConfiguration(final SingleRuleConfiguration ruleConfig, final Collection<RuleConfiguration> ruleConfigs, final ShardingSphereDatabase database) {
ruleConfigs.add(ruleConfig);
database.getRuleMetaData().getRules().add(
Expand Down

0 comments on commit 9462a44

Please sign in to comment.