From 56ebfb136342f05230fce2c2f28fac9ddc946e00 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Thu, 23 Nov 2023 19:46:37 +0800 Subject: [PATCH] Refactor PipelineGovernanceFacade (#29142) --- .../repository/PipelineGovernanceFacade.java | 30 ++++--------- ...bItemErrorMessageGovernanceRepository.java | 2 +- .../item/PipelineJobItemFacade.java | 37 ++++++++++++++++ ...ineJobItemProcessGovernanceRepository.java | 2 +- .../PipelineJobCheckGovernanceRepository.java | 2 +- ...eJobConfigurationGovernanceRepository.java | 2 +- .../repository/job/PipelineJobFacade.java | 43 +++++++++++++++++++ .../PipelineJobGovernanceRepository.java | 2 +- ...PipelineJobOffsetGovernanceRepository.java | 2 +- ...etaDataDataSourceGovernanceRepository.java | 2 +- .../metadata/PipelineMetaDataFacade.java | 37 ++++++++++++++++ ...cessConfigurationGovernanceRepository.java | 2 +- .../PipelineJobIteErrorMessageManager.java | 6 +-- .../job/service/PipelineJobItemManager.java | 8 ++-- .../core/job/service/PipelineJobManager.java | 12 +++--- .../PipelineDataSourcePersistService.java | 4 +- ...ineProcessConfigurationPersistService.java | 4 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 8 ++-- .../api/impl/ConsistencyCheckJobAPI.java | 24 +++++------ .../task/ConsistencyCheckTasksRunner.java | 2 +- .../migration/api/impl/MigrationJobAPI.java | 2 +- .../prepare/MigrationJobPreparer.java | 4 +- .../service/PipelineGovernanceFacadeTest.java | 38 ++++++++-------- .../ConsistencyCheckJobTest.java | 2 +- .../api/impl/ConsistencyCheckJobAPITest.java | 8 ++-- .../api/impl/MigrationJobAPITest.java | 4 +- .../MigrationDataConsistencyCheckerTest.java | 2 +- 27 files changed, 198 insertions(+), 93 deletions(-) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => item}/PipelineJobItemErrorMessageGovernanceRepository.java (98%) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => item}/PipelineJobItemProcessGovernanceRepository.java (99%) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => job}/PipelineJobCheckGovernanceRepository.java (99%) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => job}/PipelineJobConfigurationGovernanceRepository.java (98%) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => job}/PipelineJobGovernanceRepository.java (98%) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => job}/PipelineJobOffsetGovernanceRepository.java (99%) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => metadata}/PipelineMetaDataDataSourceGovernanceRepository.java (98%) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/{ => metadata}/PipelineMetaDataProcessConfigurationGovernanceRepository.java (98%) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java index a2b33c35d6958..a96baaf20a200 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineGovernanceFacade.java @@ -20,6 +20,9 @@ import lombok.AccessLevel; import lombok.Getter; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath; +import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item.PipelineJobItemFacade; +import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job.PipelineJobFacade; +import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata.PipelineMetaDataFacade; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; @@ -32,32 +35,17 @@ public final class PipelineGovernanceFacade { @Getter(AccessLevel.NONE) private final ClusterPersistRepository repository; - private final PipelineJobConfigurationGovernanceRepository jobConfigurationGovernanceRepository; + private final PipelineJobFacade jobFacade; - private final PipelineJobOffsetGovernanceRepository jobOffsetGovernanceRepository; + private final PipelineJobItemFacade jobItemFacade; - private final PipelineJobItemProcessGovernanceRepository jobItemProcessGovernanceRepository; - - private final PipelineJobItemErrorMessageGovernanceRepository jobItemErrorMessageGovernanceRepository; - - private final PipelineJobCheckGovernanceRepository jobCheckGovernanceRepository; - - private final PipelineJobGovernanceRepository jobGovernanceRepository; - - private final PipelineMetaDataDataSourceGovernanceRepository metaDataDataSourceGovernanceRepository; - - private final PipelineMetaDataProcessConfigurationGovernanceRepository metaDataProcessConfigurationGovernanceRepository; + private final PipelineMetaDataFacade metaDataFacade; public PipelineGovernanceFacade(final ClusterPersistRepository repository) { this.repository = repository; - jobConfigurationGovernanceRepository = new PipelineJobConfigurationGovernanceRepository(repository); - jobOffsetGovernanceRepository = new PipelineJobOffsetGovernanceRepository(repository); - jobItemProcessGovernanceRepository = new PipelineJobItemProcessGovernanceRepository(repository); - jobItemErrorMessageGovernanceRepository = new PipelineJobItemErrorMessageGovernanceRepository(repository); - jobCheckGovernanceRepository = new PipelineJobCheckGovernanceRepository(repository); - jobGovernanceRepository = new PipelineJobGovernanceRepository(repository); - metaDataDataSourceGovernanceRepository = new PipelineMetaDataDataSourceGovernanceRepository(repository); - metaDataProcessConfigurationGovernanceRepository = new PipelineMetaDataProcessConfigurationGovernanceRepository(repository); + jobFacade = new PipelineJobFacade(repository); + jobItemFacade = new PipelineJobItemFacade(repository); + metaDataFacade = new PipelineMetaDataFacade(repository); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java index eee1bd77b57ee..213302fa88fee 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemErrorMessageGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java new file mode 100644 index 0000000000000..0bf38b96a22fe --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemFacade.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item; + +import lombok.Getter; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; + +/** + * Pipeline job item facade. + */ +@Getter +public final class PipelineJobItemFacade { + + private final PipelineJobItemProcessGovernanceRepository process; + + private final PipelineJobItemErrorMessageGovernanceRepository errorMessage; + + public PipelineJobItemFacade(final ClusterPersistRepository repository) { + process = new PipelineJobItemProcessGovernanceRepository(repository); + errorMessage = new PipelineJobItemErrorMessageGovernanceRepository(repository); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java similarity index 99% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java index 7ea4887fdb877..6beb8cfc8dbdb 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobItemProcessGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemProcessGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item; import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java similarity index 99% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java index e5c44dfdd3c5c..15ae3ea230ba1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobCheckGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobCheckGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job; import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java index 9b6d13bf983e3..1c4a1af066b3c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobConfigurationGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobConfigurationGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java new file mode 100644 index 0000000000000..9e101ece185fc --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobFacade.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job; + +import lombok.Getter; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; + +/** + * Pipeline job facade. + */ +@Getter +public final class PipelineJobFacade { + + private final PipelineJobGovernanceRepository job; + + private final PipelineJobConfigurationGovernanceRepository configuration; + + private final PipelineJobOffsetGovernanceRepository offset; + + private final PipelineJobCheckGovernanceRepository check; + + public PipelineJobFacade(final ClusterPersistRepository repository) { + job = new PipelineJobGovernanceRepository(repository); + configuration = new PipelineJobConfigurationGovernanceRepository(repository); + offset = new PipelineJobOffsetGovernanceRepository(repository); + check = new PipelineJobCheckGovernanceRepository(repository); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java index a403f0f7b0e65..606020cf405e0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java similarity index 99% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java index c007b67574f9c..c75ace6e8ca22 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineJobOffsetGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/job/PipelineJobOffsetGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.job; import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java index 870cc0c20dc16..70a973c8be5b5 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataDataSourceGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataDataSourceGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java new file mode 100644 index 0000000000000..4d3a6a430a9d4 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataFacade.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata; + +import lombok.Getter; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; + +/** + * Pipeline job item facade. + */ +@Getter +public final class PipelineMetaDataFacade { + + private final PipelineMetaDataDataSourceGovernanceRepository dataSource; + + private final PipelineMetaDataProcessConfigurationGovernanceRepository processConfiguration; + + public PipelineMetaDataFacade(final ClusterPersistRepository repository) { + dataSource = new PipelineMetaDataDataSourceGovernanceRepository(repository); + processConfiguration = new PipelineMetaDataProcessConfigurationGovernanceRepository(repository); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java index e6f3e1ffb5803..6f2802e735bfc 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/PipelineMetaDataProcessConfigurationGovernanceRepository.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/metadata/PipelineMetaDataProcessConfigurationGovernanceRepository.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.metadata; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java index b63e9b5b2695c..1066c68d9f3b5 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java @@ -46,7 +46,7 @@ public PipelineJobIteErrorMessageManager(final String jobId, final int shardingI * @return map, key is sharding item, value is error message */ public String getErrorMessage() { - return Optional.ofNullable(governanceFacade.getJobItemErrorMessageGovernanceRepository().load(jobId, shardingItem)).orElse(""); + return Optional.ofNullable(governanceFacade.getJobItemFacade().getErrorMessage().load(jobId, shardingItem)).orElse(""); } /** @@ -55,7 +55,7 @@ public String getErrorMessage() { * @param error error */ public void updateErrorMessage(final Object error) { - governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, null == error ? "" : buildErrorMessage(error)); + governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, null == error ? "" : buildErrorMessage(error)); } private String buildErrorMessage(final Object error) { @@ -66,6 +66,6 @@ private String buildErrorMessage(final Object error) { * Clean job item error message. */ public void cleanErrorMessage() { - governanceFacade.getJobItemErrorMessageGovernanceRepository().update(jobId, shardingItem, ""); + governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ""); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java index d2463bc7f5f8b..c9643d284b1f8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java @@ -55,7 +55,7 @@ public void updateStatus(final String jobId, final int shardingItem, final JobSt } jobItemProgress.get().setStatus(status); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)) - .getJobItemProcessGovernanceRepository().update(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get()))); + .getJobItemFacade().getProcess().update(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get()))); } /** @@ -66,7 +66,7 @@ public void updateStatus(final String jobId, final int shardingItem, final JobSt * @return job item progress */ public Optional getProgress(final String jobId, final int shardingItem) { - return PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().load(jobId, shardingItem) + return PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().load(jobId, shardingItem) .map(optional -> swapper.swapToObject(YamlEngine.unmarshal(optional, swapper.getYamlProgressClass(), true))); } @@ -77,7 +77,7 @@ public Optional getProgress(final String jobId, final int shardingItem) { */ public void persistProgress(final PipelineJobItemContext jobItemContext) { PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) - .getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext)); + .getJobItemFacade().getProcess().persist(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext)); } /** @@ -87,7 +87,7 @@ public void persistProgress(final PipelineJobItemContext jobItemContext) { */ public void updateProgress(final PipelineJobItemContext jobItemContext) { PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) - .getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext)); + .getJobItemFacade().getProcess().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext)); } @SuppressWarnings("unchecked") diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index 59d205b852832..1a11a902d52ca 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -75,12 +75,12 @@ public Optional start(final PipelineJobConfiguration jobConfig) { String jobId = jobConfig.getJobId(); ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId)); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)); - if (governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobId)) { + if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) { log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId); return Optional.of(jobId); } - governanceFacade.getJobGovernanceRepository().create(jobId, jobAPI.getJobClass()); - governanceFacade.getJobConfigurationGovernanceRepository().persist(jobId, jobConfig.convertToJobConfigurationPOJO()); + governanceFacade.getJobFacade().getJob().create(jobId, jobAPI.getJobClass()); + governanceFacade.getJobFacade().getConfiguration().persist(jobId, jobConfig.convertToJobConfigurationPOJO()); return Optional.of(jobId); } @@ -119,7 +119,7 @@ private void startCurrentDisabledJob(final String jobId) { } private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) { - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> { + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> { try { new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStartDisabledNextJobType)).startDisabledJob(optional); // CHECKSTYLE:OFF @@ -141,7 +141,7 @@ public void stop(final String jobId) { } private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) { - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(jobId).ifPresent(optional -> { + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> { try { new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStoppedPreviousJobType)).stop(optional); // CHECKSTYLE:OFF @@ -176,7 +176,7 @@ private void stopCurrentJob(final String jobId) { public void drop(final String jobId) { PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId); PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null); - PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobGovernanceRepository().delete(jobId); + PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java index a951f9fee3c92..924ae8859a9af 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java @@ -39,7 +39,7 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP @Override @SuppressWarnings("unchecked") public Map load(final PipelineContextKey contextKey, final String jobType) { - String dataSourcesProps = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().load(jobType); + String dataSourcesProps = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().load(jobType); if (Strings.isNullOrEmpty(dataSourcesProps)) { return Collections.emptyMap(); } @@ -55,6 +55,6 @@ public void persist(final PipelineContextKey contextKey, final String jobType, f for (Entry entry : propsMap.entrySet()) { dataSourceMap.put(entry.getKey(), swapper.swapToMap(entry.getValue())); } - PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataDataSourceGovernanceRepository().persist(jobType, YamlEngine.marshal(dataSourceMap)); + PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getDataSource().persist(jobType, YamlEngine.marshal(dataSourceMap)); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java index 5b8c1bce79847..97954a5cc96bc 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java @@ -34,7 +34,7 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin @Override public PipelineProcessConfiguration load(final PipelineContextKey contextKey, final String jobType) { - String yamlText = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().load(jobType); + String yamlText = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getProcessConfiguration().load(jobType); if (Strings.isNullOrEmpty(yamlText)) { return null; } @@ -45,6 +45,6 @@ public PipelineProcessConfiguration load(final PipelineContextKey contextKey, fi @Override public void persist(final PipelineContextKey contextKey, final String jobType, final PipelineProcessConfiguration processConfig) { String yamlText = YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig)); - PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataProcessConfigurationGovernanceRepository().persist(jobType, yamlText); + PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getMetaDataFacade().getProcessConfiguration().persist(jobType, yamlText); } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 0b4a06dd3687b..614d06c9f407e 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -121,13 +121,13 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig); ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId())); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); - if (governanceFacade.getJobConfigurationGovernanceRepository().isExisted(jobConfig.getJobId())) { + if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) { log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId()); } else { - governanceFacade.getJobGovernanceRepository().create(jobConfig.getJobId(), getJobClass()); + governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), getJobClass()); JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO(); jobConfigPOJO.setDisabled(true); - governanceFacade.getJobConfigurationGovernanceRepository().persist(jobConfig.getJobId(), jobConfigPOJO); + governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO); if (!param.isFull()) { initIncrementalPosition(jobConfig); } @@ -177,7 +177,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { } IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames())); InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProcessGovernanceRepository().persist( + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist( jobId, i, YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); } } catch (final SQLException ex) { diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index d9a4811172295..a9a846bed49bf 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -80,7 +80,7 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI { public String createJobAndStart(final CreateConsistencyCheckJobParameter param) { String parentJobId = param.getParentJobId(); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)); - Optional latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); if (latestCheckJobId.isPresent()) { PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); Optional progress = jobItemManager.getProgress(latestCheckJobId.get(), 0); @@ -92,8 +92,8 @@ public String createJobAndStart(final CreateConsistencyCheckJobParameter param) verifyPipelineDatabaseType(param); PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId); String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal(); - governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, result); - governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId, result); + governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, result); + governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, result); new PipelineJobManager(this).drop(result); YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration(); yamlConfig.setJobId(result); @@ -126,7 +126,7 @@ public void startByParentJobId(final String parentJobId) { } private String getLatestCheckJobId(final String parentJobId) { - Optional result = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + Optional result = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId); ShardingSpherePreconditions.checkState(result.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId)); return result.get(); } @@ -150,16 +150,16 @@ public void dropByParentJobId(final String parentJobId) { new PipelineJobManager(this).stop(latestCheckJobId); PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey); - Collection checkJobIds = governanceFacade.getJobCheckGovernanceRepository().listCheckJobIds(parentJobId); + Collection checkJobIds = governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId); Optional previousSequence = ConsistencyCheckSequence.getPreviousSequence( checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId)); if (previousSequence.isPresent()) { String checkJobId = new ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()).marshal(); - governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, checkJobId); + governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, checkJobId); } else { - governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId); + governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId); } - governanceFacade.getJobCheckGovernanceRepository().deleteCheckJobResult(parentJobId, latestCheckJobId); + governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, latestCheckJobId); new PipelineJobManager(this).drop(latestCheckJobId); } @@ -171,7 +171,7 @@ public void dropByParentJobId(final String parentJobId) { */ public List getJobItemInfos(final String parentJobId) { PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)); - Optional latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId)); String checkJobId = latestCheckJobId.get(); PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); @@ -182,7 +182,7 @@ public List getJobItemInfos(final String parentJobI List result = new LinkedList<>(); ConsistencyCheckJobItemProgress jobItemProgress = progress.get(); if (!Strings.isNullOrEmpty(jobItemProgress.getIgnoredTableNames())) { - Map checkJobResult = governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId, latestCheckJobId.get()); + Map checkJobResult = governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId, latestCheckJobId.get()); result.addAll(buildIgnoredTableInfo(jobItemProgress.getIgnoredTableNames().split(","), checkJobResult)); } if (Objects.equals(jobItemProgress.getIgnoredTableNames(), jobItemProgress.getTableNames())) { @@ -212,7 +212,7 @@ private List buildIgnoredTableInfo(final String[] i private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)); - Optional latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId)); String checkJobId = latestCheckJobId.get(); PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); @@ -233,7 +233,7 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse("")); fillInJobItemInfoWithCheckAlgorithm(result, checkJobId); result.setErrorMessage(new PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage()); - Map checkJobResults = governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(parentJobId, checkJobId); + Map checkJobResults = governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId, checkJobId); result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched)); result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) .map(Entry::getKey).collect(Collectors.joining(","))); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 0036cb4611397..f705d6b01abc1 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -114,7 +114,7 @@ protected void runBlocking() { parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap, jobItemContext.isStopping()); if (!jobItemContext.isStopping()) { PipelineAPIFactory.getPipelineGovernanceFacade( - PipelineJobIdUtils.parseContextKey(parentJobId)).getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId, checkJobId, checkResultMap); + PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().persistCheckJobResult(parentJobId, checkJobId, checkResultMap); } } finally { jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 66d16150c0969..e7e4f0bf06ee3 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -306,7 +306,7 @@ public void rollback(final String jobId) throws SQLException { } private void dropCheckJobs(final String jobId) { - Collection checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobCheckGovernanceRepository().listCheckJobIds(jobId); + Collection checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId); if (checkJobIds.isEmpty()) { return; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index 993009506e719..26918b29f9b3f 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -134,12 +134,12 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem if (lockContext.tryLock(lockDefinition, 600000)) { log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis); try { - JobOffsetInfo offsetInfo = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().load(jobId); + JobOffsetInfo offsetInfo = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().load(jobId); if (!offsetInfo.isTargetSchemaTableCreated()) { jobItemContext.setStatus(JobStatus.PREPARING); jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING); prepareAndCheckTarget(jobItemContext); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetGovernanceRepository().persist(jobId, new JobOffsetInfo(true)); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId, new JobOffsetInfo(true)); } } finally { log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java index 75c9516d8fde6..424f48d82c963 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java @@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade; -import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineJobItemProcessGovernanceRepository; +import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item.PipelineJobItemProcessGovernanceRepository; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.importer.Importer; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper; @@ -96,7 +96,7 @@ void assertWatch() throws InterruptedException { void assertDeleteJob() { ClusterPersistRepository clusterPersistRepository = getClusterPersistRepository(); clusterPersistRepository.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); - governanceFacade.getJobGovernanceRepository().delete("1"); + governanceFacade.getJobFacade().getJob().delete("1"); Optional actual = new PipelineJobItemProcessGovernanceRepository(clusterPersistRepository).load("1", 0); assertFalse(actual.isPresent()); } @@ -104,21 +104,21 @@ void assertDeleteJob() { @Test void assertIsExistedJobConfiguration() { ClusterPersistRepository clusterPersistRepository = getClusterPersistRepository(); - assertFalse(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job")); + assertFalse(governanceFacade.getJobFacade().getConfiguration().isExisted("foo_job")); clusterPersistRepository.persist("/pipeline/jobs/foo_job/config", "foo"); - assertTrue(governanceFacade.getJobConfigurationGovernanceRepository().isExisted("foo_job")); + assertTrue(governanceFacade.getJobFacade().getConfiguration().isExisted("foo_job")); } @Test void assertLatestCheckJobIdPersistenceDeletion() { String parentJobId = "testParentJob"; String expectedCheckJobId = "testCheckJob"; - governanceFacade.getJobCheckGovernanceRepository().persistLatestCheckJobId(parentJobId, expectedCheckJobId); - Optional actualCheckJobIdOpt = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, expectedCheckJobId); + Optional actualCheckJobIdOpt = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); assertTrue(actualCheckJobIdOpt.isPresent()); assertThat(actualCheckJobIdOpt.get(), is(expectedCheckJobId)); - governanceFacade.getJobCheckGovernanceRepository().deleteLatestCheckJobId(parentJobId); - assertFalse(governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId).isPresent()); + governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId); + assertFalse(governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId).isPresent()); } @Test @@ -126,8 +126,8 @@ void assertPersistJobCheckResult() { MigrationJobItemContext jobItemContext = mockJobItemContext(); Map actual = new HashMap<>(); actual.put("test", new TableDataConsistencyCheckResult(true)); - governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(jobItemContext.getJobId(), "j02123", actual); - Map checkResult = governanceFacade.getJobCheckGovernanceRepository().getCheckJobResult(jobItemContext.getJobId(), "j02123"); + governanceFacade.getJobFacade().getCheck().persistCheckJobResult(jobItemContext.getJobId(), "j02123", actual); + Map checkResult = governanceFacade.getJobFacade().getCheck().getCheckJobResult(jobItemContext.getJobId(), "j02123"); assertThat(checkResult.size(), is(1)); assertTrue(checkResult.get("test").isMatched()); } @@ -135,23 +135,23 @@ void assertPersistJobCheckResult() { @Test void assertPersistJobItemProcess() { MigrationJobItemContext jobItemContext = mockJobItemContext(); - governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue1"); - assertFalse(governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()); - governanceFacade.getJobItemProcessGovernanceRepository().persist(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue1"); - Optional actual = governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(), jobItemContext.getShardingItem()); + governanceFacade.getJobItemFacade().getProcess().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue1"); + assertFalse(governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()); + governanceFacade.getJobItemFacade().getProcess().persist(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue1"); + Optional actual = governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(), jobItemContext.getShardingItem()); assertTrue(actual.isPresent()); assertThat(actual.get(), is("testValue1")); - governanceFacade.getJobItemProcessGovernanceRepository().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue2"); - actual = governanceFacade.getJobItemProcessGovernanceRepository().load(jobItemContext.getJobId(), jobItemContext.getShardingItem()); + governanceFacade.getJobItemFacade().getProcess().update(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue2"); + actual = governanceFacade.getJobItemFacade().getProcess().load(jobItemContext.getJobId(), jobItemContext.getShardingItem()); assertTrue(actual.isPresent()); assertThat(actual.get(), is("testValue2")); } @Test void assertPersistJobOffset() { - assertFalse(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated()); - governanceFacade.getJobOffsetGovernanceRepository().persist("1", new JobOffsetInfo(true)); - assertTrue(governanceFacade.getJobOffsetGovernanceRepository().load("1").isTargetSchemaTableCreated()); + assertFalse(governanceFacade.getJobFacade().getOffset().load("1").isTargetSchemaTableCreated()); + governanceFacade.getJobFacade().getOffset().persist("1", new JobOffsetInfo(true)); + assertTrue(governanceFacade.getJobFacade().getOffset().load("1").isTargetSchemaTableCreated()); } private ClusterPersistRepository getClusterPersistRepository() { diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java index 437293bca13c6..3802e92bcd359 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java @@ -51,7 +51,7 @@ void assertBuildPipelineJobItemContext() { ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new PipelineContextKey(InstanceType.PROXY), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId()); String checkJobId = pipelineJobId.marshal(); Map expectTableCheckPosition = Collections.singletonMap("t_order", 100); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(checkJobId, 0, + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0, YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition))); ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob(checkJobId); ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildPipelineJobItemContext( diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 309b944d451f2..b0ff58bbfaf99 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -91,20 +91,20 @@ void assertDropByParentJobId() { new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 0, JobStatus.FINISHED, null); jobItemManager.persistProgress(checkJobItemContext); Map dataConsistencyCheckResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true)); - governanceFacade.getJobCheckGovernanceRepository().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult); - Optional latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + governanceFacade.getJobFacade().getCheck().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult); + Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); assertTrue(latestCheckJobId.isPresent()); assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence++)); } expectedSequence = 2; for (int i = 0; i < 2; i++) { jobAPI.dropByParentJobId(parentJobId); - Optional latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); assertTrue(latestCheckJobId.isPresent()); assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence--)); } jobAPI.dropByParentJobId(parentJobId); - Optional latestCheckJobId = governanceFacade.getJobCheckGovernanceRepository().getLatestCheckJobId(parentJobId); + Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); assertFalse(latestCheckJobId.isPresent()); } } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 9c3418ef846c4..be69e9dd0eb66 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -303,7 +303,7 @@ void assertGetJobItemInfosAtBegin() { YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress(); yamlJobItemProgress.setStatus(JobStatus.RUNNING.name()); yamlJobItemProgress.setSourceDatabaseType("MySQL"); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); List jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get()); assertThat(jobItemInfos.size(), is(1)); InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0); @@ -320,7 +320,7 @@ void assertGetJobItemInfosAtIncrementTask() { yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name()); yamlJobItemProgress.setProcessedRecordsCount(100); yamlJobItemProgress.setInventoryRecordsCount(50); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemProcessGovernanceRepository().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); List jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get()); InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java index 3389cb2555c6d..64d9d19321386 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java @@ -63,7 +63,7 @@ void assertCountAndDataCheck() throws SQLException { jobConfigurationPOJO.setShardingTotalCount(1); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()); getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO)); - governanceFacade.getJobItemProcessGovernanceRepository().persist(jobConfig.getJobId(), 0, ""); + governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, ""); Map actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null), createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null); String checkKey = "t_order";