From 89233548ecc052c488684e93c1917bc44fb91993 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Wed, 20 Dec 2023 23:50:00 +0800 Subject: [PATCH] Add DialectDatabaseMetaData.isSupportGlobalCSN() (#29478) * Rename DataSourceCheckEngine.checkSourceDataSources() * Refactor DataSourceCheckEngine * Rename IncrementalTaskPreparer * Rename IncrementalTaskPositionManager * Rename IncrementalTaskPositionManager * Refactor CDCJobAPI * Add DialectDatabaseMetaData.isSupportGlobalCSN() * Add DialectDatabaseMetaData.isSupportGlobalCSN() --- .../metadata/database/DialectDatabaseMetaData.java | 11 ++++++++++- .../metadata/database/OpenGaussDatabaseMetaData.java | 5 +++++ .../data/pipeline/cdc/api/CDCJobAPI.java | 4 ++-- .../pipeline/cdc/core/prepare/CDCJobPreparer.java | 12 ++++-------- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java index f9a7457c93c56..4ec732cd5a7c5 100644 --- a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java +++ b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java @@ -17,9 +17,9 @@ package org.apache.shardingsphere.infra.database.core.metadata.database; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter; +import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import java.sql.Connection; @@ -129,4 +129,13 @@ default boolean isInstanceConnectionAvailable() { default boolean isSupportThreeTierStorageStructure() { return false; } + + /** + * Is support global CSN. + * + * @return support or not + */ + default boolean isSupportGlobalCSN() { + return false; + } } diff --git a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java index 78ec5a38a73eb..5086260f9b7c3 100644 --- a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java +++ b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java @@ -70,6 +70,11 @@ public Optional getDefaultSchema() { return Optional.of("public"); } + @Override + public boolean isSupportGlobalCSN() { + return true; + } + @Override public String getDatabaseType() { return "openGauss"; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index 59b51ab86c9f9..29d67efe7f822 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -201,8 +201,8 @@ private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobC TransmissionJobItemProgress result = new TransmissionJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); - IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new IncrementalTaskPositionManager( - incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null, incrementalDumperContext, dataSourceManager)); + IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()); + IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager)); result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); return result; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 51f717ec8bfb2..c527f4f9312e7 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -46,9 +46,9 @@ import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; +import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType; import java.sql.SQLException; import java.util.Collection; @@ -127,7 +127,7 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader()); Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(), - needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())), + needSorting(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()), importerConfig.getRateLimitAlgorithm()); jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position)); @@ -138,12 +138,8 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis); } - private boolean needSorting(final boolean hasGlobalCSN) { - return hasGlobalCSN; - } - - private boolean hasGlobalCSN(final DatabaseType databaseType) { - return databaseType instanceof OpenGaussDatabaseType; + private boolean needSorting(final DatabaseType databaseType) { + return DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, databaseType).isSupportGlobalCSN(); } private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List channelProgressPairs) {