Skip to content

Commit

Permalink
Refactor DatabaseAwareRALUpdater (#29767)
Browse files Browse the repository at this point in the history
* Rename RALUpdater

* Refactor DatabaseAwareRALUpdater

* Refactor DatabaseAwareRALUpdater

* Refactor DatabaseAwareRALUpdater

* Refactor DatabaseAwareRALUpdater

* Refactor DatabaseAwareRALUpdater
  • Loading branch information
terrymanu authored Jan 18, 2024
1 parent 1d556e4 commit 6b096d0
Show file tree
Hide file tree
Showing 42 changed files with 187 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @param <T> type of SQL statement
*/
public interface DatabaseAwareRALUpdater<T extends UpdatableRALStatement> extends DatabaseRuleRALUpdater<T> {
public interface DatabaseAwareRALUpdater<T extends UpdatableRALStatement> extends RALUpdater<T> {

/**
* Set database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@
import java.sql.SQLException;

/**
* Database rule RAL updater.
* RAL updater.
*
* @param <T> type of updatable RAL statement
*/
@SingletonSPI
public interface DatabaseRuleRALUpdater<T extends UpdatableRALStatement> extends TypedSPI {
public interface RALUpdater<T extends UpdatableRALStatement> extends TypedSPI {

/**
* Execute update.
*
* @param databaseName database name
* @param sqlStatement updatable RAL statement
* @throws SQLException SQL exception
*/
void executeUpdate(String databaseName, T sqlStatement) throws SQLException;
void executeUpdate(T sqlStatement) throws SQLException;

@Override
Class<T> getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@
import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.DropStreamingStatement;
import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.sql.SQLException;

/**
* Drop streaming updater.
*/
public final class DropStreamingUpdater implements DatabaseRuleRALUpdater<DropStreamingStatement> {
public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement> {

private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");

@Override
public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
public void executeUpdate(final DropStreamingStatement sqlStatement) throws SQLException {
jobAPI.drop(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.CheckMigrationStatement;
Expand All @@ -38,14 +38,14 @@
/**
* Check migration job updater.
*/
public final class CheckMigrationJobUpdater implements DatabaseRuleRALUpdater<CheckMigrationStatement> {
public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigrationStatement> {

private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType());

private final PipelineJobType migrationJobType = new MigrationJobType();

@Override
public void executeUpdate(final String databaseName, final CheckMigrationStatement sqlStatement) throws SQLException {
public void executeUpdate(final CheckMigrationStatement sqlStatement) throws SQLException {
AlgorithmSegment typeStrategy = sqlStatement.getTypeStrategy();
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.CommitMigrationStatement;

Expand All @@ -27,10 +27,10 @@
/**
* Commit migration updater.
*/
public final class CommitMigrationUpdater implements DatabaseRuleRALUpdater<CommitMigrationStatement> {
public final class CommitMigrationUpdater implements RALUpdater<CommitMigrationStatement> {

@Override
public void executeUpdate(final String databaseName, final CommitMigrationStatement sqlStatement) throws SQLException {
public void executeUpdate(final CommitMigrationStatement sqlStatement) throws SQLException {
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION").commit(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@

import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.DropMigrationCheckStatement;

/**
* Drop migration check updater.
*/
public final class DropMigrationCheckUpdater implements DatabaseRuleRALUpdater<DropMigrationCheckStatement> {
public final class DropMigrationCheckUpdater implements RALUpdater<DropMigrationCheckStatement> {

private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType());

@Override
public void executeUpdate(final String databaseName, final DropMigrationCheckStatement sqlStatement) {
public void executeUpdate(final DropMigrationCheckStatement sqlStatement) {
jobAPI.drop(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@

package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;

import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseAwareRALUpdater;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;

/**
* Migrate table updater.
*/
public final class MigrateTableUpdater implements DatabaseRuleRALUpdater<MigrateTableStatement> {
@Setter
public final class MigrateTableUpdater implements DatabaseAwareRALUpdater<MigrateTableStatement> {

private ShardingSphereDatabase database;

@Override
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
public void executeUpdate(final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? database.getName() : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
jobAPI.start(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
import org.apache.shardingsphere.distsql.segment.HostnameAndPortBasedDataSourceSegment;
Expand All @@ -42,14 +42,14 @@
/**
* Register migration source storage unit updater.
*/
public final class RegisterMigrationSourceStorageUnitUpdater implements DatabaseRuleRALUpdater<RegisterMigrationSourceStorageUnitStatement> {
public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdater<RegisterMigrationSourceStorageUnitStatement> {

private final MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");

private final DataSourcePoolPropertiesValidateHandler validateHandler = new DataSourcePoolPropertiesValidateHandler();

@Override
public void executeUpdate(final String databaseName, final RegisterMigrationSourceStorageUnitStatement sqlStatement) {
public void executeUpdate(final RegisterMigrationSourceStorageUnitStatement sqlStatement) {
List<DataSourceSegment> dataSources = new ArrayList<>(sqlStatement.getDataSources());
ShardingSpherePreconditions.checkState(dataSources.stream().noneMatch(HostnameAndPortBasedDataSourceSegment.class::isInstance),
() -> new UnsupportedSQLOperationException("Not currently support add hostname and port, please use url"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.RollbackMigrationStatement;

Expand All @@ -27,10 +27,10 @@
/**
* Rollback migration updater.
*/
public final class RollbackMigrationUpdater implements DatabaseRuleRALUpdater<RollbackMigrationStatement> {
public final class RollbackMigrationUpdater implements RALUpdater<RollbackMigrationStatement> {

@Override
public void executeUpdate(final String databaseName, final RollbackMigrationStatement sqlStatement) throws SQLException {
public void executeUpdate(final RollbackMigrationStatement sqlStatement) throws SQLException {
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION").rollback(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@

import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.StartMigrationCheckStatement;

/**
* Start migration check updater.
*/
public final class StartMigrationCheckUpdater implements DatabaseRuleRALUpdater<StartMigrationCheckStatement> {
public final class StartMigrationCheckUpdater implements RALUpdater<StartMigrationCheckStatement> {

private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType());

@Override
public void executeUpdate(final String databaseName, final StartMigrationCheckStatement sqlStatement) {
public void executeUpdate(final StartMigrationCheckStatement sqlStatement) {
jobAPI.resume(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@

import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.StartMigrationStatement;

/**
* Start migration updater.
*/
public final class StartMigrationUpdater implements DatabaseRuleRALUpdater<StartMigrationStatement> {
public final class StartMigrationUpdater implements RALUpdater<StartMigrationStatement> {

private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobType());

@Override
public void executeUpdate(final String databaseName, final StartMigrationStatement sqlStatement) {
public void executeUpdate(final StartMigrationStatement sqlStatement) {
jobManager.resume(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@

import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.StopMigrationCheckStatement;

/**
* Stop migration check updater.
*/
public final class StopMigrationCheckUpdater implements DatabaseRuleRALUpdater<StopMigrationCheckStatement> {
public final class StopMigrationCheckUpdater implements RALUpdater<StopMigrationCheckStatement> {

private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType());

@Override
public void executeUpdate(final String databaseName, final StopMigrationCheckStatement sqlStatement) {
public void executeUpdate(final StopMigrationCheckStatement sqlStatement) {
jobAPI.stop(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@

import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.StopMigrationStatement;

/**
* Stop migration updater.
*/
public final class StopMigrationUpdater implements DatabaseRuleRALUpdater<StopMigrationStatement> {
public final class StopMigrationUpdater implements RALUpdater<StopMigrationStatement> {

private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobType());

@Override
public void executeUpdate(final String databaseName, final StopMigrationStatement sqlStatement) {
public void executeUpdate(final StopMigrationStatement sqlStatement) {
jobManager.stop(sqlStatement.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;

/**
* Unregister migration source storage unit updater.
*/
public final class UnregisterMigrationSourceStorageUnitUpdater implements DatabaseRuleRALUpdater<UnregisterMigrationSourceStorageUnitStatement> {
public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpdater<UnregisterMigrationSourceStorageUnitStatement> {

private final MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");

@Override
public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
public void executeUpdate(final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
jobAPI.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ public static ProxyBackendHandler newInstance(final RALStatement sqlStatement, f
}
return new UpdatableGlobalRuleRALBackendHandler((UpdatableGlobalRuleRALStatement) sqlStatement);
}
return new UpdatableDatabaseRuleRALBackendHandler<>((UpdatableRALStatement) sqlStatement, connectionSession);
return new UpdatableRALBackendHandler<>((UpdatableRALStatement) sqlStatement, connectionSession);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseRuleRALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.handler.type.ral.update.DatabaseAwareRALUpdater;
import org.apache.shardingsphere.distsql.statement.ral.UpdatableRALStatement;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
Expand All @@ -36,7 +36,7 @@
* @param <T> type of SQL statement
*/
@RequiredArgsConstructor
public final class UpdatableDatabaseRuleRALBackendHandler<T extends UpdatableRALStatement> implements RALBackendHandler {
public final class UpdatableRALBackendHandler<T extends UpdatableRALStatement> implements RALBackendHandler {

private final UpdatableRALStatement sqlStatement;

Expand All @@ -45,11 +45,11 @@ public final class UpdatableDatabaseRuleRALBackendHandler<T extends UpdatableRAL
@SuppressWarnings("unchecked")
@Override
public ResponseHeader execute() throws SQLException {
DatabaseRuleRALUpdater<T> updater = TypedSPILoader.getService(DatabaseRuleRALUpdater.class, sqlStatement.getClass());
RALUpdater<T> updater = TypedSPILoader.getService(RALUpdater.class, sqlStatement.getClass());
if (updater instanceof DatabaseAwareRALUpdater) {
((DatabaseAwareRALUpdater<T>) updater).setDatabase(ProxyContext.getInstance().getDatabase(DatabaseNameUtils.getDatabaseName(sqlStatement, connectionSession)));
}
updater.executeUpdate(connectionSession.getDatabaseName(), (T) sqlStatement);
updater.executeUpdate((T) sqlStatement);
return new UpdateResponseHeader(sqlStatement);
}
}
Loading

0 comments on commit 6b096d0

Please sign in to comment.