Skip to content

Commit

Permalink
Add PipelineJobItemManager (#29078)
Browse files Browse the repository at this point in the history
* Add PipelineJobItemManager

* Add PipelineJobItemManager
  • Loading branch information
terrymanu authored Nov 18, 2023
1 parent b76937e commit df44c99
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
Expand Down Expand Up @@ -48,6 +49,7 @@ protected AbstractSimplePipelineJob(final String jobId) {
@Override
public void execute(final ShardingContext shardingContext) {
PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
PipelineJobItemManager<?> jobItemManager = new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
Expand All @@ -57,31 +59,31 @@ public void execute(final ShardingContext shardingContext) {
}
try {
PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
execute0(jobManager, jobItemContext);
execute0(jobItemManager, jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
processFailed(jobManager, jobId, shardingItem, ex);
processFailed(jobManager, jobItemManager, jobId, shardingItem, ex);
throw ex;
}
}

private void execute0(final PipelineJobManager jobManager, final PipelineJobItemContext jobItemContext) {
private void execute0(final PipelineJobItemManager<?> jobItemManager, final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
jobItemManager.cleanErrorMessage(jobId, shardingItem);
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
tasksRunner.start();
}

private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
private void processFailed(final PipelineJobManager jobManager, final PipelineJobItemManager<?> jobItemManager, final String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

Expand Down Expand Up @@ -130,7 +130,8 @@ private static synchronized void persist(final String jobId, final int shardingI
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getType())).updateJobItemProgress(jobItemContext.get());
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getType())
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.util.Optional;

/**
* Pipeline job manager.
*
* @param <T> type of pipeline job item progress
*/
public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {

private final YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> swapper;

@SuppressWarnings({"rawtypes", "unchecked"})
public PipelineJobItemManager(final YamlPipelineJobItemProgressSwapper swapper) {
this.swapper = swapper;
}

/**
* Update job item status.
*
* @param jobId job id
* @param shardingItem sharding item
* @param status status
*/
public void updateStatus(final String jobId, final int shardingItem, final JobStatus status) {
Optional<T> jobItemProgress = getProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(
PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}

/**
* Get job item progress.
*
* @param jobId job id
* @param shardingItem sharding item
* @return job item progress
*/
public Optional<T> getProgress(final String jobId, final int shardingItem) {
return PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem)
.map(optional -> swapper.swapToObject(YamlEngine.unmarshal(optional, swapper.getYamlProgressClass(), true)));
}

/**
* Persist job item progress.
*
* @param jobItemContext job item context
*/
public void persistProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}

/**
* Update job item progress.
*
* @param jobItemContext job item context
*/
public void updateProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.updateJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
}

@SuppressWarnings("unchecked")
private String convertProgressYamlContent(final PipelineJobItemContext jobItemContext) {
return YamlEngine.marshal(swapper.swapToYamlConfiguration((T) jobItemContext.toProgress()));
}

/**
* Get job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @return map, key is sharding item, value is error message
*/
public String getErrorMessage(final String jobId, final int shardingItem) {
return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
}

/**
* Update job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @param error error
*/
public void updateErrorMessage(final String jobId, final int shardingItem, final Object error) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
String value = "";
if (null != error) {
value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
}
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key, value);
}

/**
* Clean job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
*/
public void cleanErrorMessage(final String jobId, final int shardingItem) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
Expand All @@ -32,8 +30,6 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
Expand Down Expand Up @@ -95,7 +91,7 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
*/
public void startDisabledJob(final String jobId) {
if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
Optional<? extends PipelineJobItemProgress> jobItemProgress = getJobItemProgress(jobId, 0);
Optional<? extends PipelineJobItemProgress> jobItemProgress = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()).getProgress(jobId, 0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
Expand Down Expand Up @@ -197,98 +193,4 @@ public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
}
return Collections.emptyList();
}

/**
* Get job item progress.
*
* @param jobId job id
* @param shardingItem sharding item
* @param <T> type of pipeline job item progress
* @return job item progress, may be null
*/
public <T extends PipelineJobItemProgress> Optional<T> getJobItemProgress(final String jobId, final int shardingItem) {
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> swapper = jobAPI.getYamlJobItemProgressSwapper();
Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem);
return progress.map(optional -> swapper.swapToObject(YamlEngine.unmarshal(optional, swapper.getYamlProgressClass(), true)));
}

/**
* Persist job item progress.
*
* @param jobItemContext job item context
*/
public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

/**
* Update job item status.
*
* @param jobId job id
* @param shardingItem sharding item
* @param status status
*/
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
Optional<PipelineJobItemProgress> jobItemProgress = getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

/**
* Update job item progress.
*
* @param jobItemContext job item context
*/
public void updateJobItemProgress(final PipelineJobItemContext jobItemContext) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
.updateJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
return YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
}

/**
* Get job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @return map, key is sharding item, value is error message
*/
public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
}

/**
* Update job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @param error error
*/
public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final Object error) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
String value = "";
if (null != error) {
value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
}
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key, value);
}

/**
* Clean job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
*/
public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, "");
}
}
Loading

0 comments on commit df44c99

Please sign in to comment.