Commit
Merge remote-tracking branch 'apache/master' into dev1008-sonar
FlyingZC committed Oct 9, 2023
2 parents 0de8d0c + 09d2365 commit bdf7b3e
Showing 19 changed files with 195 additions and 256 deletions.
@@ -47,14 +47,12 @@ public final class EncryptTable {
 
     private final Map<String, EncryptColumn> columns;
 
-    @SuppressWarnings("rawtypes")
     public EncryptTable(final EncryptTableRuleConfiguration config, final Map<String, StandardEncryptAlgorithm> standardEncryptors,
                         final Map<String, AssistedEncryptAlgorithm> assistedEncryptors, final Map<String, LikeEncryptAlgorithm> likeEncryptors) {
         table = config.getName();
         columns = createEncryptColumns(config, standardEncryptors, assistedEncryptors, likeEncryptors);
     }
 
-    @SuppressWarnings("rawtypes")
     private Map<String, EncryptColumn> createEncryptColumns(final EncryptTableRuleConfiguration config, final Map<String, StandardEncryptAlgorithm> standardEncryptors,
                                                             final Map<String, AssistedEncryptAlgorithm> assistedEncryptors, final Map<String, LikeEncryptAlgorithm> likeEncryptors) {
         Map<String, EncryptColumn> result = new CaseInsensitiveMap<>();
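Note on the surrounding code: `columns` is kept in a `CaseInsensitiveMap`, so encrypt-column lookups succeed regardless of identifier casing, matching how unquoted SQL identifiers usually behave. A standalone sketch of that behavior, assuming Apache Commons Collections 4 (ShardingSphere may use its own case-insensitive map type):

import java.util.Map;

import org.apache.commons.collections4.map.CaseInsensitiveMap;

public final class CaseInsensitiveLookupExample {
    
    public static void main(final String[] args) {
        // Keys are normalized internally, so differently cased lookups hit the same entry.
        Map<String, String> columns = new CaseInsensitiveMap<>();
        columns.put("USER_NAME", "user_name_cipher");
        System.out.println(columns.get("user_name"));
        System.out.println(columns.get("User_Name"));
    }
}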
@@ -58,35 +58,34 @@ public static EncryptRuleConfiguration convert(final Collection<EncryptRuleSegme
     private static EncryptTableRuleConfiguration createEncryptTableRuleConfiguration(final EncryptRuleSegment ruleSegment) {
         Collection<EncryptColumnRuleConfiguration> columns = new LinkedList<>();
         for (EncryptColumnSegment each : ruleSegment.getColumns()) {
-            columns.add(createEncryptColumnRuleConfiguration(ruleSegment.getTableName(), each));
+            columns.add(createEncryptColumnRuleConfiguration(ruleSegment, each));
         }
         return new EncryptTableRuleConfiguration(ruleSegment.getTableName(), columns);
     }
 
-    private static EncryptColumnRuleConfiguration createEncryptColumnRuleConfiguration(final String tableName, final EncryptColumnSegment columnSegment) {
+    private static EncryptColumnRuleConfiguration createEncryptColumnRuleConfiguration(final EncryptRuleSegment ruleSegment, final EncryptColumnSegment columnSegment) {
         EncryptColumnItemRuleConfiguration cipherColumnConfig = new EncryptColumnItemRuleConfiguration(
-                columnSegment.getCipher().getName(), getEncryptorName(tableName, columnSegment.getName()));
+                columnSegment.getCipher().getName(), getEncryptorName(ruleSegment.getTableName(), columnSegment.getName()));
         EncryptColumnRuleConfiguration result = new EncryptColumnRuleConfiguration(columnSegment.getName(), cipherColumnConfig);
         if (null != columnSegment.getAssistedQuery()) {
-            setAssistedQuery(tableName, columnSegment, result);
+            setAssistedQuery(ruleSegment.getTableName(), columnSegment, result);
         }
         if (null != columnSegment.getLikeQuery()) {
-            setLikeQuery(tableName, columnSegment, result);
+            setLikeQuery(ruleSegment.getTableName(), columnSegment, result);
         }
         return result;
     }
 
-    private static void setAssistedQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration result) {
-        String assistedQueryEncryptorName = null == columnSegment.getAssistedQuery().getEncryptor() ? null
-                : getAssistedQueryEncryptorName(tableName, columnSegment.getName());
+    private static void setAssistedQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration columnRuleConfig) {
+        String assistedQueryEncryptorName = null == columnSegment.getAssistedQuery().getEncryptor() ? null : getAssistedQueryEncryptorName(tableName, columnSegment.getName());
         EncryptColumnItemRuleConfiguration assistedQueryColumnConfig = new EncryptColumnItemRuleConfiguration(columnSegment.getAssistedQuery().getName(), assistedQueryEncryptorName);
-        result.setAssistedQuery(assistedQueryColumnConfig);
+        columnRuleConfig.setAssistedQuery(assistedQueryColumnConfig);
     }
 
-    private static void setLikeQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration result) {
+    private static void setLikeQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration columnRuleConfig) {
         String likeQueryEncryptorName = null == columnSegment.getLikeQuery().getEncryptor() ? null : getLikeQueryEncryptorName(tableName, columnSegment.getName());
         EncryptColumnItemRuleConfiguration likeQueryColumnConfig = new EncryptColumnItemRuleConfiguration(columnSegment.getLikeQuery().getName(), likeQueryEncryptorName);
-        result.setLikeQuery(likeQueryColumnConfig);
+        columnRuleConfig.setLikeQuery(likeQueryColumnConfig);
     }
 
     private static Map<String, AlgorithmConfiguration> createEncryptorConfigurations(final EncryptRuleSegment ruleSegment) {
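The refactoring in this hunk is the classic "introduce parameter object" move: `createEncryptColumnRuleConfiguration` now receives the whole `EncryptRuleSegment` rather than a pre-extracted table name, so future fields of the segment become reachable without another signature change, and renaming `result` to `columnRuleConfig` in the setters stops reusing that name for an out-parameter. A minimal sketch of the idea with hypothetical names (not the project's API):

// Hypothetical before/after of the parameter-object refactoring.
final class RuleSegment {
    
    private final String tableName;
    
    RuleSegment(final String tableName) {
        this.tableName = tableName;
    }
    
    String getTableName() {
        return tableName;
    }
}

final class Converter {
    
    // Before: convert(final String tableName) forced callers to unpack the segment up front.
    // After: the segment travels as one unit; the callee takes what it needs.
    static String convert(final RuleSegment ruleSegment) {
        return ruleSegment.getTableName() + "_encryptor";
    }
}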
@@ -56,11 +56,6 @@ public final class TablesContext {
 
     private final Collection<SimpleTableSegment> simpleTableSegments = new LinkedList<>();
 
-    /**
-     * Get table names.
-     *
-     * @return table names
-     */
     @Getter
     private final Collection<String> tableNames = new HashSet<>();
 
@@ -20,10 +20,16 @@
 import com.google.common.base.Preconditions;
 import lombok.Getter;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
 
 import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
@@ -90,4 +96,40 @@ public <T extends ShardingSphereRule> T getSingleRule(final Class<T> clazz) {
         Preconditions.checkState(1 == foundRules.size(), "Rule `%s` should have and only have one instance.", clazz.getSimpleName());
         return foundRules.iterator().next();
     }
+
+    /**
+     * Get in used storage units name and used rule classes map.
+     *
+     * @return in used storage units name and used rule classes map
+     */
+    public Map<String, Collection<Class<? extends ShardingSphereRule>>> getInUsedStorageUnitNameAndRulesMap() {
+        Map<String, Collection<Class<? extends ShardingSphereRule>>> result = new LinkedHashMap<>();
+        for (ShardingSphereRule each : rules) {
+            if (each instanceof DataSourceContainedRule) {
+                result.putAll(getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataSourceContainedRule) each)));
+            } else if (each instanceof DataNodeContainedRule) {
+                result.putAll(getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataNodeContainedRule) each)));
+            }
+        }
+        return result;
+    }
+
+    private Map<String, Collection<Class<? extends ShardingSphereRule>>> getInUsedStorageUnitNameAndRulesMap(final ShardingSphereRule rule, final Collection<String> inUsedStorageUnitNames) {
+        Map<String, Collection<Class<? extends ShardingSphereRule>>> result = new LinkedHashMap<>();
+        for (String each : inUsedStorageUnitNames) {
+            if (!result.containsKey(each)) {
+                result.put(each, new LinkedHashSet<>());
+            }
+            result.get(each).add(rule.getClass());
+        }
+        return result;
+    }
+
+    private Collection<String> getInUsedStorageUnitNames(final DataSourceContainedRule rule) {
+        return rule.getDataSourceMapper().values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+    }
+
+    private Collection<String> getInUsedStorageUnitNames(final DataNodeContainedRule rule) {
+        return rule.getAllDataNodes().values().stream().flatMap(each -> each.stream().map(DataNode::getDataSourceName).collect(Collectors.toSet()).stream()).collect(Collectors.toSet());
+    }
 }
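For readers tracing the new grouping logic: the `containsKey`/`put`/`get` sequence in the private helper is the grouping idiom that `Map.computeIfAbsent` expresses in one call, and the `DataNodeContainedRule` overload's collect-into-a-set-then-re-stream step is redundant, since a single `flatMap`/`map`/`collect` yields the same set. A standalone sketch of both equivalences (placeholder data; `String.class` stands in for a rule class, and splitting on `.` stands in for `DataNode#getDataSourceName`):

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public final class GroupingSketch {
    
    public static void main(final String[] args) {
        // Same result as the containsKey/put/get sequence, one call per element.
        Collection<String> inUsedStorageUnitNames = Arrays.asList("ds_0", "ds_1", "ds_0");
        Map<String, Collection<Class<?>>> result = new LinkedHashMap<>();
        for (String each : inUsedStorageUnitNames) {
            result.computeIfAbsent(each, key -> new LinkedHashSet<>()).add(String.class);
        }
        System.out.println(result);
        // One pass over all data nodes; no intermediate per-entry set is needed.
        Map<String, List<String>> allDataNodes = new LinkedHashMap<>();
        allDataNodes.put("t_order", Arrays.asList("ds_0.t_order_0", "ds_1.t_order_1"));
        Set<String> dataSourceNames = allDataNodes.values().stream()
                .flatMap(Collection::stream)
                .map(node -> node.split("\\.")[0])
                .collect(Collectors.toSet());
        System.out.println(dataSourceNames);
    }
}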
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.distsql.handler.exception.storageunit;
 
 import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 
 import java.util.Collection;
 
@@ -28,7 +29,7 @@ public final class StorageUnitInUsedException extends StorageUnitDefinitionViola
 
     private static final long serialVersionUID = -3427324685070457375L;
 
-    public StorageUnitInUsedException(final String storageUnitName, final Collection<String> ruleTypes) {
-        super(XOpenSQLState.CHECK_OPTION_VIOLATION, 3, "Storage unit `%s` is still used by `%s`.", storageUnitName, ruleTypes);
+    public StorageUnitInUsedException(final String storageUnitName, final Collection<Class<? extends ShardingSphereRule>> ruleClasses) {
+        super(XOpenSQLState.CHECK_OPTION_VIOLATION, 3, "Storage unit `%s` is still used by `%s`.", storageUnitName, ruleClasses);
     }
 }
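One observable effect of this signature change, assuming the superclass interpolates the reason with `String.format`-style substitution as the message template suggests: the collection of `Class` objects renders via `Class#toString`, so entries appear as `class com.example.SomeRule` rather than bare type names. A quick illustration with stand-in classes:

import java.util.Arrays;
import java.util.Collection;

public final class MessageRenderingExample {
    
    public static void main(final String[] args) {
        Collection<Class<?>> ruleClasses = Arrays.asList(String.class, Integer.class);
        // Prints: Storage unit `ds_0` is still used by `[class java.lang.String, class java.lang.Integer]`.
        System.out.println(String.format("Storage unit `%s` is still used by `%s`.", "ds_0", ruleClasses));
    }
}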
@@ -24,7 +24,6 @@
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -34,6 +33,7 @@
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -90,27 +90,14 @@ protected void prepare(final PipelineJobItemContext jobItemContext) {
         try {
             doPrepare(jobItemContext);
             // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            processFailed(jobItemContext, ex);
-            throw ex;
-            // CHECKSTYLE:OFF
         } catch (final SQLException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobItemContext, ex);
             throw new PipelineInternalException(ex);
         }
     }
 
     protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
 
-    protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
-        String jobId = jobItemContext.getJobId();
-        log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
-        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
-        jobAPI.stop(jobId);
-    }
-
     @Override
     public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
@@ -52,14 +52,33 @@ public void execute(final ShardingContext shardingContext) {
             log.info("stopping true, ignore");
             return;
         }
-        PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
+        try {
+            PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
+            execute0(jobItemContext);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            processFailed(jobId, shardingItem, ex);
+            throw ex;
+        }
+    }
+
+    private void execute0(final PipelineJobItemContext jobItemContext) {
+        String jobId = jobItemContext.getJobId();
+        int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        getJobAPI().cleanJobItemErrorMessage(jobId, jobItemContext.getShardingItem());
+        getJobAPI().cleanJobItemErrorMessage(jobId, shardingItem);
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
         tasksRunner.start();
     }
+
+    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
+        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
+        getJobAPI().stop(jobId);
+    }
 }
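Taken together with the previous file, this hunk centralizes failure handling: instead of each `prepare` implementation relying on a protected `processFailed`, the outermost `execute` catches `RuntimeException` once, records the error, stops the job, and rethrows so the ElasticJob scheduler still sees the failed execution; the CHECKSTYLE:OFF/ON pair silences the style rule that would otherwise reject catching a bare `RuntimeException`. A condensed sketch of the pattern (placeholder names, not ShardingSphere API):

// Record the failure once at the entry point, then rethrow to the scheduler.
public final class FailFastExecutor {
    
    public void execute(final String jobId, final int shardingItem) {
        try {
            doExecute(jobId, shardingItem);
        } catch (final RuntimeException ex) {
            processFailed(jobId, shardingItem, ex);
            throw ex;
        }
    }
    
    private void doExecute(final String jobId, final int shardingItem) {
        // real work: build the item context, prepare, start the tasks runner
    }
    
    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
        // persist the error message and stop the job exactly once
        System.err.printf("job prepare failed, %s-%d: %s%n", jobId, shardingItem, ex);
    }
}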
@@ -81,7 +81,7 @@ private void validateParameter(final CDCClientConfiguration parameter) {
     @SneakyThrows(InterruptedException.class)
     public void connect() {
         Bootstrap bootstrap = new Bootstrap();
-        group = new NioEventLoopGroup();
+        group = new NioEventLoopGroup(1);
         bootstrap.channel(NioSocketChannel.class)
                 .group(group)
                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
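`new NioEventLoopGroup(1)` pins the CDC client to one I/O thread; the no-argument constructor would allocate Netty's default (twice the number of available processors, unless overridden by a system property), which is wasted on a client that opens a single connection. A minimal self-contained client using the same setting (host and port are placeholders):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.NioSocketChannel;
import io.netty.channel.socket.SocketChannel;

public final class SingleThreadNettyClient {
    
    public static void main(final String[] args) throws InterruptedException {
        // One event-loop thread is enough for a single outbound channel.
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        
                        @Override
                        protected void initChannel(final SocketChannel ch) {
                            // protocol handlers would be registered here
                        }
                    });
            bootstrap.connect("127.0.0.1", 33071).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}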
@@ -36,14 +36,14 @@
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -114,17 +114,15 @@ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
             for (PipelineJobItemContext each : jobItemContexts) {
-                processFailed(each, ex);
+                processFailed(each.getJobId(), each.getShardingItem(), ex);
             }
             throw ex;
         }
     }
 
-    @Override
-    protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
-        String jobId = jobItemContext.getJobId();
-        log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
-        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
+    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
+        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }
@@ -26,9 +26,9 @@
 /**
  * SQL federation rule configuration.
  */
-@RequiredArgsConstructor
 @Getter
 @Setter
+@RequiredArgsConstructor
 public final class SQLFederationRuleConfiguration implements GlobalRuleConfiguration {
 
     private final boolean sqlFederationEnabled;
@@ -46,10 +46,17 @@ public Optional<SqlNode> convert(final JoinTableSegment segment) {
         SqlNode right = new TableConverter().convert(segment.getRight()).orElseThrow(IllegalStateException::new);
         Optional<SqlNode> condition = convertJoinCondition(segment);
         SqlLiteral conditionType = convertConditionType(segment);
-        SqlLiteral joinType = JoinType.valueOf(segment.getJoinType()).symbol(SqlParserPos.ZERO);
+        SqlLiteral joinType = convertJoinType(segment);
         return Optional.of(new SqlJoin(SqlParserPos.ZERO, left, SqlLiteral.createBoolean(segment.isNatural(), SqlParserPos.ZERO), joinType, right, conditionType, condition.orElse(null)));
     }
 
+    private static SqlLiteral convertJoinType(final JoinTableSegment segment) {
+        if (JoinType.INNER.name().equals(segment.getJoinType()) && !segment.isNatural() && null == segment.getCondition() && segment.getUsing().isEmpty()) {
+            return JoinType.COMMA.symbol(SqlParserPos.ZERO);
+        }
+        return JoinType.valueOf(segment.getJoinType()).symbol(SqlParserPos.ZERO);
+    }
+
     private SqlLiteral convertConditionType(final JoinTableSegment segment) {
         if (!segment.getUsing().isEmpty()) {
             return JoinConditionType.USING.symbol(SqlParserPos.ZERO);
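Context for the new special case: the guard matches what the parser apparently produces for a comma-separated table list (`FROM t_order, t_order_item`), which arrives as an INNER join with no NATURAL keyword, no ON condition, and no USING columns. Calcite models that form as `JoinType.COMMA`; mapping it to `JoinType.INNER` would yield a `SqlJoin` that unparses as an `INNER JOIN` lacking an `ON` clause. A small illustration of the two Calcite literals (assumes calcite-core on the classpath):

import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.parser.SqlParserPos;

public final class JoinTypeSketch {
    
    public static void main(final String[] args) {
        // COMMA models "FROM a, b"; INNER models "a INNER JOIN b ON ...".
        SqlLiteral comma = JoinType.COMMA.symbol(SqlParserPos.ZERO);
        SqlLiteral inner = JoinType.INNER.symbol(SqlParserPos.ZERO);
        System.out.println(comma.getValueAs(JoinType.class));
        System.out.println(inner.getValueAs(JoinType.class));
    }
}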