From 7a2b92042a0b62acdc66ba3ef30ed4547609484f Mon Sep 17 00:00:00 2001
From: Liang Zhang
Date: Mon, 19 Aug 2024 21:21:21 +0800
Subject: [PATCH] Refactor InventoryDumperContextSplitter (#32596)

* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor InventoryTask

* Fix javadoc of InventoryDumperContextSplitter

* Refactor InventoryDumperContextSplitter

* Refactor InventoryDumperContextSplitter
---
 .../InventoryDumperContextSplitter.java       | 84 ++++++++++---------
 .../splitter/InventoryTaskSplitter.java       |  2 +-
 .../pipeline/core/task/InventoryTask.java     |  2 +-
 .../pipeline/core/task/PipelineTaskUtils.java | 11 ++-
 .../core/task/PipelineTaskUtilsTest.java      | 36 ++++++++
 5 files changed, 86 insertions(+), 49 deletions(-)
 create mode 100644 kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 4ccdfd82b6010..6a68d80d70e3c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter;
 
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.Range;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -31,7 +30,6 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
@@ -39,6 +37,7 @@
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -52,10 +51,9 @@
 import java.util.stream.Collectors;
 
 /**
- * Inventory task splitter.
+ * Inventory dumper context splitter.
  */
 @RequiredArgsConstructor
-@Slf4j
 public final class InventoryDumperContextSplitter {
 
     private final PipelineDataSourceWrapper sourceDataSource;
@@ -69,56 +67,46 @@ public final class InventoryDumperContextSplitter {
      * @return inventory dumper contexts
      */
     public Collection<InventoryDumperContext> split(final TransmissionJobItemContext jobItemContext) {
-        return splitByTable(dumperContext).stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext, sourceDataSource).stream()).collect(Collectors.toList());
+        return splitByTable().stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext).stream()).collect(Collectors.toList());
     }
 
-    private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
-        Collection<InventoryDumperContext> result = new LinkedList<>();
-        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
-            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
-            // use original table name, for metadata loader, since some database table name case-sensitive
-            inventoryDumperContext.setActualTableName(key.toString());
-            inventoryDumperContext.setLogicTableName(value.toString());
-            inventoryDumperContext.getCommonContext().setPosition(new IngestPlaceholderPosition());
-            inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
-            inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
-            result.add(inventoryDumperContext);
-        });
+    private Collection<InventoryDumperContext> splitByTable() {
+        return dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().entrySet()
+                .stream().map(entry -> createTableSplitDumperContext(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    }
+
+    private InventoryDumperContext createTableSplitDumperContext(final CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier logicTableName) {
+        InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
+        // Use the original table name for the metadata loader, since some database table names are case-sensitive
+        result.setActualTableName(actualTableName.toString());
+        result.setLogicTableName(logicTableName.toString());
+        result.getCommonContext().setPosition(new IngestPlaceholderPosition());
+        result.setInsertColumnNames(dumperContext.getInsertColumnNames());
+        result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
         return result;
     }
 
-    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
-                                                                 final PipelineDataSourceWrapper dataSource) {
+    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
         if (null == dumperContext.getUniqueKeyColumns()) {
-            String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-            String actualTableName = dumperContext.getActualTableName();
-            List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
-            dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
+            dumperContext.setUniqueKeyColumns(getTableUniqueKeys(dumperContext, jobItemContext));
         }
         Collection<InventoryDumperContext> result = new LinkedList<>();
         TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
-        PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfiguration().getRead();
-        int batchSize = readConfig.getBatchSize();
+        int batchSize = jobProcessContext.getProcessConfiguration().getRead().getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
         int i = 0;
-        for (IngestPosition each : inventoryPositions) {
-            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
-            splitDumperContext.getCommonContext().setPosition(each);
-            splitDumperContext.setShardingItem(i++);
-            splitDumperContext.setActualTableName(dumperContext.getActualTableName());
-            splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
-            splitDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
-            splitDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
-            splitDumperContext.setBatchSize(batchSize);
-            splitDumperContext.setRateLimitAlgorithm(rateLimitAlgorithm);
-            result.add(splitDumperContext);
+        for (IngestPosition each : getInventoryPositions(dumperContext, jobItemContext)) {
+            result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm));
         }
         return result;
     }
 
-    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
-                                                             final PipelineDataSourceWrapper dataSource) {
+    private List<PipelineColumnMetaData> getTableUniqueKeys(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
+        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        return PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, dumperContext.getActualTableName(), jobItemContext.getSourceMetaDataLoader());
+    }
+
+    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
         TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
         if (null != initProgress) {
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
@@ -127,7 +115,7 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
                 return result;
             }
         }
-        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
+        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, sourceDataSource);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         if (!dumperContext.hasUniqueKey()) {
             return Collections.singleton(new UnsupportedKeyIngestPosition());
@@ -136,7 +124,7 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
         if (1 == uniqueKeyColumns.size()) {
             int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
             if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
-                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, dataSource);
+                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, sourceDataSource);
             }
             if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
                 // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
@@ -179,4 +167,18 @@ private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext job
             throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
         }
     }
+
+    private InventoryDumperContext createPrimaryKeySplitDumperContext(final InventoryDumperContext dumperContext, final IngestPosition position,
+                                                                      final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm) {
+        InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
+        result.getCommonContext().setPosition(position);
+        result.setShardingItem(shardingItem);
+        result.setActualTableName(dumperContext.getActualTableName());
+        result.setLogicTableName(dumperContext.getLogicTableName());
+        result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
+        result.setInsertColumnNames(dumperContext.getInsertColumnNames());
+        result.setBatchSize(batchSize);
+        result.setRateLimitAlgorithm(rateLimitAlgorithm);
+        return result;
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index e3db30855f4dc..034081e52031a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -70,7 +70,7 @@ public List<InventoryTask> split(final TransmissionJobItemContext jobItemContext
             result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
                     processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
         }
-        log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("Split inventory tasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
         return result;
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 19d88aea7a94e..afe20fc1e3dd3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -35,7 +35,7 @@
  * Inventory task.
  */
 @RequiredArgsConstructor
-@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
+@ToString(of = {"taskId", "position"})
 public final class InventoryTask implements PipelineTask {
 
     @Getter
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index a29460f6fef61..a4ae090d78683 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -33,14 +33,13 @@ public final class PipelineTaskUtils {
 
     /**
-     * Generate inventory task id.
+     * Generate inventory task ID.
      *
-     * @param inventoryDumperContext inventory dumper context
-     * @return inventory task id
+     * @param dumperContext inventory dumper context
+     * @return generated ID
      */
-    public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
-        String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
-        return result + "#" + inventoryDumperContext.getShardingItem();
+    public static String generateInventoryTaskId(final InventoryDumperContext dumperContext) {
+        return String.format("%s.%s#%s", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName(), dumperContext.getShardingItem());
     }
 
     /**
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
new file mode 100644
index 0000000000000..903e99ec83a66
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.task;
+
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class PipelineTaskUtilsTest {
+
+    @Test
+    void assertGenerateInventoryTaskId() {
+        InventoryDumperContext dumperContext = new InventoryDumperContext(new DumperCommonContext("foo_ds", null, null, null));
+        dumperContext.setActualTableName("foo_actual_tbl");
+        dumperContext.setShardingItem(1);
+        assertThat(PipelineTaskUtils.generateInventoryTaskId(dumperContext), is("foo_ds.foo_actual_tbl#1"));
+    }
+}
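
The refactoring above keeps the splitter's two-phase behavior: splitByTable() emits one dumper context per mapped table, splitByPrimaryKey() then emits one context per unique-key range, and each context's sharding item is folded by PipelineTaskUtils.generateInventoryTaskId into a task ID of the form "<dataSourceName>.<actualTableName>#<shardingItem>". The following minimal, self-contained Java sketch illustrates how the two phases and the ID layout compose; the class name, toy table list, and fixed-size ID windows are illustrative assumptions, not ShardingSphere API.

import java.util.ArrayList;
import java.util.List;

public final class InventorySplitSketch {

    // Same ID layout as PipelineTaskUtils.generateInventoryTaskId: dataSourceName.actualTableName#shardingItem
    static String taskId(final String dataSourceName, final String actualTableName, final int shardingItem) {
        return String.format("%s.%s#%s", dataSourceName, actualTableName, shardingItem);
    }

    // Phase 1 splits by table; phase 2 splits each table into unique-key ranges (fixed-size ID windows here).
    static List<String> split(final String dataSourceName, final List<String> tables, final long maxId, final long rangeSize) {
        List<String> result = new ArrayList<>();
        for (String table : tables) {
            int shardingItem = 0;
            for (long lower = 0; lower <= maxId; lower += rangeSize) {
                result.add(taskId(dataSourceName, table, shardingItem++));
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Prints: [foo_ds.t_order#0, foo_ds.t_order#1, foo_ds.t_order_item#0, foo_ds.t_order_item#1]
        System.out.println(split("foo_ds", List.of("t_order", "t_order_item"), 150L, 100L));
    }
}

In the real splitter the ranges come from the table's record count and the configured batch size via IntervalToRangeIterator, with a single UnsupportedKeyIngestPosition as the fallback when a table has no usable unique key.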