diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/constant/DataPipelineConstants.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/DataPipelineDataNodeConstants.java similarity index 74% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/constant/DataPipelineConstants.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/DataPipelineDataNodeConstants.java index 4703fdd312c2b..963f879603210 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/constant/DataPipelineConstants.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/DataPipelineDataNodeConstants.java @@ -15,24 +15,19 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.common.constant; +package org.apache.shardingsphere.data.pipeline.common.metadata.node; import lombok.AccessLevel; import lombok.NoArgsConstructor; /** - * Data pipeline constants. + * Data pipeline data node constants. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class DataPipelineConstants { - - /** - * Data pipeline node name. - */ - public static final String DATA_PIPELINE_NODE_NAME = "pipeline"; +public final class DataPipelineDataNodeConstants { /** * Data pipeline root path. */ - public static final String DATA_PIPELINE_ROOT = "/" + DATA_PIPELINE_NODE_NAME; + public static final String DATA_PIPELINE_ROOT = "/pipeline"; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java index 45034bebfbeec..b42f7c896024b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java @@ -19,7 +19,6 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; -import org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConstants; import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; import java.util.regex.Pattern; @@ -30,7 +29,7 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class PipelineMetaDataNode { - private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)"; + private static final String JOB_PATTERN_PREFIX = DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)"; public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config"); @@ -48,8 +47,8 @@ public static String getMetaDataDataSourcesPath(final JobType jobType) { private static String getMetaDataRootPath(final JobType jobType) { return null == jobType - ? String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "metadata") - : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata"); + ? String.join("/", DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, "metadata") + : String.join("/", DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata"); } /** @@ -73,7 +72,7 @@ public static String getElasticJobNamespace() { } private static String getJobsPath() { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "jobs"); + return String.join("/", DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, "jobs"); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java index c02c902aaf98b..d2d54b075c192 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConstants; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.metadata.node.event.handler.PipelineMetaDataChangedEventHandler; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; @@ -51,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher { private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) { listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class) .stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each))); - PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(DataPipelineConstants.DATA_PIPELINE_ROOT, this::dispatchEvent); + PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, this::dispatchEvent); } private void dispatchEvent(final DataChangedEvent event) { diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index c6aeecfcfcc05..af037ba01439b 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper; -import org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConstants; +import org.apache.shardingsphere.data.pipeline.common.metadata.node.DataPipelineDataNodeConstants; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; @@ -70,8 +70,8 @@ static void beforeClass() { } private static void watch() { - governanceRepositoryAPI.watch(DataPipelineConstants.DATA_PIPELINE_ROOT, event -> { - if ((DataPipelineConstants.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) { + governanceRepositoryAPI.watch(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, event -> { + if ((DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) { EVENT_ATOMIC_REFERENCE.set(event); COUNT_DOWN_LATCH.countDown(); } @@ -114,7 +114,7 @@ void assertPersistJobCheckResult() { @Test void assertDeleteJob() { - governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", ""); + governanceRepositoryAPI.persist(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1", ""); governanceRepositoryAPI.deleteJob("1"); Optional actual = governanceRepositoryAPI.getJobItemProgress("1", 0); assertFalse(actual.isPresent()); @@ -122,15 +122,15 @@ void assertDeleteJob() { @Test void assertGetChildrenKeys() { - governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", ""); - List actual = governanceRepositoryAPI.getChildrenKeys(DataPipelineConstants.DATA_PIPELINE_ROOT); + governanceRepositoryAPI.persist(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1", ""); + List actual = governanceRepositoryAPI.getChildrenKeys(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT); assertFalse(actual.isEmpty()); assertTrue(actual.contains("1")); } @Test void assertWatch() throws InterruptedException { - String key = DataPipelineConstants.DATA_PIPELINE_ROOT + "/1"; + String key = DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1"; governanceRepositoryAPI.persist(key, ""); boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS); assertTrue(awaitResult);