From 78fc63189529f08372b5a66a8189c45c74a9f64f Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Thu, 23 Nov 2023 23:56:07 +0800 Subject: [PATCH] Remove PipelineJobIteErrorMessageManager (#29146) --- ...bItemErrorMessageGovernanceRepository.java | 21 ++++-- .../core/job/AbstractSimplePipelineJob.java | 6 +- .../InventoryIncrementalJobManager.java | 2 +- .../PipelineJobIteErrorMessageManager.java | 71 ------------------- .../InventoryIncrementalTasksRunner.java | 6 +- .../data/pipeline/cdc/core/job/CDCJob.java | 9 +-- .../api/impl/ConsistencyCheckJobAPI.java | 3 +- .../task/ConsistencyCheckTasksRunner.java | 3 +- 8 files changed, 31 insertions(+), 90 deletions(-) delete mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java index 213302fa88fee..a2c20eca00749 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java @@ -18,9 +18,12 @@ package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import java.util.Optional; + /** * Pipeline job item error message governance repository. */ @@ -34,10 +37,20 @@ public final class PipelineJobItemErrorMessageGovernanceRepository { * * @param jobId job ID * @param shardingItem sharding item - * @param errorMessage error message + * @param throwable throwable + */ + public void update(final String jobId, final int shardingItem, final Throwable throwable) { + repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), ExceptionUtils.getStackTrace(throwable)); + } + + /** + * Clean job item error message. + * + * @param jobId job ID + * @param shardingItem sharding item */ - public void update(final String jobId, final int shardingItem, final String errorMessage) { - repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), errorMessage); + public void clean(final String jobId, final int shardingItem) { + repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), ""); } /** @@ -48,6 +61,6 @@ public void update(final String jobId, final int shardingItem, final String erro * @return error msg */ public String load(final String jobId, final int shardingItem) { - return repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem)); + return Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem))).orElse(""); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java index 5931c3b681ef5..fa3e33aab7532 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.elasticjob.api.ShardingContext; @@ -74,7 +74,7 @@ private void execute0(final PipelineJobItemContext jobItemContext) { if (!addTasksRunner(shardingItem, tasksRunner)) { return; } - new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage(); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem); prepare(jobItemContext); log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem); tasksRunner.start(); @@ -82,7 +82,7 @@ private void execute0(final PipelineJobItemContext jobItemContext) { private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) { log.error("job execution failed, {}-{}", jobId, shardingItem, ex); - new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex); try { jobManager.stop(jobId); } catch (final PipelineJobNotFoundException ignored) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index ee9f2ef5bc315..afa4fa3e78a7d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -84,7 +84,7 @@ public List getJobItemInfos(final String jobId) for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue(); - String errorMessage = new PipelineJobIteErrorMessageManager(jobId, shardingItem).getErrorMessage(); + String errorMessage = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().load(jobId, shardingItem); if (null == jobItemProgress) { result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage)); continue; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java deleted file mode 100644 index 1066c68d9f3b5..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.core.job.service; - -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; - -import java.util.Optional; - -/** - * Pipeline job item error message manager. - */ -public final class PipelineJobIteErrorMessageManager { - - private final String jobId; - - private final int shardingItem; - - private final PipelineGovernanceFacade governanceFacade; - - public PipelineJobIteErrorMessageManager(final String jobId, final int shardingItem) { - this.jobId = jobId; - this.shardingItem = shardingItem; - governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)); - } - - /** - * Get job item error message. - * - * @return map, key is sharding item, value is error message - */ - public String getErrorMessage() { - return Optional.ofNullable(governanceFacade.getJobItemFacade().getErrorMessage().load(jobId, shardingItem)).orElse(""); - } - - /** - * Update job item error message. - * - * @param error error - */ - public void updateErrorMessage(final Object error) { - governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, null == error ? "" : buildErrorMessage(error)); - } - - private String buildErrorMessage(final Object error) { - return error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString(); - } - - /** - * Clean job item error message. - */ - public void cleanErrorMessage() { - governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ""); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java index 13b9b2477afd5..be8da1323fe27 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java @@ -30,8 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; @@ -153,7 +153,7 @@ protected void inventorySuccessCallback() { protected void inventoryFailureCallback(final Throwable throwable) { log.error("onFailure, inventory task execute failed.", throwable); String jobId = jobItemContext.getJobId(); - new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable); try { jobManager.stop(jobId); } catch (final PipelineJobNotFoundException ignored) { @@ -188,7 +188,7 @@ public void onSuccess() { public void onFailure(final Throwable throwable) { log.error("onFailure, incremental task execute failed.", throwable); String jobId = jobItemContext.getJobId(); - new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable); try { jobManager.stop(jobId); } catch (final PipelineJobNotFoundException ignored) { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 70ead420a66d4..62250972e4b20 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -41,7 +41,8 @@ import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; @@ -94,7 +95,7 @@ public void execute(final ShardingContext shardingContext) { continue; } jobItemContexts.add(jobItemContext); - new PipelineJobIteErrorMessageManager(jobId, shardingItem).cleanErrorMessage(); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem); log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem); } if (jobItemContexts.isEmpty()) { @@ -128,7 +129,7 @@ private void prepare(final Collection jobItemContexts) { private void processFailed(final String jobId, final int shardingItem, final Exception ex) { log.error("job execution failed, {}-{}", jobId, shardingItem, ex); - new PipelineJobIteErrorMessageManager(jobId, shardingItem).updateErrorMessage(ex); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex); PipelineJobCenter.stop(jobId); jobAPI.updateJobConfigurationDisabled(jobId, true); } @@ -205,7 +206,7 @@ public void onSuccess() { public void onFailure(final Throwable throwable) { log.error("onFailure, {} task execute failed.", identifier, throwable); String jobId = jobItemContext.getJobId(); - new PipelineJobIteErrorMessageManager(jobId, jobItemContext.getShardingItem()).updateErrorMessage(throwable); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable); if (jobItemContext.getSink() instanceof CDCSocketSink) { CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink(); cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage())); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index a9a846bed49bf..367b7e19b1774 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -33,7 +33,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; @@ -232,7 +231,7 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO); result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse("")); fillInJobItemInfoWithCheckAlgorithm(result, checkJobId); - result.setErrorMessage(new PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage()); + result.setErrorMessage(PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().load(checkJobId, 0)); Map checkJobResults = governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId, checkJobId); result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched)); result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index f705d6b01abc1..27a54b2e22b02 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -33,7 +33,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; @@ -153,7 +152,7 @@ public void onFailure(final Throwable throwable) { return; } log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable); - new PipelineJobIteErrorMessageManager(checkJobId, 0).updateErrorMessage(throwable); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().update(checkJobId, 0, throwable); jobManager.stop(checkJobId); } }