Skip to content

Commit

Permalink
Merge branch 'apache:master' into dev_26990
Browse files Browse the repository at this point in the history
  • Loading branch information
zihaoAK47 authored Oct 9, 2023
2 parents 674948c + 109cb1c commit 386c6d4
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,34 @@ public static EncryptRuleConfiguration convert(final Collection<EncryptRuleSegme
private static EncryptTableRuleConfiguration createEncryptTableRuleConfiguration(final EncryptRuleSegment ruleSegment) {
Collection<EncryptColumnRuleConfiguration> columns = new LinkedList<>();
for (EncryptColumnSegment each : ruleSegment.getColumns()) {
columns.add(createEncryptColumnRuleConfiguration(ruleSegment.getTableName(), each));
columns.add(createEncryptColumnRuleConfiguration(ruleSegment, each));
}
return new EncryptTableRuleConfiguration(ruleSegment.getTableName(), columns);
}

private static EncryptColumnRuleConfiguration createEncryptColumnRuleConfiguration(final String tableName, final EncryptColumnSegment columnSegment) {
private static EncryptColumnRuleConfiguration createEncryptColumnRuleConfiguration(final EncryptRuleSegment ruleSegment, final EncryptColumnSegment columnSegment) {
EncryptColumnItemRuleConfiguration cipherColumnConfig = new EncryptColumnItemRuleConfiguration(
columnSegment.getCipher().getName(), getEncryptorName(tableName, columnSegment.getName()));
columnSegment.getCipher().getName(), getEncryptorName(ruleSegment.getTableName(), columnSegment.getName()));
EncryptColumnRuleConfiguration result = new EncryptColumnRuleConfiguration(columnSegment.getName(), cipherColumnConfig);
if (null != columnSegment.getAssistedQuery()) {
setAssistedQuery(tableName, columnSegment, result);
setAssistedQuery(ruleSegment.getTableName(), columnSegment, result);
}
if (null != columnSegment.getLikeQuery()) {
setLikeQuery(tableName, columnSegment, result);
setLikeQuery(ruleSegment.getTableName(), columnSegment, result);
}
return result;
}

private static void setAssistedQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration result) {
String assistedQueryEncryptorName = null == columnSegment.getAssistedQuery().getEncryptor() ? null
: getAssistedQueryEncryptorName(tableName, columnSegment.getName());
private static void setAssistedQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration columnRuleConfig) {
String assistedQueryEncryptorName = null == columnSegment.getAssistedQuery().getEncryptor() ? null : getAssistedQueryEncryptorName(tableName, columnSegment.getName());
EncryptColumnItemRuleConfiguration assistedQueryColumnConfig = new EncryptColumnItemRuleConfiguration(columnSegment.getAssistedQuery().getName(), assistedQueryEncryptorName);
result.setAssistedQuery(assistedQueryColumnConfig);
columnRuleConfig.setAssistedQuery(assistedQueryColumnConfig);
}

private static void setLikeQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration result) {
private static void setLikeQuery(final String tableName, final EncryptColumnSegment columnSegment, final EncryptColumnRuleConfiguration columnRuleConfig) {
String likeQueryEncryptorName = null == columnSegment.getLikeQuery().getEncryptor() ? null : getLikeQueryEncryptorName(tableName, columnSegment.getName());
EncryptColumnItemRuleConfiguration likeQueryColumnConfig = new EncryptColumnItemRuleConfiguration(columnSegment.getLikeQuery().getName(), likeQueryEncryptorName);
result.setLikeQuery(likeQueryColumnConfig);
columnRuleConfig.setLikeQuery(likeQueryColumnConfig);
}

private static Map<String, AlgorithmConfiguration> createEncryptorConfigurations(final EncryptRuleSegment ruleSegment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
private boolean closed;

protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final ExecutionContext executionContext) {
return connection.getAutoCommit() && isNotInDistributedTransaction(connection) && isModifiedSQL(executionContext) && executionContext.getExecutionUnits().size() > 1;
}

private boolean isNotInDistributedTransaction(final ShardingSphereConnection connection) {
if (connection.getAutoCommit()) {
return false;
}
ConnectionTransaction connectionTransaction = connection.getDatabaseConnectionManager().getConnectionTransaction();
boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionContext().getTransactionContext().isInTransaction();
return TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) && !isInTransaction;
if (!TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) || isInTransaction) {
return false;
}
return isModifiedSQL(executionContext) && executionContext.getExecutionUnits().size() > 1;
}

private boolean isModifiedSQL(final ExecutionContext executionContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
Expand All @@ -34,6 +33,7 @@
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -90,27 +90,14 @@ protected void prepare(final PipelineJobItemContext jobItemContext) {
try {
doPrepare(jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
processFailed(jobItemContext, ex);
throw ex;
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
processFailed(jobItemContext, ex);
throw new PipelineInternalException(ex);
}
}

protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;

protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
jobAPI.stop(jobId);
}

@Override
public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,33 @@ public void execute(final ShardingContext shardingContext) {
log.info("stopping true, ignore");
return;
}
PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
try {
PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
execute0(jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
processFailed(jobId, shardingItem, ex);
throw ex;
}
}

private void execute0(final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
getJobAPI().cleanJobItemErrorMessage(jobId, jobItemContext.getShardingItem());
getJobAPI().cleanJobItemErrorMessage(jobId, shardingItem);
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
tasksRunner.start();
}

private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
getJobAPI().stop(jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void validateParameter(final CDCClientConfiguration parameter) {
@SneakyThrows(InterruptedException.class)
public void connect() {
Bootstrap bootstrap = new Bootstrap();
group = new NioEventLoopGroup();
group = new NioEventLoopGroup(1);
bootstrap.channel(NioSocketChannel.class)
.group(group)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.Collection;
import java.util.LinkedList;
Expand Down Expand Up @@ -114,17 +114,15 @@ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
for (PipelineJobItemContext each : jobItemContexts) {
processFailed(each, ex);
processFailed(each.getJobId(), each.getShardingItem(), ex);
}
throw ex;
}
}

@Override
protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
/**
* SQL federation rule configuration.
*/
@RequiredArgsConstructor
@Getter
@Setter
@RequiredArgsConstructor
public final class SQLFederationRuleConfiguration implements GlobalRuleConfiguration {

private final boolean sqlFederationEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ public final class ShowSQLFederationRuleExecutor implements MetaDataRequiredQuer
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShardingSphereMetaData metaData, final ShowSQLFederationRuleStatement sqlStatement) {
SQLFederationRuleConfiguration ruleConfig = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class).getConfiguration();
return Collections.singleton(new LocalDataQueryResultRow(String.valueOf(ruleConfig.isSqlFederationEnabled()),
null != ruleConfig.getExecutionPlanCache() ? ruleConfig.getExecutionPlanCache().toString() : ""));
String sqlFederationEnabled = String.valueOf(ruleConfig.isSqlFederationEnabled());
String executionPlanCache = null != ruleConfig.getExecutionPlanCache() ? ruleConfig.getExecutionPlanCache().toString() : "";
LocalDataQueryResultRow row = new LocalDataQueryResultRow(sqlFederationEnabled, executionPlanCache);
return Collections.singleton(row);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
import org.apache.shardingsphere.distsql.handler.query.RQLExecutor;
import org.apache.shardingsphere.distsql.parser.statement.rql.show.ShowStorageUnitsStatement;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datasource.pool.CatalogSwitchableDataSource;
import org.apache.shardingsphere.infra.datasource.pool.props.creator.DataSourcePoolPropertiesCreator;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
Expand Down Expand Up @@ -57,15 +55,15 @@ public Collection<String> getColumnNames() {

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShardingSphereDatabase database, final ShowStorageUnitsStatement sqlStatement) {
ResourceMetaData resourceMetaData = database.getResourceMetaData();
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
for (Entry<String, DataSourcePoolProperties> entry : getDataSourcePoolPropertiesMap(database, sqlStatement).entrySet()) {
String key = entry.getKey();
ConnectionProperties connectionProps = resourceMetaData.getStorageUnits().get(key).getConnectionProperties();
String storageUnitName = entry.getKey();
StorageUnit storageUnit = database.getResourceMetaData().getStorageUnits().get(storageUnitName);
ConnectionProperties connectionProps = storageUnit.getConnectionProperties();
Map<String, Object> poolProps = entry.getValue().getPoolPropertySynonyms().getStandardProperties();
Map<String, Object> customProps = getCustomProps(entry.getValue().getCustomProperties().getProperties(), connectionProps.getQueryProperties());
result.add(new LocalDataQueryResultRow(key,
resourceMetaData.getStorageUnits().get(key).getStorageType().getType(),
Map<String, Object> customProps = getCustomProperties(entry.getValue().getCustomProperties().getProperties(), connectionProps.getQueryProperties());
result.add(new LocalDataQueryResultRow(storageUnitName,
storageUnit.getStorageType().getType(),
connectionProps.getHostname(),
connectionProps.getPort(),
connectionProps.getCatalog(),
Expand All @@ -80,15 +78,6 @@ public Collection<LocalDataQueryResultRow> getRows(final ShardingSphereDatabase
return result;
}

private Map<String, Object> getCustomProps(final Map<String, Object> customProps, final Properties queryProps) {
Map<String, Object> result = new LinkedHashMap<>(customProps.size() + 1, 1F);
result.putAll(customProps);
if (!queryProps.isEmpty()) {
result.put("queryProperties", queryProps);
}
return result;
}

private Map<String, DataSourcePoolProperties> getDataSourcePoolPropertiesMap(final ShardingSphereDatabase database, final ShowStorageUnitsStatement sqlStatement) {
Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(database.getResourceMetaData().getStorageUnits().size(), 1F);
Map<String, DataSourcePoolProperties> propsMap = database.getResourceMetaData().getStorageUnits().entrySet().stream()
Expand All @@ -113,11 +102,10 @@ private Map<String, DataSourcePoolProperties> getDataSourcePoolPropertiesMap(fin

private DataSourcePoolProperties getDataSourcePoolProperties(final Map<String, DataSourcePoolProperties> propsMap, final String storageUnitName,
final DatabaseType databaseType, final DataSource dataSource) {
DataSourcePoolProperties result = getDataSourcePoolProperties(dataSource);
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData();
if (dialectDatabaseMetaData.isInstanceConnectionAvailable() && propsMap.containsKey(storageUnitName)) {
DataSourcePoolProperties unitDataSourcePoolProps = propsMap.get(storageUnitName);
for (Entry<String, Object> entry : unitDataSourcePoolProps.getPoolPropertySynonyms().getStandardProperties().entrySet()) {
DataSourcePoolProperties result = DataSourcePoolPropertiesCreator.create(
dataSource instanceof CatalogSwitchableDataSource ? ((CatalogSwitchableDataSource) dataSource).getDataSource() : dataSource);
if (new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().isInstanceConnectionAvailable() && propsMap.containsKey(storageUnitName)) {
for (Entry<String, Object> entry : propsMap.get(storageUnitName).getPoolPropertySynonyms().getStandardProperties().entrySet()) {
if (null != entry.getValue()) {
result.getPoolPropertySynonyms().getStandardProperties().put(entry.getKey(), entry.getValue());
}
Expand All @@ -126,10 +114,12 @@ private DataSourcePoolProperties getDataSourcePoolProperties(final Map<String, D
return result;
}

private DataSourcePoolProperties getDataSourcePoolProperties(final DataSource dataSource) {
return dataSource instanceof CatalogSwitchableDataSource
? DataSourcePoolPropertiesCreator.create(((CatalogSwitchableDataSource) dataSource).getDataSource())
: DataSourcePoolPropertiesCreator.create(dataSource);
private Map<String, Object> getCustomProperties(final Map<String, Object> customProps, final Properties queryProps) {
Map<String, Object> result = new LinkedHashMap<>(customProps);
if (!queryProps.isEmpty()) {
result.put("queryProperties", queryProps);
}
return result;
}

private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
Expand Down
Loading

0 comments on commit 386c6d4

Please sign in to comment.