From 6524514a7ea1439a962b38ce10b0baed63079a4a Mon Sep 17 00:00:00 2001 From: Haoran Meng Date: Wed, 22 May 2024 17:20:09 +0800 Subject: [PATCH] Rename ContextServiceFacade to PersistServiceFacade and move StatePersistService to PersistServiceFacade --- .../driver/state/DriverStateContextTest.java | 3 +-- .../mode/manager/ContextManager.java | 9 +++---- ...ce.java => ComputeNodePersistService.java} | 4 +-- ...eFacade.java => PersistServiceFacade.java} | 14 ++++++---- .../mode/state/StateContext.java | 15 ----------- ...eService.java => StatePersistService.java} | 10 +++---- ...ava => ComputeNodePersistServiceTest.java} | 26 +++++++++---------- .../mode/state/StateContextTest.java | 10 +------ ...Test.java => StatePersistServiceTest.java} | 12 ++++----- .../cluster/ClusterContextManagerBuilder.java | 12 ++++----- .../generator/ClusterWorkerIdGenerator.java | 12 ++++----- .../subscriber/StateChangedSubscriber.java | 3 ++- .../SessionConnectionReconnectListener.java | 8 +++--- .../updatable/LabelComputeNodeExecutor.java | 2 +- .../SetComputeNodeStateExecutor.java | 2 +- .../updatable/UnlabelComputeNodeExecutor.java | 4 +-- .../ral/updatable/UnlockClusterExecutor.java | 2 +- .../impl/ClusterReadWriteLockStrategy.java | 2 +- .../lock/impl/ClusterWriteLockStrategy.java | 2 +- 19 files changed, 66 insertions(+), 86 deletions(-) rename mode/core/src/main/java/org/apache/shardingsphere/mode/service/{ComputeNodeService.java => ComputeNodePersistService.java} (98%) rename mode/core/src/main/java/org/apache/shardingsphere/mode/service/{ContextServiceFacade.java => PersistServiceFacade.java} (67%) rename mode/core/src/main/java/org/apache/shardingsphere/mode/state/{StateService.java => StatePersistService.java} (88%) rename mode/core/src/test/java/org/apache/shardingsphere/mode/service/{ComputeNodeServiceTest.java => ComputeNodePersistServiceTest.java} (85%) rename mode/core/src/test/java/org/apache/shardingsphere/mode/state/{StateServiceTest.java => StatePersistServiceTest.java} (81%) diff --git a/jdbc/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java b/jdbc/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java index 68b27abc69398..a443a39a6532f 100644 --- a/jdbc/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java +++ b/jdbc/src/test/java/org/apache/shardingsphere/driver/state/DriverStateContextTest.java @@ -31,7 +31,6 @@ import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.mode.state.StateContext; -import org.apache.shardingsphere.mode.state.StateService; import org.apache.shardingsphere.traffic.rule.TrafficRule; import org.apache.shardingsphere.transaction.rule.TransactionRule; import org.junit.jupiter.api.BeforeEach; @@ -68,7 +67,7 @@ void setUp() { mock(MetaDataPersistService.class), new ShardingSphereMetaData(databases, mock(ResourceMetaData.class), globalRuleMetaData, new ConfigurationProperties(new Properties()))); when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts); when(contextManager.getComputeNodeInstanceContext().getInstance().getState()).thenReturn(new InstanceStateContext()); - when(contextManager.getStateContext()).thenReturn(new StateContext(mock(StateService.class))); + when(contextManager.getStateContext()).thenReturn(new StateContext()); } private Map mockDatabases() { diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java index ee37c2a856828..698651fa6c4ad 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java @@ -44,9 +44,8 @@ import org.apache.shardingsphere.mode.manager.switcher.ResourceSwitchManager; import org.apache.shardingsphere.mode.manager.switcher.SwitchingResource; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; -import org.apache.shardingsphere.mode.service.ContextServiceFacade; +import org.apache.shardingsphere.mode.service.PersistServiceFacade; import org.apache.shardingsphere.mode.state.StateContext; -import org.apache.shardingsphere.mode.state.StateService; import java.sql.SQLException; import java.util.Collection; @@ -75,7 +74,7 @@ public final class ContextManager implements AutoCloseable { private final StateContext stateContext; - private final ContextServiceFacade contextServiceFacade; + private final PersistServiceFacade persistServiceFacade; public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext) { this.metaDataContexts = new AtomicReference<>(metaDataContexts); @@ -84,8 +83,8 @@ public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNode configurationContextManager = new ConfigurationContextManager(this.metaDataContexts, computeNodeInstanceContext); resourceMetaDataContextManager = new ResourceMetaDataContextManager(this.metaDataContexts); executorEngine = ExecutorEngine.createExecutorEngineWithSize(metaDataContexts.getMetaData().getProps().getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE)); - stateContext = new StateContext(new StateService(metaDataContexts.getPersistService().getRepository())); - contextServiceFacade = new ContextServiceFacade(metaDataContexts.getPersistService().getRepository()); + stateContext = new StateContext(); + persistServiceFacade = new PersistServiceFacade(metaDataContexts.getPersistService().getRepository()); } /** diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ComputeNodeService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ComputeNodePersistService.java similarity index 98% rename from mode/core/src/main/java/org/apache/shardingsphere/mode/service/ComputeNodeService.java rename to mode/core/src/main/java/org/apache/shardingsphere/mode/service/ComputeNodePersistService.java index 183c99abd0397..8eea3693aa51e 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ComputeNodeService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ComputeNodePersistService.java @@ -38,11 +38,11 @@ import java.util.Optional; /** - * Compute node status service. + * Compute node persist service. */ @RequiredArgsConstructor @Slf4j -public final class ComputeNodeService { +public final class ComputeNodePersistService { private final PersistRepository repository; diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ContextServiceFacade.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java similarity index 67% rename from mode/core/src/main/java/org/apache/shardingsphere/mode/service/ContextServiceFacade.java rename to mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java index f178744d0285e..e6642d5c1c53b 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ContextServiceFacade.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java @@ -19,16 +19,20 @@ import lombok.Getter; import org.apache.shardingsphere.mode.spi.PersistRepository; +import org.apache.shardingsphere.mode.state.StatePersistService; /** - * Context service facade. + * Persist service facade. */ @Getter -public final class ContextServiceFacade { +public final class PersistServiceFacade { - private final ComputeNodeService computeNodeService; + private final ComputeNodePersistService computeNodePersistService; - public ContextServiceFacade(final PersistRepository repository) { - computeNodeService = new ComputeNodeService(repository); + private final StatePersistService statePersistService; + + public PersistServiceFacade(final PersistRepository repository) { + computeNodePersistService = new ComputeNodePersistService(repository); + statePersistService = new StatePersistService(repository); } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java index ca0f39c233344..32b7c685c41f4 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java @@ -17,22 +17,16 @@ package org.apache.shardingsphere.mode.state; -import lombok.Getter; -import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.state.cluster.ClusterState; import java.util.concurrent.atomic.AtomicReference; /** * State context. */ -@RequiredArgsConstructor public final class StateContext { private final AtomicReference currentClusterState = new AtomicReference<>(ClusterState.OK); - @Getter - private final StateService stateService; - /** * Get current cluster state. * @@ -50,13 +44,4 @@ public ClusterState getCurrentClusterState() { public void switchCurrentClusterState(final ClusterState state) { currentClusterState.set(state); } - - /** - * Update cluster state. - * - * @param state cluster state - */ - public void updateClusterState(final ClusterState state) { - stateService.persist(state); - } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StatePersistService.java similarity index 88% rename from mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java rename to mode/core/src/main/java/org/apache/shardingsphere/mode/state/StatePersistService.java index 818bff9a36d30..0dfa95215a368 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StatePersistService.java @@ -26,19 +26,19 @@ import java.util.Optional; /** - * State service. + * State persist service. */ @RequiredArgsConstructor -public final class StateService { +public final class StatePersistService { private final PersistRepository repository; /** - * Persist cluster state. + * Update cluster state. * * @param state cluster state */ - public void persist(final ClusterState state) { + public void updateClusterState(final ClusterState state) { repository.persist(ComputeNode.getClusterStateNodePath(), state.name()); } @@ -47,7 +47,7 @@ public void persist(final ClusterState state) { * * @return cluster state */ - public Optional load() { + public Optional loadClusterState() { String value = repository.getDirectly(ComputeNode.getClusterStateNodePath()); return Strings.isNullOrEmpty(value) ? Optional.empty() : Optional.of(ClusterState.valueOf(value)); } diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/service/ComputeNodeServiceTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/service/ComputeNodePersistServiceTest.java similarity index 85% rename from mode/core/src/test/java/org/apache/shardingsphere/mode/service/ComputeNodeServiceTest.java rename to mode/core/src/test/java/org/apache/shardingsphere/mode/service/ComputeNodePersistServiceTest.java index db85e7de7d5a3..fb7e065f33311 100644 --- a/mode/core/src/test/java/org/apache/shardingsphere/mode/service/ComputeNodeServiceTest.java +++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/service/ComputeNodePersistServiceTest.java @@ -47,7 +47,7 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) -class ComputeNodeServiceTest { +class ComputeNodePersistServiceTest { @Mock private PersistRepository repository; @@ -56,7 +56,7 @@ class ComputeNodeServiceTest { void assertRegisterOnline() { ComputeNodeInstance computeNodeInstance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307)); computeNodeInstance.getLabels().add("test"); - new ComputeNodeService(repository).registerOnline(computeNodeInstance); + new ComputeNodePersistService(repository).registerOnline(computeNodeInstance); verify(repository).persistEphemeral(eq("/nodes/compute_nodes/online/proxy/" + computeNodeInstance.getMetaData().getId()), anyString()); verify(repository).persistEphemeral(ComputeNode.getComputeNodeStateNodePath(computeNodeInstance.getMetaData().getId()), InstanceState.OK.name()); verify(repository).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(computeNodeInstance.getMetaData().getId()), YamlEngine.marshal(Collections.singletonList("test"))); @@ -64,12 +64,12 @@ void assertRegisterOnline() { @Test void assertPersistInstanceLabels() { - ComputeNodeService computeNodeService = new ComputeNodeService(repository); + ComputeNodePersistService computeNodePersistService = new ComputeNodePersistService(repository); InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307); final String instanceId = instanceMetaData.getId(); - computeNodeService.persistInstanceLabels(instanceId, Collections.singletonList("test")); + computeNodePersistService.persistInstanceLabels(instanceId, Collections.singletonList("test")); verify(repository).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.singletonList("test"))); - computeNodeService.persistInstanceLabels(instanceId, Collections.emptyList()); + computeNodePersistService.persistInstanceLabels(instanceId, Collections.emptyList()); verify(repository).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(Collections.emptyList())); } @@ -77,7 +77,7 @@ void assertPersistInstanceLabels() { void assertPersistInstanceWorkerId() { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307); final String instanceId = instanceMetaData.getId(); - new ComputeNodeService(repository).persistInstanceWorkerId(instanceId, 100); + new ComputeNodePersistService(repository).persistInstanceWorkerId(instanceId, 100); verify(repository).persistEphemeral(ComputeNode.getInstanceWorkerIdNodePath(instanceId), String.valueOf(100)); } @@ -85,7 +85,7 @@ void assertPersistInstanceWorkerId() { void assertLoadInstanceLabels() { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307); final String instanceId = instanceMetaData.getId(); - new ComputeNodeService(repository).loadInstanceLabels(instanceId); + new ComputeNodePersistService(repository).loadInstanceLabels(instanceId); verify(repository).getDirectly(ComputeNode.getInstanceLabelsNodePath(instanceId)); } @@ -93,7 +93,7 @@ void assertLoadInstanceLabels() { void assertLoadComputeNodeState() { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307); final String instanceId = instanceMetaData.getId(); - new ComputeNodeService(repository).loadComputeNodeState(instanceId); + new ComputeNodePersistService(repository).loadComputeNodeState(instanceId); verify(repository).getDirectly(ComputeNode.getComputeNodeStateNodePath(instanceId)); } @@ -101,7 +101,7 @@ void assertLoadComputeNodeState() { void assertLoadInstanceWorkerId() { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307); final String instanceId = instanceMetaData.getId(); - new ComputeNodeService(repository).loadInstanceWorkerId(instanceId); + new ComputeNodePersistService(repository).loadInstanceWorkerId(instanceId); verify(repository).getDirectly(ComputeNode.getInstanceWorkerIdNodePath(instanceId)); } @@ -117,7 +117,7 @@ void assertLoadAllComputeNodeInstances() { yamlComputeNodeData1.setAttribute("127.0.0.1@3308"); yamlComputeNodeData1.setVersion("foo_version"); when(repository.getDirectly("/nodes/compute_nodes/online/proxy/foo_instance_3308")).thenReturn(YamlEngine.marshal(yamlComputeNodeData1)); - List actual = new ArrayList<>(new ComputeNodeService(repository).loadAllComputeNodeInstances()); + List actual = new ArrayList<>(new ComputeNodePersistService(repository).loadAllComputeNodeInstances()); assertThat(actual.size(), is(2)); assertThat(actual.get(0).getMetaData().getId(), is("foo_instance_3307")); assertThat(actual.get(0).getMetaData().getIp(), is(IpUtils.getIp())); @@ -130,19 +130,19 @@ void assertLoadAllComputeNodeInstances() { @Test void assertLoadComputeNodeInstance() { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307); - ComputeNodeInstance actual = new ComputeNodeService(repository).loadComputeNodeInstance(instanceMetaData); + ComputeNodeInstance actual = new ComputeNodePersistService(repository).loadComputeNodeInstance(instanceMetaData); assertThat(actual.getMetaData(), is(instanceMetaData)); } @Test void assertGetUsedWorkerIds() { - new ComputeNodeService(repository).getAssignedWorkerIds(); + new ComputeNodePersistService(repository).getAssignedWorkerIds(); verify(repository).getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath()); } @Test void assertUpdateComputeNodeState() { - new ComputeNodeService(repository).updateComputeNodeState("foo_instance_id", InstanceState.OK); + new ComputeNodePersistService(repository).updateComputeNodeState("foo_instance_id", InstanceState.OK); verify(repository).persistEphemeral(ComputeNode.getComputeNodeStateNodePath("foo_instance_id"), InstanceState.OK.name()); } } diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java index 1eddf34024fe9..ba42fc4314670 100644 --- a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java +++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java @@ -22,12 +22,10 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; class StateContextTest { - private final StateContext stateContext = new StateContext(mock(StateService.class)); + private final StateContext stateContext = new StateContext(); @Test void assertGetCurrentClusterState() { @@ -40,10 +38,4 @@ void assertSwitchCurrentClusterState() { stateContext.switchCurrentClusterState(ClusterState.UNAVAILABLE); assertThat(stateContext.getCurrentClusterState(), is(ClusterState.UNAVAILABLE)); } - - @Test - void assertUpdateClusterState() { - stateContext.updateClusterState(ClusterState.OK); - verify(stateContext.getStateService()).persist(ClusterState.OK); - } } diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StatePersistServiceTest.java similarity index 81% rename from mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java rename to mode/core/src/test/java/org/apache/shardingsphere/mode/state/StatePersistServiceTest.java index 05c13bc5d7c71..ca3b4994cb2b1 100644 --- a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java +++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StatePersistServiceTest.java @@ -28,21 +28,21 @@ import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) -class StateServiceTest { +class StatePersistServiceTest { @Mock private PersistRepository repository; @Test - void assertPersistClusterStateWithoutPath() { - StateService stateService = new StateService(repository); - stateService.persist(ClusterState.OK); + void assertUpdateClusterStateClusterStateWithoutPath() { + StatePersistService statePersistService = new StatePersistService(repository); + statePersistService.updateClusterState(ClusterState.OK); verify(repository).persist(ComputeNode.getClusterStateNodePath(), ClusterState.OK.name()); } @Test - void assertLoadClusterState() { - new StateService(repository).load(); + void assertLoadClusterStateClusterState() { + new StatePersistService(repository).loadClusterState(); verify(repository).getDirectly(ComputeNode.getClusterStateNodePath()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index afe0ec0d9b914..be3c61eef677c 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -47,7 +47,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties; -import org.apache.shardingsphere.mode.state.StateService; +import org.apache.shardingsphere.mode.state.StatePersistService; import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService; import java.sql.SQLException; @@ -109,23 +109,23 @@ private void createSubscribers(final EventBusContext eventBusContext, final Clus private void registerOnline(final EventBusContext eventBusContext, final ComputeNodeInstanceContext computeNodeInstanceContext, final ClusterPersistRepository repository, final ContextManagerBuilderParameter param, final ContextManager contextManager) { - contextManager.getContextServiceFacade().getComputeNodeService().registerOnline(computeNodeInstanceContext.getInstance()); + contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance()); new GovernanceWatcherFactory(repository, eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList()).watchListeners(); if (null != param.getLabels()) { contextManager.getComputeNodeInstanceContext().getInstance().getLabels().addAll(param.getLabels()); } - contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getContextServiceFacade().getComputeNodeService().loadAllComputeNodeInstances()); + contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances()); new ClusterEventSubscriberRegistry(contextManager, repository).register(); } private void setClusterState(final ContextManager contextManager) { - StateService stateService = contextManager.getStateContext().getStateService(); - Optional clusterState = stateService.load(); + StatePersistService statePersistService = contextManager.getPersistServiceFacade().getStatePersistService(); + Optional clusterState = statePersistService.loadClusterState(); if (clusterState.isPresent()) { contextManager.getStateContext().switchCurrentClusterState(clusterState.get()); } else { - stateService.persist(ClusterState.OK); + statePersistService.updateClusterState(ClusterState.OK); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java index 814b44e050541..ff033d6d14bcf 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java @@ -25,7 +25,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdReservationNode; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException; -import org.apache.shardingsphere.mode.service.ComputeNodeService; +import org.apache.shardingsphere.mode.service.ComputeNodePersistService; import java.util.Collection; import java.util.Optional; @@ -45,14 +45,14 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator { private final String instanceId; - private final ComputeNodeService computeNodeService; + private final ComputeNodePersistService computeNodePersistService; private final AtomicBoolean isWarned = new AtomicBoolean(false); public ClusterWorkerIdGenerator(final ClusterPersistRepository repository, final String instanceId) { this.repository = repository; this.instanceId = instanceId; - computeNodeService = new ComputeNodeService(repository); + computeNodePersistService = new ComputeNodePersistService(repository); } @Override @@ -63,7 +63,7 @@ public int generate(final Properties props) { } private Optional loadExistedWorkerId() { - return computeNodeService.loadInstanceWorkerId(instanceId); + return computeNodePersistService.loadInstanceWorkerId(instanceId); } private int generateNewWorkerId() { @@ -72,12 +72,12 @@ private int generateNewWorkerId() { generatedWorkId = generateAvailableWorkerId(); } while (!generatedWorkId.isPresent()); int result = generatedWorkId.get(); - computeNodeService.persistInstanceWorkerId(instanceId, result); + computeNodePersistService.persistInstanceWorkerId(instanceId, result); return result; } private Optional generateAvailableWorkerId() { - Collection assignedWorkerIds = computeNodeService.getAssignedWorkerIds(); + Collection assignedWorkerIds = computeNodePersistService.getAssignedWorkerIds(); ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <= MAX_WORKER_ID + 1, WorkerIdAssignedException::new); PriorityQueue availableWorkerIds = IntStream.range(0, 1024).boxed().filter(each -> !assignedWorkerIds.contains(each)).collect(Collectors.toCollection(PriorityQueue::new)); Integer preselectedWorkerId = availableWorkerIds.poll(); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java index 43759586e20b2..0bef0de37b894 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java @@ -112,7 +112,8 @@ public synchronized void renew(final LabelsEvent event) { */ @Subscribe public synchronized void renew(final InstanceOnlineEvent event) { - contextManager.getComputeNodeInstanceContext().addComputeNodeInstance(contextManager.getContextServiceFacade().getComputeNodeService().loadComputeNodeInstance(event.getInstanceMetaData())); + contextManager.getComputeNodeInstanceContext().addComputeNodeInstance(contextManager.getPersistServiceFacade() + .getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData())); } /** diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java index 00ace637d529a..09b286d66f123 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java @@ -24,7 +24,7 @@ import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; -import org.apache.shardingsphere.mode.service.ComputeNodeService; +import org.apache.shardingsphere.mode.service.ComputeNodePersistService; import java.util.Properties; @@ -38,11 +38,11 @@ public final class SessionConnectionReconnectListener implements ConnectionState private final ComputeNodeInstanceContext computeNodeInstanceContext; - private final ComputeNodeService computeNodeService; + private final ComputeNodePersistService computeNodePersistService; public SessionConnectionReconnectListener(final ComputeNodeInstanceContext computeNodeInstanceContext, final ClusterPersistRepository repository) { this.computeNodeInstanceContext = computeNodeInstanceContext; - this.computeNodeService = new ComputeNodeService(repository); + this.computeNodePersistService = new ComputeNodePersistService(repository); } @Override @@ -63,7 +63,7 @@ private boolean reconnect(final CuratorFramework client) { if (isNeedGenerateWorkerId()) { computeNodeInstanceContext.generateWorkerId(new Properties()); } - computeNodeService.registerOnline(computeNodeInstanceContext.getInstance()); + computeNodePersistService.registerOnline(computeNodeInstanceContext.getInstance()); return true; } sleepInterval(); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java index 6538f1a4d10d1..5528182ca2fe4 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java @@ -44,7 +44,7 @@ public void executeUpdate(final LabelComputeNodeStatement sqlStatement, final Co if (!sqlStatement.isOverwrite()) { labels.addAll(computeNodeInstance.get().getLabels()); } - contextManager.getContextServiceFacade().getComputeNodeService().persistInstanceLabels(instanceId, new LinkedList<>(labels)); + contextManager.getPersistServiceFacade().getComputeNodePersistService().persistInstanceLabels(instanceId, new LinkedList<>(labels)); } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java index 7436fb8ebe97b..eddb2e30b8462 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java @@ -38,7 +38,7 @@ public void executeUpdate(final SetComputeNodeStateStatement sqlStatement, final } else { checkEnablingIsValid(contextManager, sqlStatement.getInstanceId()); } - contextManager.getContextServiceFacade().getComputeNodeService().updateComputeNodeState(sqlStatement.getInstanceId(), + contextManager.getPersistServiceFacade().getComputeNodePersistService().updateComputeNodeState(sqlStatement.getInstanceId(), "DISABLE".equals(sqlStatement.getState()) ? InstanceState.CIRCUIT_BREAK : InstanceState.OK); } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java index 4948c0b9ac585..8ad01485fa44f 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java @@ -42,10 +42,10 @@ public void executeUpdate(final UnlabelComputeNodeStatement sqlStatement, final if (computeNodeInstance.isPresent()) { Collection labels = new LinkedHashSet<>(computeNodeInstance.get().getLabels()); if (sqlStatement.getLabels().isEmpty()) { - contextManager.getContextServiceFacade().getComputeNodeService().persistInstanceLabels(instanceId, Collections.emptyList()); + contextManager.getPersistServiceFacade().getComputeNodePersistService().persistInstanceLabels(instanceId, Collections.emptyList()); } else { labels.removeAll(sqlStatement.getLabels()); - contextManager.getContextServiceFacade().getComputeNodeService().persistInstanceLabels(instanceId, new ArrayList<>(labels)); + contextManager.getPersistServiceFacade().getComputeNodePersistService().persistInstanceLabels(instanceId, new ArrayList<>(labels)); } } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java index a6d9764f7bcee..f4558d84ee358 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java @@ -43,7 +43,7 @@ public void executeUpdate(final UnlockClusterStatement sqlStatement, final Conte if (lockContext.tryLock(lockDefinition, 3000L)) { try { checkState(contextManager); - contextManager.getStateContext().updateClusterState(ClusterState.OK); + contextManager.getPersistServiceFacade().getStatePersistService().updateClusterState(ClusterState.OK); // TODO unlock snapshot info if locked } finally { lockContext.unlock(lockDefinition); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java index 6e0bfd629df6a..ad9dba4c64036 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java @@ -28,7 +28,7 @@ public class ClusterReadWriteLockStrategy implements ClusterLockStrategy { @Override public void lock() { - ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.UNAVAILABLE); + ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getStatePersistService().updateClusterState(ClusterState.UNAVAILABLE); } @Override diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java index 3bc027c48f153..24da33d6b8652 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java @@ -28,7 +28,7 @@ public class ClusterWriteLockStrategy implements ClusterLockStrategy { @Override public void lock() { - ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.READ_ONLY); + ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getStatePersistService().updateClusterState(ClusterState.READ_ONLY); } @Override