Add PipelineJobIteErrorMessageManager (#29102)
terrymanu authored Nov 21, 2023
1 parent 301cee3 commit 9927811
Showing 8 changed files with 93 additions and 59 deletions.
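In summary, this commit moves the job item error message operations (get, update, clean) out of PipelineJobItemManager into a new class, PipelineJobIteErrorMessageManager, which is bound to a single jobId and sharding item at construction time and resolves its GovernanceRepositoryAPI internally; the call sites in the changed files now build the manager per job item instead of passing jobId and shardingItem to PipelineJobItemManager.

A condensed before/after sketch of the call-site change, assembled from the hunks below. The statements are fragments rather than a complete class; jobId, shardingItem, and ex stand for the caller's existing locals, and the constructor and method names are taken from the diff itself.

    // Before (removed by this commit): the swapper-backed PipelineJobItemManager carried the
    // error message operations and needed jobId/shardingItem on every call.
    PipelineJobItemManager<?> jobItemManager = new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
    jobItemManager.cleanErrorMessage(jobId, shardingItem);
    jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
    String oldStyleErrorMessage = jobItemManager.getErrorMessage(jobId, shardingItem);

    // After: a dedicated manager is constructed per job item and no longer needs the progress swapper.
    PipelineJobIteErrorMessageManager errorMessageManager = new PipelineJobIteErrorMessageManager(jobId, shardingItem);
    errorMessageManager.cleanErrorMessage();
    errorMessageManager.updateErrorMessage(ex);
    String newStyleErrorMessage = errorMessageManager.getErrorMessage();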
@@ -20,7 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -49,7 +49,6 @@ protected AbstractSimplePipelineJob(final String jobId) {
@Override
public void execute(final ShardingContext shardingContext) {
PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
PipelineJobItemManager<?> jobItemManager = new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
@@ -59,31 +58,31 @@ public void execute(final ShardingContext shardingContext) {
}
try {
PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
execute0(jobItemManager, jobItemContext);
execute0(jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
processFailed(jobManager, jobItemManager, jobId, shardingItem, ex);
processFailed(jobManager, jobId, shardingItem, ex);
throw ex;
}
}

private void execute0(final PipelineJobItemManager<?> jobItemManager, final PipelineJobItemContext jobItemContext) {
private void execute0(final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
jobItemManager.cleanErrorMessage(jobId, shardingItem);
new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage();
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
tasksRunner.start();
}

private void processFailed(final PipelineJobManager jobManager, final PipelineJobItemManager<?> jobItemManager, final String jobId, final int shardingItem, final Exception ex) {
private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
@@ -80,12 +80,11 @@ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId)
long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem);
String errorMessage = new PipelineJobIteErrorMessageManager(jobId, shardingItem).getErrorMessage();
if (null == jobItemProgress) {
result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
continue;
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;

import java.util.Optional;

/**
* Pipeline job item error message manager.
*/
public final class PipelineJobIteErrorMessageManager {

private final String jobId;

private final int shardingItem;

private final GovernanceRepositoryAPI governanceRepositoryAPI;

public PipelineJobIteErrorMessageManager(final String jobId, final int shardingItem) {
this.jobId = jobId;
this.shardingItem = shardingItem;
governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
}

/**
* Get job item error message.
*
* @return error message, or an empty string if none is recorded
*/
public String getErrorMessage() {
return Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessage(jobId, shardingItem)).orElse("");
}

/**
* Update job item error message.
*
* @param error error
*/
public void updateErrorMessage(final Object error) {
governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), null == error ? "" : buildErrorMessage(error));
}

private String buildErrorMessage(final Object error) {
return error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
}

/**
* Clean job item error message.
*/
public void cleanErrorMessage() {
governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), "");
}
}
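A minimal usage sketch of the new class (the surrounding method, the job id value, and the exception message are illustrative; the behavior described in the comments comes from the implementation above, which assumes the governance repository for the job's context key is obtainable via PipelineAPIFactory):

    private void sketchErrorMessageLifecycle(final String jobId, final int shardingItem) {
        PipelineJobIteErrorMessageManager manager = new PipelineJobIteErrorMessageManager(jobId, shardingItem);
        // Read: never null; an absent error message node is reported as an empty string.
        String current = manager.getErrorMessage();
        // Update: Throwable arguments are stored as a full stack trace, anything else via toString(), null as "".
        manager.updateErrorMessage(new RuntimeException("inventory task failed"));
        // Clean: the node is reset to an empty string, e.g. before the next execution of this job item.
        manager.cleanErrorMessage();
    }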
@@ -17,11 +17,9 @@

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
@@ -30,7 +28,7 @@
import java.util.Optional;

/**
* Pipeline job manager.
* Pipeline job item manager.
*
* @param <T> type of pipeline job item progress
*/
@@ -96,42 +94,4 @@ public void updateProgress(final PipelineJobItemContext jobItemContext) {
private String convertProgressYamlContent(final PipelineJobItemContext jobItemContext) {
return YamlEngine.marshal(swapper.swapToYamlConfiguration((T) jobItemContext.toProgress()));
}

/**
* Get job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @return map, key is sharding item, value is error message
*/
public String getErrorMessage(final String jobId, final int shardingItem) {
return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
}

/**
* Update job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @param error error
*/
public void updateErrorMessage(final String jobId, final int shardingItem, final Object error) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
String value = "";
if (null != error) {
value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
}
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key, value);
}

/**
* Clean job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
*/
public void cleanErrorMessage(final String jobId, final int shardingItem) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, "");
}
}
@@ -31,6 +31,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -152,7 +153,7 @@ protected void inventorySuccessCallback() {
protected void inventoryFailureCallback(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
jobItemManager.updateErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
@@ -187,7 +188,7 @@ public void onSuccess() {
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
jobItemManager.updateErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
@@ -41,6 +41,7 @@
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -93,7 +94,7 @@ public void execute(final ShardingContext shardingContext) {
continue;
}
jobItemContexts.add(jobItemContext);
jobItemManager.cleanErrorMessage(jobId, shardingItem);
new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage();
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
}
if (jobItemContexts.isEmpty()) {
@@ -127,7 +128,7 @@ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {

private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
@@ -204,7 +205,7 @@ public void onSuccess() {
public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier, throwable);
String jobId = jobItemContext.getJobId();
jobItemManager.updateErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
@@ -33,6 +33,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -231,7 +232,7 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId, 0));
result.setErrorMessage(new PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
Map<String, TableDataConsistencyCheckResult> checkJobResults = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
@@ -33,6 +33,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -151,7 +152,7 @@ public void onFailure(final Throwable throwable) {
return;
}
log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable);
jobItemManager.updateErrorMessage(checkJobId, 0, throwable);
new PipelineJobIteErrorMessageManager(checkJobId, 0).updateErrorMessage(throwable);
jobManager.stop(checkJobId);
}
}
