diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index f6409ee689d7a..9c17de92bf4f5 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -21,7 +21,6 @@ import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.instance.ComputeNodeInstance; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; -import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware; import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData; import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -60,11 +59,10 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder @Override public ContextManager build(final ContextManagerBuilderParameter param, final EventBusContext eventBusContext) throws SQLException { ModeConfiguration modeConfig = param.getModeConfiguration(); - ClusterPersistRepository repository = getClusterPersistRepository((ClusterPersistRepositoryConfiguration) modeConfig.getRepository()); + ClusterPersistRepositoryConfiguration config = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository(); + ClusterPersistRepository repository = getClusterPersistRepository(config); ComputeNodeInstanceContext computeNodeInstanceContext = buildComputeNodeInstanceContext(modeConfig, param.getInstanceMetaData(), repository, eventBusContext); - if (repository instanceof ComputeNodeInstanceContextAware) { - ((ComputeNodeInstanceContextAware) repository).setComputeNodeInstanceContext(computeNodeInstanceContext); - } + repository.init(config, computeNodeInstanceContext); MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository); MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, computeNodeInstanceContext, new QualifiedDataSourceStatusService(repository).loadStatus()); ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository); @@ -76,9 +74,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev private ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration config) { ShardingSpherePreconditions.checkNotNull(config, MissingRequiredClusterRepositoryConfigurationException::new); - ClusterPersistRepository result = TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps()); - result.init(config); - return result; + return TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps()); } private ComputeNodeInstanceContext buildComputeNodeInstanceContext(final ModeConfiguration modeConfig, diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java index 66e0ae464929a..b2d716bb81053 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; @@ -28,7 +29,7 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepository { @Override - public void init(final ClusterPersistRepositoryConfiguration config) { + public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) { } @Override diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java index c4b40c487008d..9e63ea850fa2d 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; @@ -32,7 +33,7 @@ public final class ProcessListClusterPersistRepositoryFixture implements Cluster private static final Map REGISTRY_DATA = new LinkedHashMap<>(); @Override - public void init(final ClusterPersistRepositoryConfiguration config) { + public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) { } @Override diff --git a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java index b240ecea27b38..4a9e5fec6575b 100644 --- a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java +++ b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.mode.repository.cluster; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.spi.PersistRepository; @@ -30,8 +31,9 @@ public interface ClusterPersistRepository extends PersistRepository { * Initialize registry center. * * @param config cluster persist repository configuration + * @param computeNodeInstanceContext compute node instance context */ - void init(ClusterPersistRepositoryConfiguration config); + void init(ClusterPersistRepositoryConfiguration config, ComputeNodeInstanceContext computeNodeInstanceContext); /** * Persist exclusive ephemeral data. diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index 71ef640546700..999457f73246c 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -35,12 +35,13 @@ import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties; import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey; -import org.apache.shardingsphere.mode.event.DataChangedEvent; -import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; @@ -68,7 +69,7 @@ public final class EtcdRepository implements ClusterPersistRepository { private DistributedLockHolder distributedLockHolder; @Override - public void init(final ClusterPersistRepositoryConfiguration config) { + public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) { etcdProps = new EtcdProperties(config.getProps()); client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists()))) .namespace(ByteSequence.from(config.getNamespace(), StandardCharsets.UTF_8)) diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java index 76e20a97f79f4..1cfae70b242fa 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java @@ -28,7 +28,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; -import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; @@ -59,7 +58,7 @@ /** * Registry repository of ZooKeeper. */ -public final class ZookeeperRepository implements ClusterPersistRepository, ComputeNodeInstanceContextAware { +public final class ZookeeperRepository implements ClusterPersistRepository { private final Map caches = new ConcurrentHashMap<>(); @@ -71,10 +70,11 @@ public final class ZookeeperRepository implements ClusterPersistRepository, Comp private DistributedLockHolder distributedLockHolder; @Override - public void init(final ClusterPersistRepositoryConfiguration config) { + public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) { ZookeeperProperties zookeeperProps = new ZookeeperProperties(config.getProps()); client = buildCuratorClient(config, zookeeperProps); distributedLockHolder = new DistributedLockHolder(getType(), client, zookeeperProps); + client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this)); initCuratorClient(zookeeperProps); } @@ -274,11 +274,6 @@ private void waitForCacheClose() { } } - @Override - public void setComputeNodeInstanceContext(final ComputeNodeInstanceContext computeNodeInstanceContext) { - client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this)); - } - @Override public String getType() { return "ZooKeeper"; diff --git a/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java b/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java index 6d9f952818f35..c282b74c4c41d 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java @@ -29,6 +29,8 @@ import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable; import org.apache.curator.framework.api.SetDataBuilder; +import org.apache.curator.framework.listen.Listenable; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperDistributedLock; @@ -107,7 +109,7 @@ void init() { mockClient(); mockBuilder(); ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS, new Properties()); - REPOSITORY.init(config); + REPOSITORY.init(config, mock(ComputeNodeInstanceContext.class)); mockDistributedLockHolder(); } @@ -141,6 +143,7 @@ private void mockBuilder() { when(client.delete()).thenReturn(deleteBuilder); when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(backgroundVersionable); when(client.getChildren()).thenReturn(getChildrenBuilder); + when(client.getConnectionStateListenable()).thenReturn(mock(Listenable.class)); } @Test @@ -188,25 +191,28 @@ void assertBuildCuratorClientWithCustomConfiguration() { new Property(ZookeeperPropertyKey.MAX_RETRIES.getKey(), "1"), new Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "1000"), new Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "2000")); - assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS, props))); + assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS, props), + mock(ComputeNodeInstanceContext.class))); } @Test void assertBuildCuratorClientWithTimeToLiveSecondsEqualsZero() { assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration( - REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "0"))))); + REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "0"))), + mock(ComputeNodeInstanceContext.class))); } @Test void assertBuildCuratorClientWithOperationTimeoutMillisecondsEqualsZero() { assertDoesNotThrow(() -> REPOSITORY.init(new ClusterPersistRepositoryConfiguration( - REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "0"))))); + REPOSITORY.getType(), "governance", SERVER_LISTS, PropertiesBuilder.build(new Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "0"))), + mock(ComputeNodeInstanceContext.class))); } @Test void assertBuildCuratorClientWithDigest() { REPOSITORY.init(new ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance", SERVER_LISTS, - PropertiesBuilder.build(new Property(ZookeeperPropertyKey.DIGEST.getKey(), "any")))); + PropertiesBuilder.build(new Property(ZookeeperPropertyKey.DIGEST.getKey(), "any"))), mock(ComputeNodeInstanceContext.class)); verify(builder).aclProvider(any(ACLProvider.class)); } diff --git a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java index 497530abe6905..cf9924afabacc 100644 --- a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java +++ b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.proxy.fixture; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; @@ -32,7 +33,7 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo private static final Map REGISTRY_DATA = new LinkedHashMap<>(); @Override - public void init(final ClusterPersistRepositoryConfiguration config) { + public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) { } @Override diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java index c744d85ca0a82..9f4c2847fd6c4 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java @@ -50,6 +50,7 @@ import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.datanode.DataNode; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; @@ -82,6 +83,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.mockito.Mockito.mock; + /** * Pipeline context utility class. */ @@ -125,7 +128,7 @@ private static DataSource createDataSource(final YamlRootConfiguration rootConfi private static ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration repositoryConfig) { ClusterPersistRepository result = TypedSPILoader.getService(ClusterPersistRepository.class, repositoryConfig.getType(), repositoryConfig.getProps()); - result.init(repositoryConfig); + result.init(repositoryConfig, mock(ComputeNodeInstanceContext.class)); return result; }