From 99278110ab91b76e8cda902ecc018eece2a04023 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Tue, 21 Nov 2023 16:44:20 +0800 Subject: [PATCH] Add PipelineJobIteErrorMessageManager (#29102) --- .../core/job/AbstractSimplePipelineJob.java | 15 ++-- .../InventoryIncrementalJobManager.java | 5 +- .../PipelineJobIteErrorMessageManager.java | 72 +++++++++++++++++++ .../job/service/PipelineJobItemManager.java | 42 +---------- .../InventoryIncrementalTasksRunner.java | 5 +- .../data/pipeline/cdc/core/job/CDCJob.java | 7 +- .../api/impl/ConsistencyCheckJobAPI.java | 3 +- .../task/ConsistencyCheckTasksRunner.java | 3 +- 8 files changed, 93 insertions(+), 59 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java index 36899a266c824..5931c3b681ef5 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.elasticjob.api.ShardingContext; @@ -49,7 +49,6 @@ protected AbstractSimplePipelineJob(final String jobId) { @Override public void execute(final ShardingContext shardingContext) { PipelineJobManager jobManager = new PipelineJobManager(getJobAPI()); - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper()); String jobId = shardingContext.getJobName(); int shardingItem = shardingContext.getShardingItem(); log.info("Execute job {}-{}", jobId, shardingItem); @@ -59,31 +58,31 @@ public void execute(final ShardingContext shardingContext) { } try { PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext); - execute0(jobItemManager, jobItemContext); + execute0(jobItemContext); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON - processFailed(jobManager, jobItemManager, jobId, shardingItem, ex); + processFailed(jobManager, jobId, shardingItem, ex); throw ex; } } - private void execute0(final PipelineJobItemManager jobItemManager, final PipelineJobItemContext jobItemContext) { + private void execute0(final PipelineJobItemContext jobItemContext) { String jobId = jobItemContext.getJobId(); int shardingItem = jobItemContext.getShardingItem(); PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext); if (!addTasksRunner(shardingItem, tasksRunner)) { return; } - jobItemManager.cleanErrorMessage(jobId, shardingItem); + new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage(); prepare(jobItemContext); log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem); tasksRunner.start(); } - private void processFailed(final PipelineJobManager jobManager, final PipelineJobItemManager jobItemManager, final String jobId, final int shardingItem, final Exception ex) { + private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) { log.error("job execution failed, {}-{}", jobId, shardingItem, ex); - jobItemManager.updateErrorMessage(jobId, shardingItem, ex); + new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex); try { jobManager.stop(jobId); } catch (final PipelineJobNotFoundException ignored) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index 4e754a5d45325..7e41b403aaca2 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -80,12 +80,11 @@ public List getJobItemInfos(final String jobId) long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0")); Map jobProgress = getJobProgress(jobConfig); List result = new LinkedList<>(); - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId); for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); - TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId); InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue(); - String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem); + String errorMessage = new PipelineJobIteErrorMessageManager(jobId, shardingItem).getErrorMessage(); if (null == jobItemProgress) { result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage)); continue; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java new file mode 100644 index 0000000000000..a20cd2e7a4fc4 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.job.service; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; +import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; + +import java.util.Optional; + +/** + * Pipeline job item error message manager. + */ +public final class PipelineJobIteErrorMessageManager { + + private final String jobId; + + private final int shardingItem; + + private final GovernanceRepositoryAPI governanceRepositoryAPI; + + public PipelineJobIteErrorMessageManager(final String jobId, final int shardingItem) { + this.jobId = jobId; + this.shardingItem = shardingItem; + governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)); + } + + /** + * Get job item error message. + * + * @return map, key is sharding item, value is error message + */ + public String getErrorMessage() { + return Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessage(jobId, shardingItem)).orElse(""); + } + + /** + * Update job item error message. + * + * @param error error + */ + public void updateErrorMessage(final Object error) { + governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), null == error ? "" : buildErrorMessage(error)); + } + + private String buildErrorMessage(final Object error) { + return error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString(); + } + + /** + * Clean job item error message. + */ + public void cleanErrorMessage() { + governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), ""); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java index 9aa4a01cbb38e..c6e6ec47e5a57 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java @@ -17,11 +17,9 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; @@ -30,7 +28,7 @@ import java.util.Optional; /** - * Pipeline job manager. + * Pipeline job item manager. * * @param type of pipeline job item progress */ @@ -96,42 +94,4 @@ public void updateProgress(final PipelineJobItemContext jobItemContext) { private String convertProgressYamlContent(final PipelineJobItemContext jobItemContext) { return YamlEngine.marshal(swapper.swapToYamlConfiguration((T) jobItemContext.toProgress())); } - - /** - * Get job item error message. - * - * @param jobId job id - * @param shardingItem sharding item - * @return map, key is sharding item, value is error message - */ - public String getErrorMessage(final String jobId, final int shardingItem) { - return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse(""); - } - - /** - * Update job item error message. - * - * @param jobId job id - * @param shardingItem sharding item - * @param error error - */ - public void updateErrorMessage(final String jobId, final int shardingItem, final Object error) { - String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem); - String value = ""; - if (null != error) { - value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString(); - } - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key, value); - } - - /** - * Clean job item error message. - * - * @param jobId job id - * @param shardingItem sharding item - */ - public void cleanErrorMessage(final String jobId, final int shardingItem) { - String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem); - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, ""); - } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java index 3935bec810603..13b9b2477afd5 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java @@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; @@ -152,7 +153,7 @@ protected void inventorySuccessCallback() { protected void inventoryFailureCallback(final Throwable throwable) { log.error("onFailure, inventory task execute failed.", throwable); String jobId = jobItemContext.getJobId(); - jobItemManager.updateErrorMessage(jobId, jobItemContext.getShardingItem(), throwable); + new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable); try { jobManager.stop(jobId); } catch (final PipelineJobNotFoundException ignored) { @@ -187,7 +188,7 @@ public void onSuccess() { public void onFailure(final Throwable throwable) { log.error("onFailure, incremental task execute failed.", throwable); String jobId = jobItemContext.getJobId(); - jobItemManager.updateErrorMessage(jobId, jobItemContext.getShardingItem(), throwable); + new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable); try { jobManager.stop(jobId); } catch (final PipelineJobNotFoundException ignored) { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 0f6b66fa7c1d0..70ead420a66d4 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -41,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; @@ -93,7 +94,7 @@ public void execute(final ShardingContext shardingContext) { continue; } jobItemContexts.add(jobItemContext); - jobItemManager.cleanErrorMessage(jobId, shardingItem); + new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage(); log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem); } if (jobItemContexts.isEmpty()) { @@ -127,7 +128,7 @@ private void prepare(final Collection jobItemContexts) { private void processFailed(final String jobId, final int shardingItem, final Exception ex) { log.error("job execution failed, {}-{}", jobId, shardingItem, ex); - jobItemManager.updateErrorMessage(jobId, shardingItem, ex); + new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex); PipelineJobCenter.stop(jobId); jobAPI.updateJobConfigurationDisabled(jobId, true); } @@ -204,7 +205,7 @@ public void onSuccess() { public void onFailure(final Throwable throwable) { log.error("onFailure, {} task execute failed.", identifier, throwable); String jobId = jobItemContext.getJobId(); - jobItemManager.updateErrorMessage(jobId, jobItemContext.getShardingItem(), throwable); + new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable); if (jobItemContext.getSink() instanceof CDCSocketSink) { CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink(); cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage())); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index cf166153d5422..f9c1db64a6557 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; @@ -231,7 +232,7 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO); result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse("")); fillInJobItemInfoWithCheckAlgorithm(result, checkJobId); - result.setErrorMessage(new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId, 0)); + result.setErrorMessage(new PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage()); Map checkJobResults = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId); result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched)); result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 136e92b3dc5bf..e75f71f126d0f 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; @@ -151,7 +152,7 @@ public void onFailure(final Throwable throwable) { return; } log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable); - jobItemManager.updateErrorMessage(checkJobId, 0, throwable); + new PipelineJobIteErrorMessageManager(checkJobId, 0).updateErrorMessage(throwable); jobManager.stop(checkJobId); } }