Skip to content

Commit

Permalink
Remove ComputeNodeInstanceContextAware interface for ClusterPersistRe…
Browse files Browse the repository at this point in the history
…pository (#31450)

* Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository

* Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository

* Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository
  • Loading branch information
menghaoranss authored May 30, 2024
1 parent d44e4f8 commit aa5ea90
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
Expand Down Expand Up @@ -60,11 +59,10 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
@Override
public ContextManager build(final ContextManagerBuilderParameter param, final EventBusContext eventBusContext) throws SQLException {
ModeConfiguration modeConfig = param.getModeConfiguration();
ClusterPersistRepository repository = getClusterPersistRepository((ClusterPersistRepositoryConfiguration) modeConfig.getRepository());
ClusterPersistRepositoryConfiguration config = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
ClusterPersistRepository repository = getClusterPersistRepository(config);
ComputeNodeInstanceContext computeNodeInstanceContext = buildComputeNodeInstanceContext(modeConfig, param.getInstanceMetaData(), repository, eventBusContext);
if (repository instanceof ComputeNodeInstanceContextAware) {
((ComputeNodeInstanceContextAware) repository).setComputeNodeInstanceContext(computeNodeInstanceContext);
}
repository.init(config, computeNodeInstanceContext);
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, computeNodeInstanceContext, new QualifiedDataSourceStatusService(repository).loadStatus());
ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository);
Expand All @@ -76,9 +74,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev

private ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration config) {
ShardingSpherePreconditions.checkNotNull(config, MissingRequiredClusterRepositoryConfigurationException::new);
ClusterPersistRepository result = TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps());
result.init(config);
return result;
return TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps());
}

private ComputeNodeInstanceContext buildComputeNodeInstanceContext(final ModeConfiguration modeConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture;

import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
Expand All @@ -28,7 +29,7 @@
public final class ClusterPersistRepositoryFixture implements ClusterPersistRepository {

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process;

import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
Expand All @@ -32,7 +33,7 @@ public final class ProcessListClusterPersistRepositoryFixture implements Cluster
private static final Map<String, String> REGISTRY_DATA = new LinkedHashMap<>();

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.repository.cluster;

import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.spi.PersistRepository;
Expand All @@ -30,8 +31,9 @@ public interface ClusterPersistRepository extends PersistRepository {
* Initialize registry center.
*
* @param config cluster persist repository configuration
* @param computeNodeInstanceContext compute node instance context
*/
void init(ClusterPersistRepositoryConfiguration config);
void init(ClusterPersistRepositoryConfiguration config, ComputeNodeInstanceContext computeNodeInstanceContext);

/**
* Persist exclusive ephemeral data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;

Expand Down Expand Up @@ -68,7 +69,7 @@ public final class EtcdRepository implements ClusterPersistRepository {
private DistributedLockHolder distributedLockHolder;

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
etcdProps = new EtcdProperties(config.getProps());
client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists())))
.namespace(ByteSequence.from(config.getNamespace(), StandardCharsets.UTF_8))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand Down Expand Up @@ -59,7 +58,7 @@
/**
* Registry repository of ZooKeeper.
*/
public final class ZookeeperRepository implements ClusterPersistRepository, ComputeNodeInstanceContextAware {
public final class ZookeeperRepository implements ClusterPersistRepository {

private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();

Expand All @@ -71,10 +70,11 @@ public final class ZookeeperRepository implements ClusterPersistRepository, Comp
private DistributedLockHolder distributedLockHolder;

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
ZookeeperProperties zookeeperProps = new ZookeeperProperties(config.getProps());
client = buildCuratorClient(config, zookeeperProps);
distributedLockHolder = new DistributedLockHolder(getType(), client, zookeeperProps);
client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
initCuratorClient(zookeeperProps);
}

Expand Down Expand Up @@ -274,11 +274,6 @@ private void waitForCacheClose() {
}
}

@Override
public void setComputeNodeInstanceContext(final ComputeNodeInstanceContext computeNodeInstanceContext) {
client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
}

@Override
public String getType() {
return "ZooKeeper";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.listen.Listenable;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperDistributedLock;
Expand Down Expand Up @@ -107,7 +109,7 @@ void init() {
mockClient();
mockBuilder();
ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS, new Properties());
REPOSITORY.init(config);
REPOSITORY.init(config, mock(ComputeNodeInstanceContext.class));
mockDistributedLockHolder();
}

Expand Down Expand Up @@ -141,6 +143,7 @@ private void mockBuilder() {
when(client.delete()).thenReturn(deleteBuilder);
when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(backgroundVersionable);
when(client.getChildren()).thenReturn(getChildrenBuilder);
when(client.getConnectionStateListenable()).thenReturn(mock(Listenable.class));
}

@Test
Expand Down Expand Up @@ -188,25 +191,28 @@ void assertBuildCuratorClientWithCustomConfiguration() {
new Property(ZookeeperPropertyKey.MAX_RETRIES.getKey(), "1"),
new Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "1000"),
new Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "2000"));
assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS, props)));
assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS, props),
mock(ComputeNodeInstanceContext.class)));
}

@Test
void assertBuildCuratorClientWithTimeToLiveSecondsEqualsZero() {
assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration(
REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "0")))));
REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "0"))),
mock(ComputeNodeInstanceContext.class)));
}

@Test
void assertBuildCuratorClientWithOperationTimeoutMillisecondsEqualsZero() {
assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration(
REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "0")))));
REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "0"))),
mock(ComputeNodeInstanceContext.class)));
}

@Test
void assertBuildCuratorClientWithDigest() {
REPOSITORY.init(new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS,
PropertiesBuilder.build(new Property(ZookeeperPropertyKey.DIGEST.getKey(), "any"))));
PropertiesBuilder.build(new Property(ZookeeperPropertyKey.DIGEST.getKey(), "any"))), mock(ComputeNodeInstanceContext.class));
verify(builder).aclProvider(any(ACLProvider.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.proxy.fixture;

import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
Expand All @@ -32,7 +33,7 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo
private static final Map<String, String> REGISTRY_DATA = new LinkedHashMap<>();

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
Expand Down Expand Up @@ -82,6 +83,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.mockito.Mockito.mock;

/**
* Pipeline context utility class.
*/
Expand Down Expand Up @@ -125,7 +128,7 @@ private static DataSource createDataSource(final YamlRootConfiguration rootConfi

private static ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration repositoryConfig) {
ClusterPersistRepository result = TypedSPILoader.getService(ClusterPersistRepository.class, repositoryConfig.getType(), repositoryConfig.getProps());
result.init(repositoryConfig);
result.init(repositoryConfig, mock(ComputeNodeInstanceContext.class));
return result;
}

Expand Down

0 comments on commit aa5ea90

Please sign in to comment.