Skip to content

Commit

Permalink
Refactor PipelineJobCenter (#29331)
Browse files Browse the repository at this point in the history
* Refactor PipelineJobCenter

* Refactor PipelineJobCenter

* Refactor PipelineJobCenter

* Refactor PipelineJobCenter

* Refactor PipelineJobCenter
  • Loading branch information
terrymanu authored Dec 8, 2023
1 parent 46c39dd commit 61ee704
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;

Expand All @@ -33,79 +32,72 @@
* Pipeline job center.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class PipelineJobCenter {

private static final Map<String, PipelineJob> JOB_MAP = new ConcurrentHashMap<>();
private static final Map<String, PipelineJob> JOBS = new ConcurrentHashMap<>();

/**
* Add job.
* Add pipeline job.
*
* @param jobId job id
* @param job job
* @param jobId pipeline job id
* @param job pipeline job
*/
public static void addJob(final String jobId, final PipelineJob job) {
JOB_MAP.put(jobId, job);
public static void add(final String jobId, final PipelineJob job) {
JOBS.put(jobId, job);
}

/**
* Is job existing.
* Judge whether pipeline job existing.
*
* @param jobId job id
* @return true when job exists, else false
* @param jobId pipeline job id
* @return pipeline job exists or not
*/
public static boolean isJobExisting(final String jobId) {
return JOB_MAP.containsKey(jobId);
public static boolean isExisting(final String jobId) {
return JOBS.containsKey(jobId);
}

/**
* Get job.
* Get pipeline job.
*
* @param jobId job id
* @return job
* @param jobId pipeline job id
* @return pipeline job
*/
public static PipelineJob getJob(final String jobId) {
return JOB_MAP.get(jobId);
public static PipelineJob get(final String jobId) {
return JOBS.get(jobId);
}

/**
* Stop job.
* Stop pipeline job.
*
* @param jobId job id
* @param jobId pipeline job id
*/
public static void stop(final String jobId) {
PipelineJob job = JOB_MAP.get(jobId);
PipelineJob job = JOBS.get(jobId);
if (null == job) {
return;
}
job.stop();
JOB_MAP.remove(jobId);
JOBS.remove(jobId);
}

/**
* Get job item context.
* Get pipeline job item context.
*
* @param jobId job id
* @param jobId pipeline job id
* @param shardingItem sharding item
* @return job item context
* @return pipeline job item context
*/
public static Optional<PipelineJobItemContext> getJobItemContext(final String jobId, final int shardingItem) {
PipelineJob job = JOB_MAP.get(jobId);
if (null == job) {
return Optional.empty();
}
Optional<PipelineTasksRunner> tasksRunner = job.getTasksRunner(shardingItem);
return tasksRunner.map(PipelineTasksRunner::getJobItemContext);
public static Optional<PipelineJobItemContext> getItemContext(final String jobId, final int shardingItem) {
return JOBS.containsKey(jobId) ? JOBS.get(jobId).getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext) : Optional.empty();
}

/**
* Get sharding items.
*
* @param jobId job id
* @return sharding items.
* @param jobId pipeline job id
* @return sharding items
*/
public static Collection<Integer> getShardingItems(final String jobId) {
PipelineJob pipelineJob = JOB_MAP.get(jobId);
return null == pipelineJob ? Collections.emptyList() : pipelineJob.getShardingItems();
return JOBS.containsKey(jobId) ? JOBS.get(jobId).getShardingItems() : Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private static synchronized void persist(final String jobId, final int shardingI
&& !persistContext.getHasNewEvents().get()) {
return;
}
Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItem);
Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getItemContext(jobId, shardingItem);
if (!jobItemContext.isPresent()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void process(final Type eventType, final JobConfiguration jobConfig) {
switch (eventType) {
case ADDED:
case UPDATED:
if (PipelineJobCenter.isJobExisting(jobId)) {
if (PipelineJobCenter.isExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
executeJob(jobConfig);
Expand All @@ -81,7 +81,7 @@ protected void onDisabled(final JobConfiguration jobConfig, final Collection<Int
protected void executeJob(final JobConfiguration jobConfig) {
String jobId = jobConfig.getJobName();
AbstractPipelineJob job = buildPipelineJob(jobId);
PipelineJobCenter.addJob(jobId, job);
PipelineJobCenter.add(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfig);
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ class PipelineJobCenterTest {
@Test
void assertPipelineJobCenter() {
PipelineJob pipelineJob = mock(PipelineJob.class);
PipelineJobCenter.addJob("Job1", pipelineJob);
assertTrue(PipelineJobCenter.isJobExisting("Job1"));
assertFalse(PipelineJobCenter.isJobExisting("Job2"));
assertNotNull(PipelineJobCenter.getJob("Job1"));
assertEquals(pipelineJob, PipelineJobCenter.getJob("Job1"));
assertNull(PipelineJobCenter.getJob("Job2"));
PipelineJobCenter.add("Job1", pipelineJob);
assertTrue(PipelineJobCenter.isExisting("Job1"));
assertFalse(PipelineJobCenter.isExisting("Job2"));
assertNotNull(PipelineJobCenter.get("Job1"));
assertEquals(pipelineJob, PipelineJobCenter.get("Job1"));
assertNull(PipelineJobCenter.get("Job2"));
PipelineJobCenter.stop("Job1");
}

Expand All @@ -57,19 +57,19 @@ void assertGetJobItemContext() {
PipelineJobItemContext pipelineJobItemContext = mock(PipelineJobItemContext.class);
when(pipelineJob.getTasksRunner(anyInt())).thenReturn(Optional.of(pipelineTasksRunner));
when(pipelineTasksRunner.getJobItemContext()).thenReturn(pipelineJobItemContext);
PipelineJobCenter.addJob("Job1", pipelineJob);
Optional<PipelineJobItemContext> result = PipelineJobCenter.getJobItemContext("Job1", 1);
PipelineJobCenter.add("Job1", pipelineJob);
Optional<PipelineJobItemContext> result = PipelineJobCenter.getItemContext("Job1", 1);
Optional<PipelineJobItemContext> optionalPipelineJobItemContext = Optional.ofNullable(pipelineJobItemContext);
assertTrue(result.isPresent());
assertEquals(Optional.empty(), PipelineJobCenter.getJobItemContext("Job2", 1));
assertEquals(Optional.empty(), PipelineJobCenter.getItemContext("Job2", 1));
assertEquals(optionalPipelineJobItemContext, result);
PipelineJobCenter.stop("Job1");
}

@Test
void assertGetShardingItems() {
PipelineJob pipelineJob = mock(PipelineJob.class);
PipelineJobCenter.addJob("Job1", pipelineJob);
PipelineJobCenter.add("Job1", pipelineJob);
when(pipelineJob.getShardingItems()).thenReturn(Arrays.asList(1, 2, 3));
Collection<Integer> shardingItems = pipelineJob.getShardingItems();
Assertions.assertFalse(shardingItems.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private static TransmissionJobItemProgress getTransmissionJobItemProgress(final
*/
public void start(final String jobId, final PipelineSink sink) {
CDCJob job = new CDCJob(jobId, sink);
PipelineJobCenter.addJob(jobId, job);
PipelineJobCenter.add(jobId, job);
enable(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
if (PipelineJobCenter.isExisting(jobId)) {
PipelineJobCenter.stop(jobId);
}
ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
Expand All @@ -153,7 +153,7 @@ public void stopStreaming(final String jobId, final ChannelId channelId) {
log.warn("job id is null or empty, ignored");
return;
}
CDCJob job = (CDCJob) PipelineJobCenter.getJob(jobId);
CDCJob job = (CDCJob) PipelineJobCenter.get(jobId);
if (null == job) {
return;
}
Expand Down

0 comments on commit 61ee704

Please sign in to comment.