Skip to content

Commit

Permalink
Remove PipelineJobIteErrorMessageManager (#29146)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 23, 2023
1 parent 56ebfb1 commit 78fc631
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;

import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Optional;

/**
* Pipeline job item error message governance repository.
*/
Expand All @@ -34,10 +37,20 @@ public final class PipelineJobItemErrorMessageGovernanceRepository {
*
* @param jobId job ID
* @param shardingItem sharding item
* @param errorMessage error message
* @param throwable throwable
*/
public void update(final String jobId, final int shardingItem, final Throwable throwable) {
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), ExceptionUtils.getStackTrace(throwable));
}

/**
* Clean job item error message.
*
* @param jobId job ID
* @param shardingItem sharding item
*/
public void update(final String jobId, final int shardingItem, final String errorMessage) {
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), errorMessage);
public void clean(final String jobId, final int shardingItem) {
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), "");
}

/**
Expand All @@ -48,6 +61,6 @@ public void update(final String jobId, final int shardingItem, final String erro
* @return error msg
*/
public String load(final String jobId, final int shardingItem) {
return repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem));
return Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem))).orElse("");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
Expand Down Expand Up @@ -74,15 +74,15 @@ private void execute0(final PipelineJobItemContext jobItemContext) {
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
tasksRunner.start();
}

private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId)
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
String errorMessage = new PipelineJobIteErrorMessageManager(jobId, shardingItem).getErrorMessage();
String errorMessage = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().load(jobId, shardingItem);
if (null == jobItemProgress) {
result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage));
continue;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
Expand Down Expand Up @@ -153,7 +153,7 @@ protected void inventorySuccessCallback() {
protected void inventoryFailureCallback(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
Expand Down Expand Up @@ -188,7 +188,7 @@ public void onSuccess() {
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void execute(final ShardingContext shardingContext) {
continue;
}
jobItemContexts.add(jobItemContext);
new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
}
if (jobItemContexts.isEmpty()) {
Expand Down Expand Up @@ -128,7 +129,7 @@ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {

private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
Expand Down Expand Up @@ -205,7 +206,7 @@ public void onSuccess() {
public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier, throwable);
String jobId = jobItemContext.getJobId();
new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
Expand Down Expand Up @@ -232,7 +231,7 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(new PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
result.setErrorMessage(PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().load(checkJobId, 0));
Map<String, TableDataConsistencyCheckResult> checkJobResults = governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId, checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
Expand Down Expand Up @@ -153,7 +152,7 @@ public void onFailure(final Throwable throwable) {
return;
}
log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable);
new PipelineJobIteErrorMessageManager(checkJobId, 0).updateErrorMessage(throwable);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().update(checkJobId, 0, throwable);
jobManager.stop(checkJobId);
}
}
Expand Down

0 comments on commit 78fc631

Please sign in to comment.