From d1e6c60e359a3666a8dcbb858dbb16d8072d140f Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 20:38:45 +0800 Subject: [PATCH 1/9] Refactor ProcessListChangedSubscriber --- .../subscriber/type/ProcessListChangedSubscriber.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java index 4e9c64cd2c84e..d4e9c65924b56 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java @@ -42,14 +42,14 @@ */ public final class ProcessListChangedSubscriber implements DispatchEventSubscriber { - private final ContextManager contextManager; + private final String instanceMetaDataId; private final PersistRepository repository; private final YamlProcessListSwapper swapper; public ProcessListChangedSubscriber(final ContextManager contextManager) { - this.contextManager = contextManager; + instanceMetaDataId = contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(); repository = contextManager.getPersistServiceFacade().getRepository(); swapper = new YamlProcessListSwapper(); } @@ -61,7 +61,7 @@ public ProcessListChangedSubscriber(final ContextManager contextManager) { */ @Subscribe public void reportLocalProcesses(final ReportLocalProcessesEvent event) { - if (!event.getInstanceId().equals(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId())) { + if (!event.getInstanceId().equals(instanceMetaDataId)) { return; } Collection processes = ProcessRegistry.getInstance().listAll(); @@ -89,7 +89,7 @@ public synchronized void completeToReportLocalProcesses(final ReportLocalProcess */ @Subscribe public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException { - if (!event.getInstanceId().equals(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId())) { + if (!event.getInstanceId().equals(instanceMetaDataId)) { return; } Process process = ProcessRegistry.getInstance().get(event.getProcessId()); From 46119f2e0811a605087e44be07f22ae1d508424d Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:04:04 +0800 Subject: [PATCH 2/9] Refactor ProcessListChangedSubscriber --- .../infra/executor/sql/process/Process.java | 13 ++++++ .../executor/sql/process/ProcessRegistry.java | 15 ++++++- .../divided/ProcessPersistService.java | 16 +++++++ .../type/ProcessListChangedSubscriber.java | 45 +++++-------------- .../service/ClusterProcessPersistService.java | 17 +++++++ .../StandaloneProcessPersistService.java | 17 +++---- 6 files changed, 81 insertions(+), 42 deletions(-) diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java index f715b1af99b54..a1eae30a7ec5b 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; import org.apache.shardingsphere.infra.metadata.user.Grantee; +import java.sql.SQLException; import java.sql.Statement; import java.util.LinkedHashMap; import java.util.Map; @@ -151,4 +152,16 @@ public boolean isIdle() { public void removeProcessStatement(final ExecutionUnit executionUnit) { processStatements.remove(System.identityHashCode(executionUnit)); } + + /** + * Kill process. + * + * @throws SQLException SQL exception + */ + public void kill() throws SQLException { + setInterrupted(true); + for (Statement each : processStatements.values()) { + each.cancel(); + } + } } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java index 53a5c18f6c98f..b28660f0bf7e2 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException; +import java.sql.SQLException; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -94,11 +95,23 @@ public void remove(final String id) { } /** - * List all process. + * List all processes. * * @return all processes */ public Collection listAll() { return processes.values(); } + + /** + * Kill process. + * + * @throws SQLException SQL exception + */ + public void kill(final String processId) throws SQLException { + Process process = ProcessRegistry.getInstance().get(processId); + if (null != process) { + process.kill(); + } + } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java index 0794364a6b819..91be78ef9784f 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java @@ -27,6 +27,14 @@ */ public interface ProcessPersistService { + /** + * Report local processes. + * + * @param instanceId instance ID + * @param taskId task ID + */ + void reportLocalProcesses(final String instanceId, final String taskId); + /** * Get process list. * @@ -41,4 +49,12 @@ public interface ProcessPersistService { * @throws SQLException SQL exception */ void killProcess(String processId) throws SQLException; + + /** + * Clean process. + * + * @param instanceId instance ID + * @param processId process ID + */ + void cleanProcess(String instanceId, String processId); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java index d4e9c65924b56..32f2243491924 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java @@ -18,57 +18,42 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; import com.google.common.eventbus.Subscribe; -import org.apache.shardingsphere.infra.executor.sql.process.Process; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; -import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; -import org.apache.shardingsphere.metadata.persist.node.ProcessNode; +import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; -import org.apache.shardingsphere.mode.spi.PersistRepository; +import org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService; import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; /** * Process list changed subscriber. */ public final class ProcessListChangedSubscriber implements DispatchEventSubscriber { - private final String instanceMetaDataId; - - private final PersistRepository repository; + private final String instanceId; - private final YamlProcessListSwapper swapper; + private final ProcessPersistService processPersistService; public ProcessListChangedSubscriber(final ContextManager contextManager) { - instanceMetaDataId = contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(); - repository = contextManager.getPersistServiceFacade().getRepository(); - swapper = new YamlProcessListSwapper(); + instanceId = contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(); + processPersistService = contextManager.getPersistServiceFacade().getProcessPersistService(); } /** * Report local processes. * - * @param event show process list trigger event + * @param event report local processes event */ @Subscribe public void reportLocalProcesses(final ReportLocalProcessesEvent event) { - if (!event.getInstanceId().equals(instanceMetaDataId)) { - return; - } - Collection processes = ProcessRegistry.getInstance().listAll(); - if (!processes.isEmpty()) { - repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes))); + if (event.getInstanceId().equals(instanceId)) { + processPersistService.reportLocalProcesses(instanceId, event.getTaskId()); } - repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId())); } /** @@ -89,17 +74,11 @@ public synchronized void completeToReportLocalProcesses(final ReportLocalProcess */ @Subscribe public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException { - if (!event.getInstanceId().equals(instanceMetaDataId)) { + if (!event.getInstanceId().equals(instanceId)) { return; } - Process process = ProcessRegistry.getInstance().get(event.getProcessId()); - if (null != process) { - process.setInterrupted(true); - for (Statement each : process.getProcessStatements().values()) { - each.cancel(); - } - } - repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId())); + ProcessRegistry.getInstance().kill(event.getProcessId()); + processPersistService.cleanProcess(instanceId, event.getProcessId()); } /** diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java index 3bc48b500c103..9bcc9470062c1 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java @@ -19,6 +19,7 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.executor.sql.process.Process; +import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList; import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; @@ -43,6 +44,17 @@ public final class ClusterProcessPersistService implements ProcessPersistService private final PersistRepository repository; + private final YamlProcessListSwapper swapper = new YamlProcessListSwapper(); + + @Override + public void reportLocalProcesses(final String instanceId, final String taskId) { + Collection processes = ProcessRegistry.getInstance().listAll(); + if (!processes.isEmpty()) { + repository.persist(ProcessNode.getProcessListInstancePath(taskId, instanceId), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes))); + } + repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(instanceId, taskId)); + } + @Override public Collection getProcessList() { String taskId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", ""); @@ -98,4 +110,9 @@ private Collection getKillProcessTriggerPaths(final String processId) { private boolean isReady(final Collection paths) { return paths.stream().noneMatch(each -> null != repository.query(each)); } + + @Override + public void cleanProcess(final String instanceId, final String processId) { + repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(instanceId, processId)); + } } diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java index 135bc0a1a55a5..d7c0e6e50576b 100644 --- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java +++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java @@ -22,7 +22,6 @@ import org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService; import java.sql.SQLException; -import java.sql.Statement; import java.util.Collection; /** @@ -30,6 +29,10 @@ */ public final class StandaloneProcessPersistService implements ProcessPersistService { + @Override + public void reportLocalProcesses(final String instanceId, final String taskId) { + } + @Override public Collection getProcessList() { return ProcessRegistry.getInstance().listAll(); @@ -37,12 +40,10 @@ public Collection getProcessList() { @Override public void killProcess(final String processId) throws SQLException { - Process process = ProcessRegistry.getInstance().get(processId); - if (null == process) { - return; - } - for (Statement each : process.getProcessStatements().values()) { - each.cancel(); - } + ProcessRegistry.getInstance().kill(processId); + } + + @Override + public void cleanProcess(final String instanceId, final String processId) { } } From 07ef6d0a0eea4d8215a49f2ad59fe0f261779bad Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:06:44 +0800 Subject: [PATCH 3/9] Refactor ProcessListChangedSubscriber --- .../mysql/handler/admin/executor/ShowProcessListExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java index 57dd3db0bdb10..3254b62ee3f5d 100644 --- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java +++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java @@ -62,7 +62,7 @@ public void execute(final ConnectionSession connectionSession) { private QueryResult getQueryResult() { Collection processes = ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().getProcessList(); - if (null == processes || processes.isEmpty()) { + if (processes.isEmpty()) { return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList()); } List rows = processes.stream().map(this::getMemoryQueryResultDataRow).collect(Collectors.toList()); From 9a266a555a7c3312cbfe5f38b2951a322424c01b Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:22:05 +0800 Subject: [PATCH 4/9] Refactor ProcessListChangedSubscriber --- .../ProcessListChangedSubscriberTest.java | 42 ++++--------------- .../ClusterProcessPersistServiceTest.java | 27 +++++++++++- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java index 2dab71dc3d4ff..5cd48c2be7b97 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriberTest.java @@ -17,14 +17,13 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; -import org.apache.shardingsphere.infra.executor.sql.process.Process; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; +import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; @@ -37,12 +36,9 @@ import org.mockito.quality.Strictness; import java.sql.SQLException; -import java.sql.Statement; import java.util.Collections; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -72,19 +68,10 @@ void assertReportLocalProcessesWithNotCurrentInstance() { } @Test - void assertReportEmptyLocalProcesses() { + void assertReportLocalProcesses() { when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList()); subscriber.reportLocalProcesses(new ReportLocalProcessesEvent("foo_instance_id", "foo_task_id")); - verify(contextManager.getPersistServiceFacade().getRepository(), times(0)).persist(any(), any()); - verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id"); - } - - @Test - void assertReportNotEmptyLocalProcesses() { - when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class, RETURNS_DEEP_STUBS))); - subscriber.reportLocalProcesses(new ReportLocalProcessesEvent("foo_instance_id", "foo_task_id")); - verify(contextManager.getPersistServiceFacade().getRepository()).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"), any()); - verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id"); + verify(contextManager.getPersistServiceFacade().getProcessPersistService()).reportLocalProcesses("foo_instance_id", "foo_task_id"); } @Test @@ -94,28 +81,15 @@ void assertCompleteToReportLocalProcesses() { } @Test - void assertKillLocalProcessWithNotCurrentInstance() throws SQLException { - subscriber.killLocalProcess(new KillLocalProcessEvent("bar_instance_id", "foo_pid")); - verify(contextManager.getPersistServiceFacade().getRepository(), times(0)).delete(any()); - } - - @Test - void assertKillLocalProcessWithoutExistedProcess() throws SQLException { - when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(null); + void assertKillLocalProcessWithCurrentInstance() throws SQLException { subscriber.killLocalProcess(new KillLocalProcessEvent("foo_instance_id", "foo_pid")); - verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid"); + verify(contextManager.getPersistServiceFacade().getProcessPersistService()).cleanProcess("foo_instance_id", "foo_pid"); } @Test - void assertKillLocalProcessWithExistedProcess() throws SQLException { - Process process = mock(Process.class, RETURNS_DEEP_STUBS); - Statement statement = mock(Statement.class); - when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1, statement)); - when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(process); - subscriber.killLocalProcess(new KillLocalProcessEvent("foo_instance_id", "foo_pid")); - verify(process).setInterrupted(true); - verify(statement).cancel(); - verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid"); + void assertKillLocalProcessWithNotCurrentInstance() throws SQLException { + subscriber.killLocalProcess(new KillLocalProcessEvent("bar_instance_id", "foo_pid")); + verify(contextManager.getPersistServiceFacade().getProcessPersistService(), times(0)).cleanProcess("bar_instance_id", "foo_pid"); } @Test diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java index efebb308c673e..6d769be918691 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.persist.service; import org.apache.shardingsphere.infra.executor.sql.process.Process; +import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess; import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList; @@ -37,15 +38,17 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(AutoMockExtension.class) -@StaticMockSettings(ProcessOperationLockRegistry.class) +@StaticMockSettings({ProcessRegistry.class, ProcessOperationLockRegistry.class}) class ClusterProcessPersistServiceTest { @Mock @@ -58,6 +61,22 @@ void setUp() { processPersistService = new ClusterProcessPersistService(repository); } + @Test + void assertReportEmptyLocalProcesses() { + when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList()); + processPersistService.reportLocalProcesses("foo_instance_id", "foo_task_id"); + verify(repository, times(0)).persist(any(), any()); + verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id"); + } + + @Test + void assertReportNotEmptyLocalProcesses() { + when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class, RETURNS_DEEP_STUBS))); + processPersistService.reportLocalProcesses("foo_instance_id", "foo_task_id"); + verify(repository).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"), any()); + verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id"); + } + @Test void assertGetCompletedProcessList() { when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(), any())).thenReturn(true); @@ -111,4 +130,10 @@ private void assertKillProcess() { processPersistService.killProcess("foo_process_id"); verify(repository).persist("/nodes/compute_nodes/kill_process_trigger/abc:foo_process_id", ""); } + + @Test + void assertCleanProcess() { + processPersistService.cleanProcess("foo_instance_id", "foo_pid"); + verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid"); + } } From 16c0c3dff814feccf9598311adf54af94cb54bbd Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:27:27 +0800 Subject: [PATCH 5/9] Refactor ProcessListChangedSubscriber --- .../StandaloneProcessPersistServiceTest.java | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java index 45c6a36b44c54..156636d56a2fc 100644 --- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java +++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistServiceTest.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.mode.manager.standalone.persist.service; -import org.apache.shardingsphere.infra.executor.sql.process.Process; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; @@ -25,10 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collections; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -51,19 +47,7 @@ void assertGetProcessList() { void assertKillProcess() throws SQLException { ProcessRegistry processRegistry = mock(ProcessRegistry.class); when(ProcessRegistry.getInstance()).thenReturn(processRegistry); - Process process = mock(Process.class); - Statement statement = mock(Statement.class); - when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1, statement)); - when(processRegistry.get("foo_id")).thenReturn(process); processPersistService.killProcess("foo_id"); - verify(statement).cancel(); - } - - @Test - void assertKillProcessWithNotExistedProcessId() { - ProcessRegistry processRegistry = mock(ProcessRegistry.class); - when(ProcessRegistry.getInstance()).thenReturn(processRegistry); - when(processRegistry.get("foo_id")).thenReturn(null); - assertDoesNotThrow(() -> processPersistService.killProcess("foo_id")); + verify(ProcessRegistry.getInstance()).kill("foo_id"); } } From 155c300139fabfdc5d385a0679408eeb699e47c3 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:31:33 +0800 Subject: [PATCH 6/9] Refactor ProcessListChangedSubscriber --- .../mode/persist/service/divided/ProcessPersistService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java index 91be78ef9784f..3a47a8f1325be 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java @@ -29,7 +29,7 @@ public interface ProcessPersistService { /** * Report local processes. - * + * * @param instanceId instance ID * @param taskId task ID */ @@ -52,7 +52,7 @@ public interface ProcessPersistService { /** * Clean process. - * + * * @param instanceId instance ID * @param processId process ID */ From 221a3ad15dafa486f342fb0c6c60f05bc0172e2c Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:40:11 +0800 Subject: [PATCH 7/9] Refactor ProcessListChangedSubscriber --- .../infra/executor/sql/process/ProcessRegistry.java | 1 + 1 file changed, 1 insertion(+) diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java index b28660f0bf7e2..1198c4245deda 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java @@ -106,6 +106,7 @@ public Collection listAll() { /** * Kill process. * + * @param processId process ID * @throws SQLException SQL exception */ public void kill(final String processId) throws SQLException { From a4407058a858e1464e180c3d3563282ee20199dc Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:49:12 +0800 Subject: [PATCH 8/9] Refactor ProcessListChangedSubscriber --- .../mode/persist/service/divided/ProcessPersistService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java index 3a47a8f1325be..894424fc4b9e8 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java @@ -33,7 +33,7 @@ public interface ProcessPersistService { * @param instanceId instance ID * @param taskId task ID */ - void reportLocalProcesses(final String instanceId, final String taskId); + void reportLocalProcesses(String instanceId, String taskId); /** * Get process list. From 3547cc37009432a20afc57c81e89dc17bfc563a8 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:56:58 +0800 Subject: [PATCH 9/9] Refactor ProcessListChangedSubscriber --- .../persist/service/ClusterProcessPersistServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java index 6d769be918691..5a888875f8d56 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java @@ -38,10 +38,10 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify;