Skip to content

Commit

Permalink
Add EventSubscriber (#31222)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored May 13, 2024
1 parent fcb5901 commit 492822b
Show file tree
Hide file tree
Showing 19 changed files with 63 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public final class EventBusContext {
}

/**
* Register object.
* Register event subscriber.
*
* @param object object
* @param subscriber event subscriber
*/
public void register(final Object object) {
eventBus.register(object);
public void register(final EventSubscriber subscriber) {
eventBus.register(subscriber);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.util.eventbus;

/**
* Event subscriber.
*/
public interface EventSubscriber {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.infra.util.eventbus;

import org.apache.shardingsphere.infra.util.eventbus.fixture.EventListenerFixture;
import org.apache.shardingsphere.infra.util.eventbus.fixture.EventSubscriberFixture;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
Expand All @@ -28,7 +28,7 @@ class EventBusContextTest {
@Test
void assertEventBusContextTest() {
EventBusContext eventBusContext = new EventBusContext();
EventListenerFixture listener = new EventListenerFixture();
EventSubscriberFixture listener = new EventSubscriberFixture();
eventBusContext.register(listener);
eventBusContext.post("foo_event");
assertThat(listener.getEvents().size(), is(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import com.google.common.eventbus.Subscribe;
import lombok.Getter;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;

import java.util.LinkedList;
import java.util.List;

@Getter
public final class EventListenerFixture {
public final class EventSubscriberFixture implements EventSubscriber {

private final List<String> events = new LinkedList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import org.apache.shardingsphere.infra.rule.event.rule.alter.AlterRuleItemEvent;
import org.apache.shardingsphere.infra.rule.event.rule.drop.DropRuleItemEvent;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.spi.RuleItemConfigurationChangedProcessor;

/**
* Rule item changed subscriber.
*/
@RequiredArgsConstructor
public final class RuleItemChangedSubscriber {
public final class RuleItemChangedSubscriber implements EventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

/**
* ShardingSphere schema data registry subscriber.
*/
public final class ShardingSphereSchemaDataRegistrySubscriber {
public final class ShardingSphereSchemaDataRegistrySubscriber implements EventSubscriber {

private final ShardingSphereDataPersistService persistService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
Expand All @@ -41,7 +42,7 @@
/**
* Cluster process subscriber.
*/
public final class ClusterProcessSubscriber implements ProcessSubscriber {
public final class ClusterProcessSubscriber implements ProcessSubscriber, EventSubscriber {

private final PersistRepository repository;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
Expand All @@ -43,7 +44,7 @@
*/
@SuppressWarnings("unused")
@RequiredArgsConstructor
public final class ProcessListChangedSubscriber {
public final class ProcessListChangedSubscriber implements EventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

/**
* Cluster status subscriber.
*/
public final class ClusterStatusSubscriber {
public final class ClusterStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.event.node.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
Expand All @@ -29,7 +30,7 @@
/**
* Compute node status subscriber.
*/
public final class ComputeNodeStatusSubscriber {
public final class ComputeNodeStatusSubscriber implements EventSubscriber {

private final RegistryCenter registryCenter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;

/**
* Qualified data source status subscriber.
*/
public final class QualifiedDataSourceStatusSubscriber {
public final class QualifiedDataSourceStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;

/**
* Cache evicted subscriber.
*/
@SuppressWarnings("unused")
public final class CacheEvictedSubscriber {
public final class CacheEvictedSubscriber implements EventSubscriber {

/**
* Callback of any {@link GovernanceEvent}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.config.AlterGlobalRuleConfigurationEvent;
import org.apache.shardingsphere.mode.event.config.AlterPropertiesEvent;
import org.apache.shardingsphere.mode.event.datasource.unit.AlterStorageUnitEvent;
Expand All @@ -31,7 +32,7 @@
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class ConfigurationChangedSubscriber {
public final class ConfigurationChangedSubscriber implements EventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;
Expand All @@ -28,11 +29,11 @@
/**
* Context manager subscriber register.
*/
public final class ContextManagerSubscriberRegister {
public final class ContextManagerSubscriberRegister implements EventSubscriber {

private final EventBusContext eventBusContext;

private final Collection<Object> subscribers;
private final Collection<EventSubscriber> subscribers;

public ContextManagerSubscriberRegister(final RegistryCenter registryCenter, final ContextManager contextManager) {
eventBusContext = contextManager.getInstanceContext().getEventBusContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
Expand All @@ -33,7 +34,7 @@
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class DatabaseChangedSubscriber {
public final class DatabaseChangedSubscriber implements EventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.schema.table.CreateOrAlterTableEvent;
import org.apache.shardingsphere.mode.event.schema.table.DropTableEvent;
import org.apache.shardingsphere.mode.event.schema.view.CreateOrAlterViewEvent;
Expand All @@ -38,7 +39,7 @@
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class ResourceMetaDataChangedSubscriber {
public final class ResourceMetaDataChangedSubscriber implements EventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.infra.state.datasource.DataSourceStateManager;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
Expand All @@ -42,7 +43,7 @@
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class StateChangedSubscriber {
public final class StateChangedSubscriber implements EventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.process.ProcessSubscriber;
import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
Expand All @@ -32,7 +33,7 @@
/**
* Standalone process subscriber.
*/
public final class StandaloneProcessSubscriber implements ProcessSubscriber {
public final class StandaloneProcessSubscriber implements ProcessSubscriber, EventSubscriber {

private final EventBusContext eventBusContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
Expand All @@ -45,7 +46,7 @@
/**
* Show process list executor.
*/
public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor {
public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor, EventSubscriber {

private final boolean showFullProcesslist;

Expand Down

0 comments on commit 492822b

Please sign in to comment.