Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DispatchEventSubscriber #34069

Merged
merged 1 commit into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber;

import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;

/**
* Dispatch event subscriber.
*/
public interface DispatchEventSubscriber extends EventSubscriber {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
import org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;

/**
* Cache evicted subscriber.
*/
public final class CacheEvictedSubscriber implements EventSubscriber {
public final class CacheEvictedSubscriber implements DispatchEventSubscriber {

/**
* Callback of any {@link DispatchEvent}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,38 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.ComputeNodeInstanceStateChangedEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.LabelsEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.WorkerIdEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOfflineEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOnlineEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;

/**
* Compute node state subscriber.
*/
@RequiredArgsConstructor
public final class ComputeNodeStateSubscriber implements EventSubscriber {
public final class ComputeNodeStateSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

private final ComputeNodeInstanceContext computeNodeInstanceContext;

public ComputeNodeStateSubscriber(final ContextManager contextManager) {
this.contextManager = contextManager;
computeNodeInstanceContext = contextManager.getComputeNodeInstanceContext();
}

/**
* Renew instance list.
*
* @param event compute node online event
*/
@Subscribe
public synchronized void renew(final InstanceOnlineEvent event) {
contextManager.getComputeNodeInstanceContext().addComputeNodeInstance(
contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
computeNodeInstanceContext.addComputeNodeInstance(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
}

/**
Expand All @@ -54,7 +59,7 @@ public synchronized void renew(final InstanceOnlineEvent event) {
*/
@Subscribe
public synchronized void renew(final InstanceOfflineEvent event) {
contextManager.getComputeNodeInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
computeNodeInstanceContext.deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
}

/**
Expand All @@ -64,7 +69,7 @@ public synchronized void renew(final InstanceOfflineEvent event) {
*/
@Subscribe
public synchronized void renew(final ComputeNodeInstanceStateChangedEvent event) {
contextManager.getComputeNodeInstanceContext().updateStatus(event.getInstanceId(), event.getStatus());
computeNodeInstanceContext.updateStatus(event.getInstanceId(), event.getStatus());
}

/**
Expand All @@ -74,7 +79,7 @@ public synchronized void renew(final ComputeNodeInstanceStateChangedEvent event)
*/
@Subscribe
public synchronized void renew(final WorkerIdEvent event) {
contextManager.getComputeNodeInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId());
computeNodeInstanceContext.updateWorkerId(event.getInstanceId(), event.getWorkerId());
}

/**
Expand All @@ -85,6 +90,6 @@ public synchronized void renew(final WorkerIdEvent event) {
@Subscribe
public synchronized void renew(final LabelsEvent event) {
// TODO labels may be empty
contextManager.getComputeNodeInstanceContext().updateLabels(event.getInstanceId(), event.getLabels());
computeNodeInstanceContext.updateLabels(event.getInstanceId(), event.getLabels());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,27 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataAddedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataAddedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataChangedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.TableDataChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.metadata.manager.ShardingSphereDatabaseDataManager;

/**
* Database data changed subscriber.
*/
@RequiredArgsConstructor
public final class DatabaseDataChangedSubscriber implements EventSubscriber {
public final class DatabaseDataChangedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;
private final ShardingSphereDatabaseDataManager databaseManager;

public DatabaseDataChangedSubscriber(final ContextManager contextManager) {
databaseManager = contextManager.getMetaDataContextManager().getDatabaseManager();
}

/**
* Renew to persist ShardingSphere database data.
Expand All @@ -44,7 +47,7 @@ public final class DatabaseDataChangedSubscriber implements EventSubscriber {
*/
@Subscribe
public synchronized void renew(final DatabaseDataAddedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereDatabaseData(event.getDatabaseName());
databaseManager.addShardingSphereDatabaseData(event.getDatabaseName());
}

/**
Expand All @@ -54,7 +57,7 @@ public synchronized void renew(final DatabaseDataAddedEvent event) {
*/
@Subscribe
public synchronized void renew(final DatabaseDataDeletedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereDatabaseData(event.getDatabaseName());
databaseManager.dropShardingSphereDatabaseData(event.getDatabaseName());
}

/**
Expand All @@ -64,7 +67,7 @@ public synchronized void renew(final DatabaseDataDeletedEvent event) {
*/
@Subscribe
public synchronized void renew(final SchemaDataAddedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
databaseManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
}

/**
Expand All @@ -74,7 +77,7 @@ public synchronized void renew(final SchemaDataAddedEvent event) {
*/
@Subscribe
public synchronized void renew(final SchemaDataDeletedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
databaseManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
}

/**
Expand All @@ -85,10 +88,10 @@ public synchronized void renew(final SchemaDataDeletedEvent event) {
@Subscribe
public synchronized void renew(final TableDataChangedEvent event) {
if (null != event.getAddedTable()) {
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable());
databaseManager.addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable());
}
if (null != event.getDeletedTable()) {
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
databaseManager.dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
}
}

Expand All @@ -99,7 +102,7 @@ public synchronized void renew(final TableDataChangedEvent event) {
*/
@Subscribe
public synchronized void renew(final ShardingSphereRowDataChangedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
databaseManager.alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
}

/**
Expand All @@ -109,6 +112,6 @@ public synchronized void renew(final ShardingSphereRowDataChangedEvent event) {
*/
@Subscribe
public synchronized void renew(final ShardingSphereRowDataDeletedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey());
databaseManager.deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.config.AlterGlobalRuleConfigurationEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.spi.RuleConfigurationPersistDecorator;

import java.util.Optional;
Expand All @@ -33,7 +33,7 @@
* Global rule configuration event subscriber.
*/
@RequiredArgsConstructor
public final class GlobalRuleConfigurationEventSubscriber implements EventSubscriber {
public final class GlobalRuleConfigurationEventSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
import org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListenerAssistedEvent;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.DatabaseMetaDataChangedListener;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand All @@ -35,7 +35,7 @@
* Listener assisted subscriber.
*/
@RequiredArgsConstructor
public final class ListenerAssistedSubscriber implements EventSubscriber {
public final class ListenerAssistedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent;
Expand All @@ -31,14 +30,15 @@
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

/**
* Meta data changed subscriber.
*/
public final class MetaDataChangedSubscriber implements EventSubscriber {
public final class MetaDataChangedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessCompletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesCompletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;

import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -40,12 +40,19 @@
/**
* Process list changed subscriber.
*/
@RequiredArgsConstructor
public final class ProcessListChangedSubscriber implements EventSubscriber {
public final class ProcessListChangedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
private final PersistRepository repository;

private final YamlProcessListSwapper swapper;

public ProcessListChangedSubscriber(final ContextManager contextManager) {
this.contextManager = contextManager;
repository = contextManager.getPersistServiceFacade().getRepository();
swapper = new YamlProcessListSwapper();
}

/**
* Report local processes.
Expand All @@ -59,10 +66,9 @@ public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
}
Collection<Process> processes = ProcessRegistry.getInstance().listAll();
if (!processes.isEmpty()) {
contextManager.getPersistServiceFacade().getRepository().persist(
ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
}
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
}

/**
Expand Down Expand Up @@ -93,7 +99,7 @@ public synchronized void killLocalProcess(final KillLocalProcessEvent event) thr
each.cancel();
}
}
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.config.AlterPropertiesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;

/**
* Properties event subscriber.
*/
@RequiredArgsConstructor
public final class PropertiesEventSubscriber implements EventSubscriber {
public final class PropertiesEventSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Loading
Loading