Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove event bus for lock and unlock cluster #31323

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,13 @@ public ClusterState getCurrentClusterState() {
public void switchCurrentClusterState(final ClusterState state) {
currentClusterState.set(state);
}

/**
* Update cluster state.
*
* @param state cluster state
*/
public void updateClusterState(final ClusterState state) {
stateService.persist(state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ public final class StateService {
* @param state cluster state
*/
public void persist(final ClusterState state) {
if (Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStateNodePath()))) {
repository.persist(ComputeNode.getClusterStateNodePath(), state.name());
}
repository.persist(ComputeNode.getClusterStateNodePath(), state.name());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

class StateContextTest {

Expand All @@ -39,4 +40,10 @@ void assertSwitchCurrentClusterState() {
stateContext.switchCurrentClusterState(ClusterState.UNAVAILABLE);
assertThat(stateContext.getCurrentClusterState(), is(ClusterState.UNAVAILABLE));
}

@Test
void assertUpdateClusterState() {
stateContext.updateClusterState(ClusterState.OK);
verify(stateContext.getStateService()).persist(ClusterState.OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;

@ExtendWith(MockitoExtension.class)
class StateServiceTest {
Expand All @@ -42,14 +40,6 @@ void assertPersistClusterStateWithoutPath() {
verify(repository).persist(ComputeNode.getClusterStateNodePath(), ClusterState.OK.name());
}

@Test
void assertPersistClusterStateWithPath() {
StateService stateService = new StateService(repository);
when(repository.getDirectly("/nodes/compute_nodes/status")).thenReturn(ClusterState.OK.name());
stateService.persist(ClusterState.OK);
verify(repository, times(0)).persist(ComputeNode.getClusterStateNodePath(), ClusterState.OK.name());
}

@Test
void assertLoadClusterState() {
new StateService(repository).load();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStateSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
Expand Down Expand Up @@ -106,7 +105,6 @@ private void setContextManagerAware(final ContextManager contextManager) {
// TODO remove the method, only keep ZooKeeper's events, remove all decouple events
private void createSubscribers(final EventBusContext eventBusContext, final ClusterPersistRepository repository) {
eventBusContext.register(new ComputeNodeStatusSubscriber(repository));
eventBusContext.register(new ClusterStateSubscriber(repository));
eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository));
eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext));
eventBusContext.register(new ShardingSphereSchemaDataRegistrySubscriber(repository));
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.shardingsphere.mode.exception.NotLockedClusterException;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;

/**
* Unlock cluster executor.
Expand All @@ -44,7 +43,7 @@ public void executeUpdate(final UnlockClusterStatement sqlStatement, final Conte
if (lockContext.tryLock(lockDefinition, 3000L)) {
try {
checkState(contextManager);
contextManager.getComputeNodeInstanceContext().getEventBusContext().post(new ClusterStateChangedEvent(ClusterState.OK));
contextManager.getStateContext().updateClusterState(ClusterState.OK);
// TODO unlock snapshot info if locked
} finally {
lockContext.unlock(lockDefinition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.proxy.backend.lock.impl;

import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;

Expand All @@ -29,7 +28,7 @@ public class ClusterReadWriteLockStrategy implements ClusterLockStrategy {

@Override
public void lock() {
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new ClusterStateChangedEvent(ClusterState.UNAVAILABLE));
ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.UNAVAILABLE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.proxy.backend.lock.impl;

import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;

Expand All @@ -29,7 +28,7 @@ public class ClusterWriteLockStrategy implements ClusterLockStrategy {

@Override
public void lock() {
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new ClusterStateChangedEvent(ClusterState.READ_ONLY));
ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.READ_ONLY);
}

@Override
Expand Down