diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
similarity index 97%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
index 2fdffdaaa0650..7d812cc1c212a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
@@ -29,10 +29,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Pipeline job center.
+ * Pipeline job registry.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineJobCenter {
+public final class PipelineJobRegistry {
 
     private static final Map<String, PipelineJob> JOBS = new ConcurrentHashMap<>();
 
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 64268bc27d98a..2e78a141101b3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -22,7 +22,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -121,7 +121,7 @@ private static synchronized void persist(final String jobId, final int shardingI
                 && !persistContext.getHasNewEvents().get()) {
             return;
         }
-        Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getItemContext(jobId, shardingItem);
+        Optional<PipelineJobItemContext> jobItemContext = PipelineJobRegistry.getItemContext(jobId, shardingItem);
         if (!jobItemContext.isPresent()) {
             return;
         }
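The persist path above shows the call pattern every renamed call site in this diff follows: resolve the running job item through the registry, then act on it. A minimal sketch of that guard, assuming only the classes visible in this diff (the rest of the real persist() body, including the persistContext event-flag checks, is elided):

import java.util.Optional;

import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;

// Hedged sketch of the lookup guard in PipelineJobProgressPersistService.persist(); not the actual method.
final class PersistGuardSketch {
    
    static void persist(final String jobId, final int shardingItem) {
        Optional<PipelineJobItemContext> jobItemContext = PipelineJobRegistry.getItemContext(jobId, shardingItem);
        if (!jobItemContext.isPresent()) {
            // The job item is not running on this instance, so there is no progress to persist.
            return;
        }
        // ... persist progress for jobItemContext.get() (elided)
    }
}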
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index d7d5882c6cf75..f128bb4d6b38c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -23,7 +23,7 @@
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
@@ -47,8 +47,8 @@ public void process(final Type eventType, final JobConfiguration jobConfig) {
         }
         String jobId = jobConfig.getJobName();
         if (disabled || deleted) {
-            Collection<Integer> jobItems = PipelineJobCenter.getShardingItems(jobId);
-            PipelineJobCenter.stop(jobId);
+            Collection<Integer> jobItems = PipelineJobRegistry.getShardingItems(jobId);
+            PipelineJobRegistry.stop(jobId);
             if (disabled) {
                 onDisabled(jobConfig, jobItems);
             }
@@ -57,7 +57,7 @@ public void process(final Type eventType, final JobConfiguration jobConfig) {
         switch (eventType) {
             case ADDED:
             case UPDATED:
-                if (PipelineJobCenter.isExisting(jobId)) {
+                if (PipelineJobRegistry.isExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
                     executeJob(jobConfig);
@@ -81,7 +81,7 @@ protected void onDisabled(final JobConfiguration jobConfig, final Collection
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
deleted file mode 100644
-        Optional<PipelineJobItemContext> result = PipelineJobCenter.getItemContext("Job1", 1);
-        Optional<PipelineJobItemContext> optionalPipelineJobItemContext = Optional.ofNullable(pipelineJobItemContext);
-        assertTrue(result.isPresent());
-        assertEquals(Optional.empty(), PipelineJobCenter.getItemContext("Job2", 1));
-        assertEquals(optionalPipelineJobItemContext, result);
-        PipelineJobCenter.stop("Job1");
-    }
-    
-    @Test
-    void assertGetShardingItems() {
-        PipelineJob pipelineJob = mock(PipelineJob.class);
-        PipelineJobCenter.add("Job1", pipelineJob);
-        when(pipelineJob.getShardingItems()).thenReturn(Arrays.asList(1, 2, 3));
-        Collection<Integer> shardingItems = pipelineJob.getShardingItems();
-        Assertions.assertFalse(shardingItems.isEmpty());
-        Assertions.assertEquals(Arrays.asList(1, 2, 3), PipelineJobCenter.getShardingItems("Job1"));
-        assertEquals(Collections.EMPTY_LIST, PipelineJobCenter.getShardingItems("Job2"));
-        PipelineJobCenter.stop("Job1");
-    }
-}
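One behavioral detail in process() is worth calling out: the sharding items are read before PipelineJobRegistry.stop(jobId), because stop() also removes the job from the registry, after which getShardingItems(jobId) returns an empty list (the new test's assertStop and assertGetNotExistedShardingItems pin down both behaviors). A hedged sketch of just that branch, with the onDisabled callback elided:

import java.util.Collection;

import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;

// Hedged sketch of the disabled/deleted branch in AbstractJobConfigurationChangedProcessor.process().
final class DisabledBranchSketch {
    
    static void onDisabledOrDeleted(final String jobId, final boolean disabled) {
        // Capture sharding items first: after stop(), the registry no longer knows this job.
        Collection<Integer> jobItems = PipelineJobRegistry.getShardingItems(jobId);
        PipelineJobRegistry.stop(jobId);
        if (disabled) {
            // onDisabled(jobConfig, jobItems); // elided
        }
    }
}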
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
new file mode 100644
index 0000000000000..adbca271a205d
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class PipelineJobRegistryTest {
+    
+    @Mock
+    private PipelineJob job;
+    
+    @BeforeEach
+    void setUp() {
+        PipelineJobRegistry.add("foo_job", job);
+    }
+    
+    @AfterEach
+    void reset() {
+        PipelineJobRegistry.stop("foo_job");
+    }
+    
+    @Test
+    void assertAdd() {
+        assertFalse(PipelineJobRegistry.isExisting("bar_job"));
+        PipelineJobRegistry.add("bar_job", mock(PipelineJob.class));
+        assertTrue(PipelineJobRegistry.isExisting("bar_job"));
+    }
+    
+    @Test
+    void assertIsExisting() {
+        assertTrue(PipelineJobRegistry.isExisting("foo_job"));
+    }
+    
+    @Test
+    void assertGet() {
+        assertThat(PipelineJobRegistry.get("foo_job"), is(job));
+    }
+    
+    @Test
+    void assertStop() {
+        PipelineJobRegistry.stop("foo_job");
+        verify(job).stop();
+        assertFalse(PipelineJobRegistry.isExisting("foo_job"));
+    }
+    
+    @Test
+    void assertGetExistedItemContext() {
+        PipelineJobItemContext jobItemContext = mock(PipelineJobItemContext.class);
+        PipelineTasksRunner tasksRunner = mock(PipelineTasksRunner.class);
+        when(tasksRunner.getJobItemContext()).thenReturn(jobItemContext);
+        when(job.getTasksRunner(anyInt())).thenReturn(Optional.of(tasksRunner));
+        Optional<PipelineJobItemContext> actual = PipelineJobRegistry.getItemContext("foo_job", 1);
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is(jobItemContext));
+    }
+    
+    @Test
+    void assertGetNotExistedItemContext() {
+        assertThat(PipelineJobRegistry.getItemContext("bar_job", 1), is(Optional.empty()));
+    }
+    
+    @Test
+    void assertGetExistedShardingItems() {
+        when(job.getShardingItems()).thenReturn(Arrays.asList(1, 2, 3));
+        assertThat(PipelineJobRegistry.getShardingItems("foo_job"), is(Arrays.asList(1, 2, 3)));
+    }
+    
+    @Test
+    void assertGetNotExistedShardingItems() {
+        assertThat(PipelineJobRegistry.getShardingItems("bar_job"), is(Collections.emptyList()));
+    }
+}
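Because the registry is a process-wide static map, the test above registers "foo_job" in @BeforeEach and stops it in @AfterEach so state cannot leak between test methods. Any test touching the registry should pair add with stop the same way; a minimal, hypothetical example (job id and class name are illustrative only):

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.junit.jupiter.api.Test;

// Hypothetical test showing the add/stop lifecycle the registry expects.
class RegistryLifecycleExampleTest {
    
    @Test
    void assertLifecycle() {
        PipelineJob job = mock(PipelineJob.class);
        PipelineJobRegistry.add("example_job", job);
        assertTrue(PipelineJobRegistry.isExisting("example_job"));
        // stop() both stops the job and removes the entry; stopping an unknown id is a no-op,
        // which is why the @AfterEach cleanup above is safe even after assertStop().
        PipelineJobRegistry.stop("example_job");
        assertFalse(PipelineJobRegistry.isExisting("example_job"));
    }
}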
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 1d24377347c4d..f3ebcbadaf1c5 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -55,7 +55,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
@@ -187,7 +187,7 @@ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
 
     private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
-        PipelineJobCenter.stop(jobId);
+        PipelineJobRegistry.stop(jobId);
         jobAPI.disable(jobId);
     }
@@ -268,7 +268,7 @@ public void onFailure(final Throwable throwable) {
             CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
             cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
         }
-        PipelineJobCenter.stop(jobId);
+        PipelineJobRegistry.stop(jobId);
         jobAPI.disable(jobId);
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 4f26e82eca2b7..eec99b71fbcd8 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -48,7 +48,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
@@ -213,7 +213,7 @@ private static TransmissionJobItemProgress getTransmissionJobItemProgress(final
      */
     public void start(final String jobId, final PipelineSink sink) {
         CDCJob job = new CDCJob(jobId, sink);
-        PipelineJobCenter.add(jobId, job);
+        PipelineJobRegistry.add(jobId, job);
         enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
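Both CDC failure paths above pair the same two shutdown calls: PipelineJobRegistry.stop(jobId) tears down the job running in this process, while jobAPI.disable(jobId) flips the persisted job configuration so the scheduler does not re-run it. The interpretation of disable() here is an inference from context rather than something this diff states; a hedged sketch of the pairing, with the jobAPI call left as a comment since its concrete type is not shown in these hunks:

import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;

// Hedged sketch mirroring CDCJob.processFailed() and onFailure(): local teardown, then durable disable.
final class CdcFailureHandlingSketch {
    
    static void handleFailure(final String jobId) {
        PipelineJobRegistry.stop(jobId);   // stop and deregister the in-process job first
        // jobAPI.disable(jobId);          // then disable it persistently, as both hunks above do
    }
}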
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 98d430ec3fdcd..59fe991d7cecf 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,7 +41,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -92,7 +92,7 @@ private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBool
             jobItemManager.persistProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
-            PipelineJobCenter.stop(jobItemContext.getJobId());
+            PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
         }
         initIncrementalPosition(jobItemContext);
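CDCJobPreparer (like MigrationJobPreparer below) re-checks jobItemContext.isStopping() between expensive preparation steps; if a concurrent stop request has flipped the flag, the preparer short-circuits through the registry instead of continuing to build tasks. A hedged sketch of this cooperative-cancellation pattern, assuming PipelineJobItemContext exposes the isStopping()/getJobId() accessors used on the concrete contexts in these hunks (prepareStep() is a stand-in for the real steps):

import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;

// Hedged sketch of the isStopping() checks used by both job preparers.
final class PreparerCancellationSketch {
    
    void prepare(final PipelineJobItemContext jobItemContext) {
        prepareStep();
        if (jobItemContext.isStopping()) {
            // A stop arrived mid-preparation: release the job through the registry and bail out.
            PipelineJobRegistry.stop(jobItemContext.getJobId());
            return;
        }
        prepareStep();
    }
    
    private void prepareStep() {
        // stand-in for persistProgress / prepareAndCheckTargetWithLock / initIncrementalPosition
    }
}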
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index dc372106477fc..e9f44259205cc 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,7 +47,7 @@
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
@@ -134,8 +134,8 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
     public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
-        if (PipelineJobCenter.isExisting(jobId)) {
-            PipelineJobCenter.stop(jobId);
+        if (PipelineJobRegistry.isExisting(jobId)) {
+            PipelineJobRegistry.stop(jobId);
         }
         ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
         jobAPI.start(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
@@ -153,13 +153,13 @@ public void stopStreaming(final String jobId, final ChannelId channelId) {
             log.warn("job id is null or empty, ignored");
             return;
         }
-        CDCJob job = (CDCJob) PipelineJobCenter.get(jobId);
+        CDCJob job = (CDCJob) PipelineJobRegistry.get(jobId);
         if (null == job) {
            return;
         }
         if (job.getSink().identifierMatched(channelId)) {
             log.info("close CDC job, channel id: {}", channelId);
-            PipelineJobCenter.stop(jobId);
+            PipelineJobRegistry.stop(jobId);
             jobAPI.disable(jobId);
         }
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 39b2a1041dcca..6dc4aaf45839d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -43,7 +43,7 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -97,12 +97,12 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLExce
                 () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
         PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
-            PipelineJobCenter.stop(jobItemContext.getJobId());
+            PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
         }
         prepareAndCheckTargetWithLock(jobItemContext);
         if (jobItemContext.isStopping()) {
-            PipelineJobCenter.stop(jobItemContext.getJobId());
+            PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
         }
         boolean isIncrementalSupported = PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType());
@@ -113,7 +113,7 @@ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLExce
         if (isIncrementalSupported) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
-                PipelineJobCenter.stop(jobItemContext.getJobId());
+                PipelineJobRegistry.stop(jobItemContext.getJobId());
                 return;
             }
         }
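Taken together, the call sites and the new test pin down the renamed class's surface. As a reviewer's reference, here is a hedged reconstruction of PipelineJobRegistry's shape, inferred from the rename hunk (the ConcurrentHashMap-backed JOBS field) and the behaviors PipelineJobRegistryTest asserts; the actual source is in the renamed file above and may differ in details:

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;

// Hedged reconstruction, not the actual source: shape inferred from this diff and its tests.
public final class PipelineJobRegistrySketch {
    
    private static final Map<String, PipelineJob> JOBS = new ConcurrentHashMap<>();
    
    public static void add(final String jobId, final PipelineJob job) {
        JOBS.put(jobId, job);
    }
    
    public static boolean isExisting(final String jobId) {
        return JOBS.containsKey(jobId);
    }
    
    public static PipelineJob get(final String jobId) {
        return JOBS.get(jobId);
    }
    
    // assertStop() requires stop() to both halt the job and remove the entry.
    public static void stop(final String jobId) {
        PipelineJob job = JOBS.get(jobId);
        if (null == job) {
            return;
        }
        job.stop();
        JOBS.remove(jobId);
    }
    
    // assertGetExistedItemContext() shows the delegation: sharding item -> tasks runner -> item context.
    public static Optional<PipelineJobItemContext> getItemContext(final String jobId, final int shardingItem) {
        PipelineJob job = JOBS.get(jobId);
        return null == job ? Optional.empty() : job.getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext);
    }
    
    // Unknown job ids yield an empty list, never null (assertGetNotExistedShardingItems()).
    public static Collection<Integer> getShardingItems(final String jobId) {
        PipelineJob job = JOBS.get(jobId);
        return null == job ? Collections.emptyList() : job.getShardingItems();
    }
}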