Skip to content

Commit

Permalink
Refactor ClusterWorkerIdGenerator (#31318)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored May 20, 2024
1 parent d40ea0a commit 8c9a165
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private ClusterPersistRepository getClusterPersistRepository(final ClusterPersis

private ComputeNodeInstanceContext buildComputeNodeInstanceContext(final ModeConfiguration modeConfig,
final InstanceMetaData instanceMetaData, final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
return new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), new ClusterWorkerIdGenerator(repository, instanceMetaData), modeConfig,
return new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), new ClusterWorkerIdGenerator(repository, instanceMetaData.getId()), modeConfig,
new ClusterModeContextManager(), new GlobalLockContext(new GlobalLockPersistService(initDistributedLockHolder(repository))), eventBusContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdAssignedException;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdAssignedException;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
Expand All @@ -43,15 +42,15 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {

private final ClusterPersistRepository repository;

private final InstanceMetaData instanceMetaData;
private final String instanceId;

private final ComputeNodeStatusService computeNodeStatusService;

private final AtomicBoolean isWarned = new AtomicBoolean(false);

public ClusterWorkerIdGenerator(final ClusterPersistRepository repository, final InstanceMetaData instanceMetaData) {
public ClusterWorkerIdGenerator(final ClusterPersistRepository repository, final String instanceId) {
this.repository = repository;
this.instanceMetaData = instanceMetaData;
this.instanceId = instanceId;
computeNodeStatusService = new ComputeNodeStatusService(repository);
}

Expand All @@ -63,7 +62,7 @@ public int generate(final Properties props) {
}

private Optional<Integer> loadExistedWorkerId() {
return computeNodeStatusService.loadInstanceWorkerId(instanceMetaData.getId());
return computeNodeStatusService.loadInstanceWorkerId(instanceId);
}

private int generateNewWorkerId() {
Expand All @@ -72,7 +71,7 @@ private int generateNewWorkerId() {
generatedWorkId = generateAvailableWorkerId();
} while (!generatedWorkId.isPresent());
int result = generatedWorkId.get();
computeNodeStatusService.persistInstanceWorkerId(instanceMetaData.getId(), result);
computeNodeStatusService.persistInstanceWorkerId(instanceId, result);
return result;
}

Expand All @@ -90,7 +89,7 @@ private Optional<Integer> generateAvailableWorkerId() {
Integer preselectedWorkerId = priorityQueue.poll();
Preconditions.checkState(null != preselectedWorkerId, "Preselected worker-id can not be null.");
try {
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()), instanceMetaData.getId());
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()), instanceId);
return Optional.of(preselectedWorkerId);
} catch (final ClusterPersistRepositoryException ignore) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator;

import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
Expand All @@ -37,19 +36,15 @@ class ClusterWorkerIdGeneratorTest {

@Test
void assertGenerateWithExistedWorkerId() {
InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
when(instanceMetaData.getId()).thenReturn("foo_id");
ClusterPersistRepository repository = mock(ClusterPersistRepository.class);
when(repository.getDirectly("/nodes/compute_nodes/worker_id/foo_id")).thenReturn("10");
assertThat(new ClusterWorkerIdGenerator(repository, instanceMetaData).generate(PropertiesBuilder.build(new Property(WorkerIdGenerator.WORKER_ID_KEY, "1"))), is(10));
assertThat(new ClusterWorkerIdGenerator(repository, "foo_id").generate(PropertiesBuilder.build(new Property(WorkerIdGenerator.WORKER_ID_KEY, "1"))), is(10));
}

@Test
void assertGenerateWithoutExistedWorkerId() {
InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
when(instanceMetaData.getId()).thenReturn("foo_id");
ClusterPersistRepository repository = mock(ClusterPersistRepository.class);
doAnswer((Answer<Object>) invocation -> "foo_id").when(repository).persistEphemeral("/worker_id/0", "foo_id");
assertThat(new ClusterWorkerIdGenerator(repository, instanceMetaData).generate(new Properties()), is(0));
assertThat(new ClusterWorkerIdGenerator(repository, "foo_id").generate(new Properties()), is(0));
}
}

0 comments on commit 8c9a165

Please sign in to comment.