Skip to content

Commit

Permalink
Add DialectDatabaseMetaData.isSupportGlobalCSN() (#29478)
Browse files Browse the repository at this point in the history
* Rename DataSourceCheckEngine.checkSourceDataSources()

* Refactor DataSourceCheckEngine

* Rename IncrementalTaskPreparer

* Rename IncrementalTaskPositionManager

* Rename IncrementalTaskPositionManager

* Refactor CDCJobAPI

* Add DialectDatabaseMetaData.isSupportGlobalCSN()

* Add DialectDatabaseMetaData.isSupportGlobalCSN()
  • Loading branch information
terrymanu authored Dec 20, 2023
1 parent acca6b7 commit 8923354
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.shardingsphere.infra.database.core.metadata.database;

import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType;
import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;

import java.sql.Connection;
Expand Down Expand Up @@ -129,4 +129,13 @@ default boolean isInstanceConnectionAvailable() {
default boolean isSupportThreeTierStorageStructure() {
return false;
}

/**
* Is support global CSN.
*
* @return support or not
*/
default boolean isSupportGlobalCSN() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public Optional<String> getDefaultSchema() {
return Optional.of("public");
}

@Override
public boolean isSupportGlobalCSN() {
return true;
}

@Override
public String getDatabaseType() {
return "openGauss";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobC
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new IncrementalTaskPositionManager(
incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null, incrementalDumperContext, dataSourceManager));
IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;

import java.sql.SQLException;
import java.util.Collection;
Expand Down Expand Up @@ -127,7 +127,7 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At
Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(),
needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
needSorting(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()),
importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
Expand All @@ -138,12 +138,8 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At
log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
}

private boolean needSorting(final boolean hasGlobalCSN) {
return hasGlobalCSN;
}

private boolean hasGlobalCSN(final DatabaseType databaseType) {
return databaseType instanceof OpenGaussDatabaseType;
private boolean needSorting(final DatabaseType databaseType) {
return DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, databaseType).isSupportGlobalCSN();
}

private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
Expand Down

0 comments on commit 8923354

Please sign in to comment.