From b9288841882245c911fa31bf6902517b0c99a328 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sat, 18 Nov 2023 15:01:07 +0800 Subject: [PATCH 1/2] Add YamlPipelineJobItemProgressSwapper (#29073) --- ...onsistencyCheckJobItemProgressSwapper.java | 4 +- ...toryIncrementalJobItemProgressSwapper.java | 4 +- .../PipelineJobProgressPersistService.java | 3 +- .../service/InventoryIncrementalJobAPI.java | 6 +++ .../core/job/service/PipelineJobAPI.java | 30 +++++-------- .../core/job/service/PipelineJobManager.java | 44 +++++++++++++++---- ...bstractInventoryIncrementalJobAPIImpl.java | 27 +----------- .../YamlPipelineJobItemProgressSwapper.java | 31 +++++++++++++ .../InventoryIncrementalTasksRunner.java | 2 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 +-- .../cdc/core/prepare/CDCJobPreparer.java | 5 ++- .../api/impl/ConsistencyCheckJobAPI.java | 30 ++++--------- .../task/ConsistencyCheckTasksRunner.java | 10 ++--- .../migration/api/impl/MigrationJobAPI.java | 2 +- .../prepare/MigrationJobPreparer.java | 5 ++- .../api/impl/ConsistencyCheckJobAPITest.java | 16 ++++--- .../api/impl/MigrationJobAPITest.java | 4 +- 17 files changed, 129 insertions(+), 100 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java index c399505edf2bb..066df10fc2874 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java @@ -19,12 +19,12 @@ import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress; -import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; /** * YAML data check job item progress swapper. 
*/ -public final class YamlConsistencyCheckJobItemProgressSwapper implements YamlConfigurationSwapper { +public final class YamlConsistencyCheckJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper { @Override public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final ConsistencyCheckJobItemProgress data) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java index 696a4e5f34eb4..24c2f6a5a74bd 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java @@ -19,14 +19,14 @@ import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; -import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; /** * YAML inventory incremental job item progress swapper. */ -public final class YamlInventoryIncrementalJobItemProgressSwapper implements YamlConfigurationSwapper { +public final class YamlInventoryIncrementalJobItemProgressSwapper implements YamlPipelineJobItemProgressSwapper { private final YamlJobItemInventoryTasksProgressSwapper inventoryTasksProgressSwapper = new YamlJobItemInventoryTasksProgressSwapper(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java index 478cd13bb6c93..87e13a2ea0971 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -129,7 +130,7 @@ private static synchronized void persist(final String jobId, final int shardingI } persistContext.getHasNewEvents().set(false); long startTimeMillis = System.currentTimeMillis(); - TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getType()).updateJobItemProgress(jobItemContext.get()); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getType())).updateJobItemProgress(jobItemContext.get()); persistContext.getBeforePersistingProgressMillis().set(null); if (6 == ThreadLocalRandom.current().nextInt(100)) { log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java index 487555edcb51a..908c3103c7605 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; @@ -43,6 +44,11 @@ */ public interface InventoryIncrementalJobAPI extends PipelineJobAPI { + @Override + default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwapper() { + return new YamlInventoryIncrementalJobItemProgressSwapper(); + } + /** * Get pipeline job info. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 6a4d6a5fcc4ca..b5ab26690a031 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -17,11 +17,11 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; @@ -34,12 +34,20 @@ public interface PipelineJobAPI extends TypedSPI { /** - * Get YAML job configuration swapper. + * Get YAML pipeline job configuration swapper. * - * @return YAML job configuration swapper + * @return YAML pipeline job configuration swapper */ YamlPipelineJobConfigurationSwapper getYamlJobConfigurationSwapper(); + /** + * Get YAML pipeline job item progress swapper. 
+ * + * @return YAML pipeline job item progress swapper + */ + @SuppressWarnings("rawtypes") + YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper(); + /** * Whether to ignore to start disabled job when job item progress is finished. * @@ -67,20 +75,6 @@ default Optional getToBeStoppedPreviousJobType() { return Optional.empty(); } - /** - * Persist job item progress. - * - * @param jobItemContext job item context - */ - void persistJobItemProgress(PipelineJobItemContext jobItemContext); - - /** - * Update job item progress. - * - * @param jobItemContext job item context - */ - void updateJobItemProgress(PipelineJobItemContext jobItemContext); - /** * Get job item progress. * @@ -104,7 +98,7 @@ default Optional getToBeStoppedPreviousJobType() { * * @return pipeline job class */ - Class getPipelineJobClass(); + Class getJobClass(); @Override String getType(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index be7c8121cb3d4..99d846dd6d709 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; @@ -55,7 +56,7 @@ public final class PipelineJobManager { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private final PipelineJobAPI pipelineJobAPI; + private final PipelineJobAPI jobAPI; /** * Get job configuration. 
@@ -64,7 +65,7 @@ public final class PipelineJobManager { * @return pipeline job configuration */ public PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { - return pipelineJobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + return jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); } /** @@ -82,7 +83,7 @@ public Optional start(final PipelineJobConfiguration jobConfig) { log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); return Optional.of(jobId); } - repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), pipelineJobAPI.getPipelineJobClass().getName()); + repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName()); repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO())); return Optional.of(jobId); } @@ -93,15 +94,15 @@ public Optional start(final PipelineJobConfiguration jobConfig) { * @param jobId job id */ public void startDisabledJob(final String jobId) { - if (pipelineJobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) { - Optional jobItemProgress = pipelineJobAPI.getJobItemProgress(jobId, 0); + if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) { + Optional jobItemProgress = jobAPI.getJobItemProgress(jobId, 0); if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) { log.info("job status is FINISHED, ignore, jobId={}", jobId); return; } } startCurrentDisabledJob(jobId); - pipelineJobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional)); + jobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional)); } @@ -139,7 +140,7 @@ private void startNextDisabledJob(final String jobId, final String toBeStartDisa * @param jobId job id */ public void stop(final String jobId) { - pipelineJobAPI.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional)); + jobAPI.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional)); stopCurrentJob(jobId); } @@ -189,8 +190,8 @@ public void drop(final String jobId) { * @return jobs info */ public List getPipelineJobInfos(final PipelineContextKey contextKey) { - if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) { - return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList()); + if (jobAPI instanceof InventoryIncrementalJobAPI) { + return getJobBriefInfos(contextKey, jobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList()); } return Collections.emptyList(); } @@ -200,6 +201,31 @@ private Stream getJobBriefInfos(final PipelineContextKey contextKe .filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType())); } + /** + * Persist job item progress. + * + * @param jobItemContext job item context + */ + public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) { + PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) + .persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext)); + } + + /** + * Update job item progress. 
+ * + * @param jobItemContext job item context + */ + public void updateJobItemProgress(final PipelineJobItemContext jobItemContext) { + PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) + .updateJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext)); + } + + @SuppressWarnings("unchecked") + private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) { + return YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress())); + } + /** * Get job item error message. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java index 372971129cc55..1706b2c4c9f24 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java @@ -17,19 +17,15 @@ package org.apache.shardingsphere.data.pipeline.core.job.service.impl; -import lombok.AccessLevel; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper; import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo; @@ -66,9 +62,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl implements Inventor private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); - @Getter(AccessLevel.PROTECTED) - private final YamlInventoryIncrementalJobItemProgressSwapper jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper(); - private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new YamlJobOffsetInfoSwapper(); @Override @@ -121,22 +114,6 @@ public List getJobItemInfos(final String jobId) return result; } - @Override - public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) { - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) - 
.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext)); - } - - @Override - public void updateJobItemProgress(final PipelineJobItemContext jobItemContext) { - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) - .updateJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext)); - } - - private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) { - return YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration((InventoryIncrementalJobItemProgress) jobItemContext.toProgress())); - } - @Override public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffsetInfo) { String value = YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo)); @@ -156,7 +133,7 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) { @Override public Optional getJobItemProgress(final String jobId, final int shardingItem) { Optional progress = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem); - return progress.map(optional -> jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(optional, YamlInventoryIncrementalJobItemProgress.class))); + return progress.map(optional -> getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(optional, YamlInventoryIncrementalJobItemProgress.class))); } @Override @@ -167,7 +144,7 @@ public void updateJobItemStatus(final String jobId, final int shardingItem, fina } jobItemProgress.get().setStatus(status); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem, - YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get()))); + YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get()))); } @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java new file mode 100644 index 0000000000000..90f522b0910b0 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.yaml;
+
+import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML pipeline job item progress swapper.
+ *
+ * @param <Y> type of YAML configuration
+ * @param <T> type of swapped pipeline job item progress
+ */
+public interface YamlPipelineJobItemProgressSwapper<Y extends YamlConfiguration, T extends PipelineJobItemProgress> extends YamlConfigurationSwapper<Y, T> {
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 1923ad14911f3..ad9e3ccb81b06 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -83,7 +83,7 @@ public void start() {
         if (jobItemContext.isStopping()) {
             return;
         }
-        TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
+        new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
         if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
             executeIncrementalTask();
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index dc0bda5338011..070328d868ee7 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -123,7 +123,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
         if (repositoryAPI.isExisted(jobConfigKey)) {
             log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
         } else {
-            repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getPipelineJobClass().getName());
+            repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName());
             JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
             repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
@@ -176,7 +176,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
                 IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
                 PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
-                        jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ jobId, i, YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); } } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobId, ex); @@ -329,7 +329,7 @@ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final } @Override - public Class getPipelineJobClass() { + public Class getJobClass() { return CDCJob.class; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 681307a84c1a0..e79887f33cd72 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -41,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.core.importer.Importer; import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter; import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; @@ -68,6 +69,8 @@ public final class CDCJobPreparer { private final CDCJobAPI jobAPI = new CDCJobAPI(); + private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); + /** * Do prepare work. * @@ -88,7 +91,7 @@ private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBool final AtomicBoolean incrementalImporterUsed, final List incrementalChannelProgressPairs) { Optional jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); if (!jobItemProgress.isPresent()) { - jobAPI.persistJobItemProgress(jobItemContext); + jobManager.persistJobItemProgress(jobItemContext); } if (jobItemContext.isStopping()) { PipelineJobCenter.stop(jobItemContext.getJobId()); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index da0f06e6ed9b8..62a1520c06bf6 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -20,7 +20,6 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress; import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress; @@ -73,8 +72,6 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); - private final YamlConsistencyCheckJobItemProgressSwapper swapper = new YamlConsistencyCheckJobItemProgressSwapper(); - /** * Create consistency check configuration and start job. * @@ -120,26 +117,10 @@ public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { return true; } - @Override - public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) { - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) - .persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext)); - } - - private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) { - return YamlEngine.marshal(swapper.swapToYamlConfiguration((ConsistencyCheckJobItemProgress) jobItemContext.toProgress())); - } - - @Override - public void updateJobItemProgress(final PipelineJobItemContext jobItemContext) { - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId())) - .updateJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext)); - } - @Override public Optional getJobItemProgress(final String jobId, final int shardingItem) { Optional progress = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem); - return progress.map(s -> swapper.swapToObject(YamlEngine.unmarshal(s, YamlConsistencyCheckJobItemProgress.class, true))); + return progress.map(s -> getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(s, YamlConsistencyCheckJobItemProgress.class, true))); } @Override @@ -151,7 +132,7 @@ public void updateJobItemStatus(final String jobId, final int shardingItem, fina } jobItemProgress.get().setStatus(status); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem, - YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get()))); + YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get()))); } /** @@ -326,7 +307,12 @@ public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwappe } @Override - public Class getPipelineJobClass() { + public YamlConsistencyCheckJobItemProgressSwapper getYamlJobItemProgressSwapper() { + return new YamlConsistencyCheckJobItemProgressSwapper(); + } + + @Override + public Class getJobClass() { return ConsistencyCheckJob.class; } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 43f826dc52cd7..37947db77b87b 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -50,9 +50,9 @@ @Slf4j public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner { - private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI(); + private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(); - private final PipelineJobManager jobManager = new PipelineJobManager(checkJobAPI); + private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); @Getter private final ConsistencyCheckJobItemContext jobItemContext; @@ -80,7 +80,7 @@ public void start() { if (jobItemContext.isStopping()) { return; } - TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext); CompletableFuture future = jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor); ExecuteEngine.trigger(Collections.singletonList(future), new CheckExecuteCallback()); } @@ -95,7 +95,7 @@ private final class CheckPipelineLifecycleRunnable extends AbstractPipelineLifec @Override protected void runBlocking() { - checkJobAPI.persistJobItemProgress(jobItemContext); + jobManager.persistJobItemProgress(jobItemContext); JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId)); @@ -133,7 +133,7 @@ public void onSuccess() { } log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId); jobItemContext.setStatus(JobStatus.FINISHED); - checkJobAPI.persistJobItemProgress(jobItemContext); + jobManager.persistJobItemProgress(jobItemContext); jobManager.stop(checkJobId); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 1f2378d05d3bb..3d12a4363bc0f 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -445,7 +445,7 @@ public void refreshTableMetadata(final String jobId, final String databaseName) } @Override - public Class getPipelineJobClass() { + public Class getJobClass() { return MigrationJob.class; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index cf73d15646bc1..37772491969b1 100644 --- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -45,6 +45,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter; import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter; @@ -81,6 +82,8 @@ public final class MigrationJobPreparer { private final MigrationJobAPI jobAPI = new MigrationJobAPI(); + private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); + /** * Do prepare work. * @@ -123,7 +126,7 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem String jobId = jobConfig.getJobId(); LockContext lockContext = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext(); if (!jobAPI.getJobItemProgress(jobId, jobItemContext.getShardingItem()).isPresent()) { - jobAPI.persistJobItemProgress(jobItemContext); + jobManager.persistJobItemProgress(jobItemContext); } LockDefinition lockDefinition = new GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), jobConfig.getJobId())); long startTimeMillis = System.currentTimeMillis(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 6ab0ef41af097..deeae3d64e539 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -50,7 +50,9 @@ class ConsistencyCheckJobAPITest { - private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI(); + private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(); + + private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new YamlMigrationJobConfigurationSwapper(); @@ -63,9 +65,9 @@ public static void beforeClass() { void assertCreateJobConfig() { MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration()); String parentJobId = parentJobConfig.getJobId(); - String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, + String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = 
(ConsistencyCheckJobConfiguration) new PipelineJobManager(checkJobAPI) + ConsistencyCheckJobConfiguration checkJobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(jobAPI) .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); @@ -82,11 +84,11 @@ void assertDropByParentJobId() { GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()); int expectedSequence = 1; for (int i = 0; i < 3; i++) { - String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, + String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext( new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 0, JobStatus.FINISHED, null); - checkJobAPI.persistJobItemProgress(checkJobItemContext); + jobManager.persistJobItemProgress(checkJobItemContext); Map dataConsistencyCheckResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true)); repositoryAPI.persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult); Optional latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId); @@ -95,12 +97,12 @@ void assertDropByParentJobId() { } expectedSequence = 2; for (int i = 0; i < 2; i++) { - checkJobAPI.dropByParentJobId(parentJobId); + jobAPI.dropByParentJobId(parentJobId); Optional latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId); assertTrue(latestCheckJobId.isPresent()); assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence--)); } - checkJobAPI.dropByParentJobId(parentJobId); + jobAPI.dropByParentJobId(parentJobId); Optional latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId); assertFalse(latestCheckJobId.isPresent()); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 472a14550de05..fd0e33cdfa0b7 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -212,7 +212,7 @@ void assertSwitchClusterConfigurationSucceed() { Optional jobId = jobManager.start(jobConfig); assertTrue(jobId.isPresent()); MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); - jobAPI.persistJobItemProgress(jobItemContext); + jobManager.persistJobItemProgress(jobItemContext); jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); Map progress = jobAPI.getJobProgress(jobConfig); for (Entry entry : progress.entrySet()) { @@ -245,7 +245,7 @@ private void initTableData(final DataSource pipelineDataSource) throws SQLExcept void 
assertRenewJobStatus() {
         final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
         MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
-        jobAPI.persistJobItemProgress(jobItemContext);
+        jobManager.persistJobItemProgress(jobItemContext);
         jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
         Optional<InventoryIncrementalJobItemProgress> actual = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());

From 08eaa477449e9fe24d7db8e9fd828681b718462c Mon Sep 17 00:00:00 2001
From: +7
Date: Sat, 18 Nov 2023 15:15:19 +0800
Subject: [PATCH 2/2] SQL parsing Enhancement: add more SQL test case for create-materialized-view (#29072)

* add more SQL test case for create-materialized-view

* format code
---
 .../case/ddl/create-materialized-view.xml |  5 +++
 .../ddl/create-materialized-view.xml      | 35 +++++++++++++++++++
 2 files changed, 40 insertions(+)

diff --git a/test/it/parser/src/main/resources/case/ddl/create-materialized-view.xml b/test/it/parser/src/main/resources/case/ddl/create-materialized-view.xml
index 81c7cfd2f9e97..9a6325092e10f 100644
--- a/test/it/parser/src/main/resources/case/ddl/create-materialized-view.xml
+++ b/test/it/parser/src/main/resources/case/ddl/create-materialized-view.xml
@@ -26,4 +26,9 @@
+ + + + +
diff --git a/test/it/parser/src/main/resources/sql/supported/ddl/create-materialized-view.xml b/test/it/parser/src/main/resources/sql/supported/ddl/create-materialized-view.xml
index e45c17d1788b4..b2754b3a647c6 100644
--- a/test/it/parser/src/main/resources/sql/supported/ddl/create-materialized-view.xml
+++ b/test/it/parser/src/main/resources/sql/supported/ddl/create-materialized-view.xml
@@ -54,4 +54,39 @@
 FROM times t, sales s, customers c WHERE s.time_id = t.time_id AND s.cust_id = c.cust_id GROUP BY t.calendar_month_desc, c.cust_state_province;" db-types="Oracle" />
+ + + + +
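
The net effect of the first patch is that progress persistence is defined in one place: each PipelineJobAPI only declares its YAML progress swapper through getYamlJobItemProgressSwapper(), while PipelineJobManager marshals the progress object and writes it to the governance repository, which is why the persistJobItemProgress and updateJobItemProgress overrides could be removed from AbstractInventoryIncrementalJobAPIImpl and ConsistencyCheckJobAPI. The sketch below illustrates that split with simplified stand-in types: the names mirror the classes in the patch, but the signatures are reduced and an in-memory map stands in for the governance repository, so it is an illustration of the pattern rather than code from the patch.

import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the real ShardingSphere types; illustration only.
interface YamlConfiguration {
}

interface PipelineJobItemProgress {
}

// Counterpart of the new YamlPipelineJobItemProgressSwapper SPI: converts progress to and from its YAML form.
interface YamlPipelineJobItemProgressSwapper<Y extends YamlConfiguration, T extends PipelineJobItemProgress> {

    Y swapToYamlConfiguration(T progress);

    T swapToObject(Y yamlProgress);
}

// Counterpart of PipelineJobAPI: each scenario only declares which swapper it uses.
interface PipelineJobAPI<Y extends YamlConfiguration, T extends PipelineJobItemProgress> {

    YamlPipelineJobItemProgressSwapper<Y, T> getYamlJobItemProgressSwapper();
}

// Counterpart of PipelineJobManager: persistence is implemented once here instead of in every job API implementation.
final class PipelineJobManager<Y extends YamlConfiguration, T extends PipelineJobItemProgress> {

    private final PipelineJobAPI<Y, T> jobAPI;

    // Stand-in for the governance repository used by the real manager.
    private final Map<String, Y> repository = new HashMap<>();

    PipelineJobManager(final PipelineJobAPI<Y, T> jobAPI) {
        this.jobAPI = jobAPI;
    }

    void persistJobItemProgress(final String jobId, final int shardingItem, final T progress) {
        // The real implementation marshals the swapped YAML configuration with YamlEngine before persisting it.
        repository.put(jobId + "#" + shardingItem, jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(progress));
    }
}

With this split, a scenario only has to supply its swapper, as InventoryIncrementalJobAPI does with the default getYamlJobItemProgressSwapper() and ConsistencyCheckJobAPI does with its override in the patch above.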