diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java index 03234fb9e05c1..5ecbd89d1bf4d 100644 --- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java +++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java @@ -23,7 +23,7 @@ import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource; import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute; import org.apache.shardingsphere.infra.state.datasource.DataSourceState; -import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent; +import org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent; import org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException; import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule; diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java similarity index 94% rename from mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java rename to features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java index 4f0497aa541c0..f21213b004d5b 100644 --- a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java +++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.event.deliver.datasource.qualified; +package org.apache.shardingsphere.readwritesplitting.subscriber; import lombok.Getter; import lombok.RequiredArgsConstructor; diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java similarity index 76% rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java rename to features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java index 252173f110531..878be851d6374 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java +++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java @@ -15,22 +15,21 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type; +package org.apache.shardingsphere.readwritesplitting.subscriber; import com.google.common.eventbus.Subscribe; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent; -import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode; +import org.apache.shardingsphere.mode.spi.PersistRepository; /** - * Deliver data source status subscriber. + * Qualified data source deleted subscriber. */ @RequiredArgsConstructor -public final class DeliverQualifiedDataSourceSubscriber implements EventSubscriber { +public final class QualifiedDataSourceDeletedSubscriber implements EventSubscriber { - private final ClusterPersistRepository repository; + private final PersistRepository repository; /** * Delete qualified data source. diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java similarity index 54% rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java rename to features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java index 3f874da0067fe..942a64d45aa0c 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java +++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java @@ -15,25 +15,20 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber; +package org.apache.shardingsphere.readwritesplitting.subscriber; -import lombok.Getter; +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type.DeliverQualifiedDataSourceSubscriber; -import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; - -import java.util.Collection; -import java.util.Collections; +import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory; +import org.apache.shardingsphere.mode.spi.PersistRepository; /** - * Cluster deliver event subscriber registry. + * Qualified data source deleted subscriber factory. */ -@Getter -public final class ClusterDeliverEventSubscriberRegistry { - - private final Collection subscribers; +public final class QualifiedDataSourceDeletedSubscriberFactory implements DeliverEventSubscriberFactory { - public ClusterDeliverEventSubscriberRegistry(final ClusterPersistRepository repository) { - subscribers = Collections.singleton(new DeliverQualifiedDataSourceSubscriber(repository)); + @Override + public EventSubscriber create(final PersistRepository repository, final EventBusContext eventBusContext) { + return new QualifiedDataSourceDeletedSubscriber(repository); } } diff --git a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory new file mode 100644 index 0000000000000..1578fc246e635 --- /dev/null +++ b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedSubscriberFactory diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java index f56387f3ab4bf..cdfaf039bceb0 100644 --- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java +++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource; import org.apache.shardingsphere.infra.state.datasource.DataSourceState; -import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent; +import org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent; import org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException; import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule; import org.junit.jupiter.api.Test; diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java similarity index 75% rename from mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java rename to features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java index f9d25ff1cf5c0..1cdc92dde2f88 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java +++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type; +package org.apache.shardingsphere.readwritesplitting.subscriber; import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource; -import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent; -import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import org.apache.shardingsphere.mode.spi.PersistRepository; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -29,16 +28,16 @@ import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) -class DeliverQualifiedDataSourceSubscriberTest { +class QualifiedDataSourceDeletedSubscriberTest { - private DeliverQualifiedDataSourceSubscriber subscriber; + private QualifiedDataSourceDeletedSubscriber subscriber; @Mock - private ClusterPersistRepository repository; + private PersistRepository repository; @BeforeEach void setUp() { - subscriber = new DeliverQualifiedDataSourceSubscriber(repository); + subscriber = new QualifiedDataSourceDeletedSubscriber(repository); } @Test diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java new file mode 100644 index 0000000000000..a43902000c2e6 --- /dev/null +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.event.deliver; + +import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; +import org.apache.shardingsphere.mode.spi.PersistRepository; + +/** + * Deliver event subscriber factory. + */ +@SingletonSPI +public interface DeliverEventSubscriberFactory { + + /** + * Create deliver event subscriber. + * + * @param repository cluster persist repository + * @param eventBusContext event bus context + * @return created event subscriber + */ + EventSubscriber create(PersistRepository repository, EventBusContext eventBusContext); +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index a8c574fd3a5b7..c942b31c0fca5 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -23,6 +23,7 @@ import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData; import org.apache.shardingsphere.infra.lock.LockContext; +import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.metadata.persist.MetaDataPersistService; @@ -31,10 +32,10 @@ import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.ContextManagerBuilder; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; -import org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.ClusterDeliverEventSubscriberRegistry; +import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry; import org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException; -import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry; import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService; import org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; @@ -44,6 +45,7 @@ import java.sql.SQLException; import java.util.Collection; +import java.util.stream.Collectors; /** * Cluster context manager builder. @@ -78,7 +80,8 @@ private void registerOnline(final ComputeNodeInstanceContext computeNodeInstance contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances()); new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register(); EventSubscriberRegistry eventSubscriberRegistry = new EventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext()); - eventSubscriberRegistry.register(new ClusterDeliverEventSubscriberRegistry(repository).getSubscribers()); + eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream() + .map(each -> each.create(repository, contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList())); eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers()); }