Skip to content

Commit

Permalink
Add DeliverEventSubscriberFactory SPI (#34041)
Browse files Browse the repository at this point in the history
* Add DeliverEventSubscriberFactory SPI

* Add DeliverEventSubscriberFactory SPI

* Add DeliverEventSubscriberFactory SPI
  • Loading branch information
terrymanu authored Dec 13, 2024
1 parent f39b52b commit a2de706
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException;
import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.event.deliver.datasource.qualified;
package org.apache.shardingsphere.readwritesplitting.subscriber;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type;
package org.apache.shardingsphere.readwritesplitting.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
* Deliver data source status subscriber.
* Qualified data source deleted subscriber.
*/
@RequiredArgsConstructor
public final class DeliverQualifiedDataSourceSubscriber implements EventSubscriber {
public final class QualifiedDataSourceDeletedSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;
private final PersistRepository repository;

/**
* Delete qualified data source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,20 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber;
package org.apache.shardingsphere.readwritesplitting.subscriber;

import lombok.Getter;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type.DeliverQualifiedDataSourceSubscriber;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Collection;
import java.util.Collections;
import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
* Cluster deliver event subscriber registry.
* Qualified data source deleted subscriber factory.
*/
@Getter
public final class ClusterDeliverEventSubscriberRegistry {

private final Collection<EventSubscriber> subscribers;
public final class QualifiedDataSourceDeletedSubscriberFactory implements DeliverEventSubscriberFactory {

public ClusterDeliverEventSubscriberRegistry(final ClusterPersistRepository repository) {
subscribers = Collections.singleton(new DeliverQualifiedDataSourceSubscriber(repository));
@Override
public EventSubscriber create(final PersistRepository repository, final EventBusContext eventBusContext) {
return new QualifiedDataSourceDeletedSubscriber(repository);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedSubscriberFactory
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException;
import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type;
package org.apache.shardingsphere.readwritesplitting.subscriber;

import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -29,16 +28,16 @@
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
class DeliverQualifiedDataSourceSubscriberTest {
class QualifiedDataSourceDeletedSubscriberTest {

private DeliverQualifiedDataSourceSubscriber subscriber;
private QualifiedDataSourceDeletedSubscriber subscriber;

@Mock
private ClusterPersistRepository repository;
private PersistRepository repository;

@BeforeEach
void setUp() {
subscriber = new DeliverQualifiedDataSourceSubscriber(repository);
subscriber = new QualifiedDataSourceDeletedSubscriber(repository);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.event.deliver;

import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
* Deliver event subscriber factory.
*/
@SingletonSPI
public interface DeliverEventSubscriberFactory {

/**
* Create deliver event subscriber.
*
* @param repository cluster persist repository
* @param eventBusContext event bus context
* @return created event subscriber
*/
EventSubscriber create(PersistRepository repository, EventBusContext eventBusContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
Expand All @@ -31,10 +32,10 @@
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.ClusterDeliverEventSubscriberRegistry;
import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry;
import org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
Expand All @@ -44,6 +45,7 @@

import java.sql.SQLException;
import java.util.Collection;
import java.util.stream.Collectors;

/**
* Cluster context manager builder.
Expand Down Expand Up @@ -78,7 +80,8 @@ private void registerOnline(final ComputeNodeInstanceContext computeNodeInstance
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
EventSubscriberRegistry eventSubscriberRegistry = new EventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
eventSubscriberRegistry.register(new ClusterDeliverEventSubscriberRegistry(repository).getSubscribers());
eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream()
.map(each -> each.create(repository, contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList()));
eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers());
}

Expand Down

0 comments on commit a2de706

Please sign in to comment.