Revert "Move ComputeNodeInstanceContext.LockContext to ContextManager" #34187

Merged: 1 commit, Dec 28, 2024
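This revert moves LockContext back onto ComputeNodeInstanceContext: the lock context is supplied through ComputeNodeInstanceContext.init(...) and exposed via getLockContext(), while the ContextManager constructor drops its LockContext parameter. The fragment below is a minimal sketch of the restored wiring, pieced together from the diff that follows; it is not a compilable example on its own: imports are omitted, the standalone implementations are used purely for illustration, and instanceMetaData, modeConfiguration, metaDataContexts, and repository are assumed to exist in the caller.

// Minimal sketch of the restored API (standalone mode, for illustration only).
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(
        new ComputeNodeInstance(instanceMetaData), modeConfiguration, new EventBusContext());
// The lock context is now handed over together with the worker id generator at init time ...
instanceContext.init(new StandaloneWorkerIdGenerator(), new StandaloneLockContext());
// ... so ContextManager no longer receives a LockContext of its own.
ContextManager contextManager = new ContextManager(metaDataContexts, instanceContext, repository);
// Consumers (e.g. the migration preparer below) read the lock context from the instance context.
LockContext lockContext = contextManager.getComputeNodeInstanceContext().getLockContext();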
@@ -73,7 +73,7 @@ private ContextManager mockContextManager() {
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(mock(MetaDataPersistService.class), new ShardingSphereMetaData());
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new ModeConfiguration("Standalone", null), new EventBusContext());
- computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator());
- return new ContextManager(metaDataContexts, mock(LockContext.class), computeNodeInstanceContext, mock(PersistRepository.class));
+ computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator(), mock(LockContext.class));
+ return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}
}
@@ -113,7 +113,7 @@ private ShardingRule createShardingRule() {
ruleConfig.getTables().add(nonCacheableTableSharding);
ruleConfig.setShardingCache(new ShardingCacheConfiguration(100, new ShardingCacheOptionsConfiguration(true, 0, 0)));
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(mock(ComputeNodeInstance.class), null, null);
- instanceContext.init(props -> 0);
+ instanceContext.init(props -> 0, null);
return new ShardingRule(ruleConfig, Maps.of("ds_0", new MockedDataSource(), "ds_1", new MockedDataSource()), instanceContext, Collections.emptyList());
}

@@ -22,6 +22,7 @@
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+ import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.state.instance.InstanceState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;

@@ -47,23 +48,29 @@ public final class ComputeNodeInstanceContext {
@Getter(AccessLevel.NONE)
private final AtomicReference<WorkerIdGenerator> workerIdGenerator;

+ @Getter(AccessLevel.NONE)
+ private final AtomicReference<LockContext> lockContext;

private final ClusterInstanceRegistry clusterInstanceRegistry;

public ComputeNodeInstanceContext(final ComputeNodeInstance instance, final ModeConfiguration modeConfiguration, final EventBusContext eventBusContext) {
this.instance = instance;
this.modeConfiguration = modeConfiguration;
this.eventBusContext = eventBusContext;
workerIdGenerator = new AtomicReference<>();
+ lockContext = new AtomicReference<>();
clusterInstanceRegistry = new ClusterInstanceRegistry();
}

/**
* Initialize compute node instance context.
*
* @param workerIdGenerator worker id generator
+ * @param lockContext lock context
*/
- public void init(final WorkerIdGenerator workerIdGenerator) {
+ public void init(final WorkerIdGenerator workerIdGenerator, final LockContext lockContext) {
this.workerIdGenerator.set(workerIdGenerator);
+ this.lockContext.set(lockContext);
}

/**
@@ -135,4 +142,13 @@ public int generateWorkerId(final Properties props) {
instance.setWorkerId(result);
return result;
}

+ /**
+ * Get lock context.
+ *
+ * @return lock context
+ */
+ public LockContext getLockContext() {
+ return Optional.ofNullable(lockContext.get()).orElseThrow(() -> new IllegalStateException("Lock context is not initialized."));
+ }
}
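Worth noting about the getLockContext() accessor added above: the lock context sits in an AtomicReference that only init(...) populates, so calling the accessor on an uninitialized instance fails fast rather than returning null. A small illustrative fragment (assuming a freshly constructed context and omitting imports):

ComputeNodeInstanceContext context = new ComputeNodeInstanceContext(
        new ComputeNodeInstance(instanceMetaData), modeConfiguration, new EventBusContext());
// init(...) has not been called yet, so the AtomicReference still holds null and the
// accessor throws IllegalStateException("Lock context is not initialized.").
context.getLockContext();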
@@ -21,6 +21,7 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+ import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.state.instance.InstanceState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.junit.jupiter.api.Test;
@@ -41,16 +42,18 @@ class ComputeNodeInstanceContextTest {
void assertInit() {
ComputeNodeInstanceContext context = new ComputeNodeInstanceContext(new ComputeNodeInstance(mock(InstanceMetaData.class)), mock(ModeConfiguration.class), new EventBusContext());
WorkerIdGenerator workerIdGenerator = mock(WorkerIdGenerator.class);
- context.init(workerIdGenerator);
+ LockContext lockContext = mock(LockContext.class);
+ context.init(workerIdGenerator, lockContext);
context.generateWorkerId(new Properties());
verify(workerIdGenerator).generate(new Properties());
+ assertThat(context.getLockContext(), is(lockContext));
}

@Test
void assertUpdateStatusWithInvalidInstanceState() {
InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
instanceContext.updateStatus("id", "INVALID");
verify(instanceMetaData, times(0)).getId();
}
@@ -59,7 +62,7 @@ void assertUpdateStatusWithInvalidInstanceState() {
void assertUpdateStatusWithCurrentInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3306);
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
instanceContext.updateStatus("foo_instance_id", InstanceState.CIRCUIT_BREAK.name());
assertThat(instanceContext.getInstance().getState().getCurrentState(), is(InstanceState.CIRCUIT_BREAK));
@@ -69,7 +72,7 @@ void assertUpdateStatusWithCurrentInstance() {
void assertUpdateStatusWithOtherInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3306);
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
instanceContext.updateStatus("bar_instance_id", InstanceState.CIRCUIT_BREAK.name());
assertThat(instanceContext.getInstance().getState().getCurrentState(), is(InstanceState.OK));
@@ -79,7 +82,7 @@ void assertUpdateStatusWithOtherInstance() {
void assertUpdateLabelsWithCurrentInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3306);
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
instanceContext.updateLabels("foo_instance_id", Arrays.asList("label_1", "label_2"));
assertThat(instanceContext.getInstance().getLabels(), is(Arrays.asList("label_1", "label_2")));
}
@@ -88,7 +91,7 @@ void assertUpdateLabelsWithCurrentInstance() {
void assertUpdateLabelsWithOtherInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3306);
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
instanceContext.updateLabels("bar_instance_id", Arrays.asList("label_1", "label_2"));
assertTrue(instanceContext.getInstance().getLabels().isEmpty());
@@ -99,7 +102,7 @@ void assertUpdateLabelsWithOtherInstance() {
void assertUpdateWorkerIdWithCurrentInstance() {
ComputeNodeInstance instance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3306));
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
instanceContext.updateWorkerId("foo_instance_id", 10);
assertThat(instanceContext.getWorkerId(), is(10));
}
@@ -108,7 +111,7 @@ void assertUpdateWorkerIdWithCurrentInstance() {
void assertUpdateWorkerIdWithOtherInstance() {
ComputeNodeInstance instance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3306));
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
instanceContext.updateWorkerId("bar_instance_id", 10);
assertThat(instanceContext.getWorkerId(), is(-1));
@@ -118,7 +121,7 @@ void assertUpdateWorkerIdWithOtherInstance() {
@Test
void assertGenerateWorkerId() {
ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(mock(InstanceMetaData.class)), mock(ModeConfiguration.class), new EventBusContext());
- instanceContext.init(mock(WorkerIdGenerator.class));
+ instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class));
assertThat(instanceContext.generateWorkerId(new Properties()), is(0));
}
}
@@ -65,6 +65,7 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+ import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
@@ -112,12 +113,13 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem
String jobId = jobConfig.getJobId();
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
+ LockContext lockContext = contextManager.getComputeNodeInstanceContext().getLockContext();
if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
jobItemManager.persistProgress(jobItemContext);
}
LockDefinition lockDefinition = new GlobalLockDefinition(new MigrationPrepareLock(jobConfig.getJobId()));
long startTimeMillis = System.currentTimeMillis();
- if (contextManager.getLockContext().tryLock(lockDefinition, 600 * 1000L)) {
+ if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
try {
PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
@@ -130,7 +132,7 @@
}
} finally {
log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
- contextManager.getLockContext().unlock(lockDefinition);
+ lockContext.unlock(lockDefinition);
}
} else {
log.warn("Lock failed, jobId={}, shardingItem={}.", jobId, jobItemContext.getShardingItem());
@@ -26,7 +26,6 @@
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
- import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
@@ -55,8 +54,6 @@ public final class ContextManager implements AutoCloseable {

private final AtomicReference<MetaDataContexts> metaDataContexts;

- private final LockContext lockContext;

private final ComputeNodeInstanceContext computeNodeInstanceContext;

private final ExecutorEngine executorEngine;
@@ -67,9 +64,8 @@

private final MetaDataContextManager metaDataContextManager;

- public ContextManager(final MetaDataContexts metaDataContexts, final LockContext lockContext, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository) {
+ public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository) {
this.metaDataContexts = new AtomicReference<>(metaDataContexts);
- this.lockContext = lockContext;
this.computeNodeInstanceContext = computeNodeInstanceContext;
metaDataContextManager = new MetaDataContextManager(this.metaDataContexts, computeNodeInstanceContext, repository);
persistServiceFacade = new PersistServiceFacade(repository, computeNodeInstanceContext.getModeConfiguration(), metaDataContextManager);
@@ -27,7 +27,6 @@
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
- import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
@@ -90,7 +89,7 @@ void setUp() throws SQLException {
when(metaDataContexts.getMetaData().getAllDatabases()).thenReturn(Collections.singleton(database));
when(computeNodeInstanceContext.getInstance()).thenReturn(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_id", 3307), Collections.emptyList()));
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
- contextManager = new ContextManager(metaDataContexts, mock(LockContext.class), computeNodeInstanceContext, mock(PersistRepository.class));
+ contextManager = new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}

private ShardingSphereDatabase mockDatabase() throws SQLException {
@@ -22,6 +22,7 @@
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
+ import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
@@ -58,10 +59,11 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
ClusterPersistRepositoryConfiguration config = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(param.getInstanceMetaData(), param.getLabels()), modeConfig, eventBusContext);
ClusterPersistRepository repository = getClusterPersistRepository(config, computeNodeInstanceContext);
- computeNodeInstanceContext.init(new ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()));
+ LockContext lockContext = new ClusterLockContext(new GlobalLockPersistService(repository));
+ computeNodeInstanceContext.init(new ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()), lockContext);
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, computeNodeInstanceContext);
- ContextManager result = new ContextManager(metaDataContexts, new ClusterLockContext(new GlobalLockPersistService(repository)), computeNodeInstanceContext, repository);
+ ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository);
registerOnline(computeNodeInstanceContext, param, result, repository);
return result;
}
Expand Down
@@ -47,9 +47,9 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
StandalonePersistRepository.class, null == repositoryConfig ? null : repositoryConfig.getType(), null == repositoryConfig ? new Properties() : repositoryConfig.getProps());
MetaDataPersistService persistService = new MetaDataPersistService(repository);
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), param.getModeConfiguration(), eventBusContext);
- computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator());
+ computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator(), new StandaloneLockContext());
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, computeNodeInstanceContext);
- return new ContextManager(metaDataContexts, new StandaloneLockContext(), computeNodeInstanceContext, repository);
+ return new ContextManager(metaDataContexts, computeNodeInstanceContext, repository);
}

@Override