Skip to content

Commit

Permalink
Refactor ProcessListChangedSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 29, 2024
1 parent 07ef6d0 commit 9a266a5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
Expand All @@ -37,12 +36,9 @@
import org.mockito.quality.Strictness;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -72,19 +68,10 @@ void assertReportLocalProcessesWithNotCurrentInstance() {
}

@Test
void assertReportEmptyLocalProcesses() {
void assertReportLocalProcesses() {
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
subscriber.reportLocalProcesses(new ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
verify(contextManager.getPersistServiceFacade().getRepository(), times(0)).persist(any(), any());
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
}

@Test
void assertReportNotEmptyLocalProcesses() {
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class, RETURNS_DEEP_STUBS)));
subscriber.reportLocalProcesses(new ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
verify(contextManager.getPersistServiceFacade().getRepository()).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"), any());
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
verify(contextManager.getPersistServiceFacade().getProcessPersistService()).reportLocalProcesses("foo_instance_id", "foo_task_id");
}

@Test
Expand All @@ -94,28 +81,15 @@ void assertCompleteToReportLocalProcesses() {
}

@Test
void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
subscriber.killLocalProcess(new KillLocalProcessEvent("bar_instance_id", "foo_pid"));
verify(contextManager.getPersistServiceFacade().getRepository(), times(0)).delete(any());
}

@Test
void assertKillLocalProcessWithoutExistedProcess() throws SQLException {
when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(null);
void assertKillLocalProcessWithCurrentInstance() throws SQLException {
subscriber.killLocalProcess(new KillLocalProcessEvent("foo_instance_id", "foo_pid"));
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
verify(contextManager.getPersistServiceFacade().getProcessPersistService()).cleanProcess("foo_instance_id", "foo_pid");
}

@Test
void assertKillLocalProcessWithExistedProcess() throws SQLException {
Process process = mock(Process.class, RETURNS_DEEP_STUBS);
Statement statement = mock(Statement.class);
when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1, statement));
when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(process);
subscriber.killLocalProcess(new KillLocalProcessEvent("foo_instance_id", "foo_pid"));
verify(process).setInterrupted(true);
verify(statement).cancel();
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
subscriber.killLocalProcess(new KillLocalProcessEvent("bar_instance_id", "foo_pid"));
verify(contextManager.getPersistServiceFacade().getProcessPersistService(), times(0)).cleanProcess("bar_instance_id", "foo_pid");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.persist.service;

import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
Expand All @@ -37,15 +38,17 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(AutoMockExtension.class)
@StaticMockSettings(ProcessOperationLockRegistry.class)
@StaticMockSettings({ProcessRegistry.class, ProcessOperationLockRegistry.class})
class ClusterProcessPersistServiceTest {

@Mock
Expand All @@ -58,6 +61,22 @@ void setUp() {
processPersistService = new ClusterProcessPersistService(repository);
}

@Test
void assertReportEmptyLocalProcesses() {
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
processPersistService.reportLocalProcesses("foo_instance_id", "foo_task_id");
verify(repository, times(0)).persist(any(), any());
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
}

@Test
void assertReportNotEmptyLocalProcesses() {
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class, RETURNS_DEEP_STUBS)));
processPersistService.reportLocalProcesses("foo_instance_id", "foo_task_id");
verify(repository).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"), any());
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
}

@Test
void assertGetCompletedProcessList() {
when(ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(any(), any())).thenReturn(true);
Expand Down Expand Up @@ -111,4 +130,10 @@ private void assertKillProcess() {
processPersistService.killProcess("foo_process_id");
verify(repository).persist("/nodes/compute_nodes/kill_process_trigger/abc:foo_process_id", "");
}

@Test
void assertCleanProcess() {
processPersistService.cleanProcess("foo_instance_id", "foo_pid");
verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
}
}

0 comments on commit 9a266a5

Please sign in to comment.