Skip to content

Commit

Permalink
Refactor PipelineContextKey
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 9, 2023
1 parent 818b49b commit c9df3fb
Show file tree
Hide file tree
Showing 19 changed files with 38 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.common.context;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
Expand All @@ -27,42 +26,16 @@
/**
* Pipeline context key.
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@RequiredArgsConstructor
@Getter
public final class PipelineContextKey {

private final String databaseName;

private final InstanceType instanceType;

/**
* Build context key.
*
* @param databaseName database name
* @param instanceType instance type
* @return context key
*/
public static PipelineContextKey build(final String databaseName, final InstanceType instanceType) {
return new PipelineContextKey(databaseName, instanceType);
}

/**
* Build context key for proxy.
*
* @return context key
*/
public static PipelineContextKey buildForProxy() {
return new PipelineContextKey("", InstanceType.PROXY);
}

/**
* Build context key for proxy.
*
* @param databaseName database name
* @return context key
*/
public static PipelineContextKey buildForProxy(final String databaseName) {
return new PipelineContextKey(databaseName, InstanceType.PROXY);
public PipelineContextKey(final InstanceType instanceType) {
this("", instanceType);
}

@Override
Expand All @@ -78,7 +51,7 @@ public boolean equals(final Object o) {
}

private String filterDatabaseName(final PipelineContextKey contextKey) {
return contextKey.getInstanceType() == InstanceType.PROXY ? "" : contextKey.getDatabaseName();
return InstanceType.PROXY == contextKey.getInstanceType() ? "" : contextKey.getDatabaseName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -47,7 +48,7 @@ public static PipelineContext getContext(final PipelineContextKey key) {
* @return context
*/
public static PipelineContext getProxyContext() {
return CONTEXT_MAP.get(PipelineContextKey.buildForProxy());
return CONTEXT_MAP.get(new PipelineContextKey(InstanceType.PROXY));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ public void onInitialized(final String databaseName, final ContextManager contex
if (DefaultDatabase.LOGIC_NAME.equals(databaseName)) {
return;
}
PipelineContextKey contextKey = PipelineContextKey.build(databaseName, contextManager.getInstanceContext().getInstance().getMetaData().getType());
PipelineContextKey contextKey = new PipelineContextKey(databaseName, contextManager.getInstanceContext().getInstance().getMetaData().getType());
PipelineContextManager.putContext(contextKey, new PipelineContext(modeConfig, contextManager));
PipelineMetaDataNodeWatcher.getInstance(contextKey);
ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
}

@Override
public void onDestroyed(final String databaseName, final InstanceType instanceType) {
PipelineContextManager.removeContext(PipelineContextKey.build(databaseName, instanceType));
PipelineContextManager.removeContext(new PipelineContextKey(databaseName, instanceType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ public static PipelineContextKey parseContextKey(final String jobId) {
char instanceType = jobId.charAt(5);
short databaseNameLength = Shorts.fromByteArray(Hex.decodeHex(jobId.substring(6, 10)));
String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10 + databaseNameLength)), StandardCharsets.UTF_8);
return PipelineContextKey.build(databaseName, InstanceTypeUtils.decode(instanceType));
return new PipelineContextKey(databaseName, InstanceTypeUtils.decode(instanceType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ class PipelineContextKeyTest {

@Test
void assertHashCodeEqualsForProxyMode() {
PipelineContextKey contextKey1 = PipelineContextKey.build(null, InstanceType.PROXY);
PipelineContextKey contextKey2 = PipelineContextKey.build("sharding_db", InstanceType.PROXY);
PipelineContextKey contextKey1 = new PipelineContextKey(null, InstanceType.PROXY);
PipelineContextKey contextKey2 = new PipelineContextKey("sharding_db", InstanceType.PROXY);
assertThat(contextKey1.hashCode(), is(contextKey2.hashCode()));
assertThat(contextKey1, is(contextKey2));
}

@Test
void assertHashCodeEqualsForJdbcMode() {
PipelineContextKey contextKey1 = PipelineContextKey.build("logic_db", InstanceType.JDBC);
PipelineContextKey contextKey2 = PipelineContextKey.build("sharding_db", InstanceType.JDBC);
PipelineContextKey contextKey1 = new PipelineContextKey("logic_db", InstanceType.JDBC);
PipelineContextKey contextKey2 = new PipelineContextKey("sharding_db", InstanceType.JDBC);
assertThat(contextKey1.hashCode(), not(contextKey2.hashCode()));
assertThat(contextKey1, not(contextKey2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;

import java.util.Arrays;
Expand All @@ -38,7 +39,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor<Sho

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
return jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;

Expand All @@ -37,7 +38,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor<Sho

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
return jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;

Expand All @@ -38,7 +39,7 @@ public final class ShowMigrationSourceStorageUnitsExecutor implements QueryableR

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationSourceStorageUnitsStatement sqlStatement) {
Iterator<Collection<Object>> data = jobAPI.listMigrationSourceResources(PipelineContextKey.buildForProxy()).iterator();
Iterator<Collection<Object>> data = jobAPI.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
while (data.hasNext()) {
result.add(new LocalDataQueryResultRow((List<Object>) data.next()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;

/**
Expand All @@ -37,7 +38,7 @@ public final class MigrateTableUpdater implements RALUpdater<MigrateTableStateme
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
jobAPI.createJobAndStart(PipelineContextKey.buildForProxy(), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
jobAPI.createJobAndStart(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.migration.distsql.statement.RegisterMigrationSourceStorageUnitStatement;

import java.util.ArrayList;
Expand All @@ -54,7 +55,7 @@ public void executeUpdate(final String databaseName, final RegisterMigrationSour
DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
validateHandler.validate(propsMap);
jobAPI.addMigrationSourceResources(PipelineContextKey.buildForProxy(), propsMap);
jobAPI.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;

/**
Expand All @@ -31,7 +32,7 @@ public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpd

@Override
public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
jobAPI.dropMigrationSourceResources(PipelineContextKey.buildForProxy(), sqlStatement.getNames());
jobAPI.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
Expand Down Expand Up @@ -116,7 +117,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
* @return job id
*/
public String createJob(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
PipelineContextKey contextKey = PipelineContextKey.buildForProxy(param.getDatabaseName());
PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CDCJobIdTest {

@Test
void assertParseJobType() {
PipelineContextKey contextKey = PipelineContextKey.build("sharding_db", InstanceType.PROXY);
PipelineContextKey contextKey = new PipelineContextKey("sharding_db", InstanceType.PROXY);
CDCJobId pipelineJobId = new CDCJobId(contextKey, Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void assertParse() {
}

private void assertParse0(final InstanceType instanceType) {
PipelineContextKey contextKey = PipelineContextKey.build("sharding_db", instanceType);
PipelineContextKey contextKey = new PipelineContextKey("sharding_db", instanceType);
MigrationJobId pipelineJobId = new MigrationJobId(contextKey, Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1"));
String jobId = new MigrationJobAPI().marshalJobId(pipelineJobId);
assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(MigrationJobType.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
Expand All @@ -39,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationRuleStatement sqlStatement) {
PipelineProcessConfiguration processConfig = ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"))
.showProcessConfiguration(PipelineContextKey.buildForProxy());
.showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.converter.InventoryIncrementalProcessConfigurationSegmentConverter;

Expand All @@ -35,7 +36,7 @@ public final class AlterInventoryIncrementalRuleUpdater implements RALUpdater<Al
public void executeUpdate(final String databaseName, final AlterInventoryIncrementalRuleStatement sqlStatement) {
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName());
PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
jobAPI.alterProcessConfiguration(PipelineContextKey.buildForProxy(), processConfig);
jobAPI.alterProcessConfiguration(new PipelineContextKey(InstanceType.PROXY), processConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration(
result.setTargetTableSchemaMap(targetTableSchemaMap);
result.setTablesFirstDataNodes("t_order:ds_0.t_order");
result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
PipelineContextKey contextKey = PipelineContextKey.build(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
PipelineContextKey contextKey = new PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
result.setJobId(generateMigrationJobId(contextKey, result));
Map<String, YamlPipelineDataSourceConfiguration> sources = new LinkedHashMap<>();
String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
Expand Down Expand Up @@ -65,7 +66,7 @@
*/
public final class PipelineContextUtils {

private static final PipelineContextKey CONTEXT_KEY = PipelineContextKey.buildForProxy();
private static final PipelineContextKey CONTEXT_KEY = new PipelineContextKey(InstanceType.PROXY);

private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName());

Expand Down
Loading

0 comments on commit c9df3fb

Please sign in to comment.