Refactor InventoryDumperContextSplitter (apache#32596)
* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor PipelineTaskUtils.generateInventoryTaskId

* Refactor InventoryTask

* Fix javadoc of InventoryDumperContextSplitter

* Refactor InventoryDumperContextSplitter

* Refactor InventoryDumperContextSplitter
terrymanu authored Aug 19, 2024
1 parent a51323f commit 7a2b920
Showing 5 changed files with 86 additions and 49 deletions.
File: InventoryDumperContextSplitter.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Range;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -31,14 +30,14 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;

import javax.sql.DataSource;
import java.sql.Connection;
@@ -52,10 +51,9 @@
import java.util.stream.Collectors;

/**
* Inventory task splitter.
* Inventory dumper context splitter.
*/
@RequiredArgsConstructor
@Slf4j
public final class InventoryDumperContextSplitter {

private final PipelineDataSourceWrapper sourceDataSource;
@@ -69,56 +67,46 @@ public final class InventoryDumperContextSplitter {
* @return inventory dumper contexts
*/
public Collection<InventoryDumperContext> split(final TransmissionJobItemContext jobItemContext) {
return splitByTable(dumperContext).stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext, sourceDataSource).stream()).collect(Collectors.toList());
return splitByTable().stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext).stream()).collect(Collectors.toList());
}

private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
Collection<InventoryDumperContext> result = new LinkedList<>();
dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
// Use original table name for metadata loader, since some database table names are case-sensitive
inventoryDumperContext.setActualTableName(key.toString());
inventoryDumperContext.setLogicTableName(value.toString());
inventoryDumperContext.getCommonContext().setPosition(new IngestPlaceholderPosition());
inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
result.add(inventoryDumperContext);
});
private Collection<InventoryDumperContext> splitByTable() {
return dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().entrySet()
.stream().map(entry -> createTableSplitDumperContext(entry.getKey(), entry.getValue())).collect(Collectors.toList());
}

private InventoryDumperContext createTableSplitDumperContext(final CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier logicTableName) {
InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
// Use original table name for metadata loader, since some database table names are case-sensitive
result.setActualTableName(actualTableName.toString());
result.setLogicTableName(logicTableName.toString());
result.getCommonContext().setPosition(new IngestPlaceholderPosition());
result.setInsertColumnNames(dumperContext.getInsertColumnNames());
result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
return result;
}

private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
final PipelineDataSourceWrapper dataSource) {
private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
if (null == dumperContext.getUniqueKeyColumns()) {
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
String actualTableName = dumperContext.getActualTableName();
List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
dumperContext.setUniqueKeyColumns(getTableUniqueKeys(dumperContext, jobItemContext));
}
Collection<InventoryDumperContext> result = new LinkedList<>();
TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfiguration().getRead();
int batchSize = readConfig.getBatchSize();
int batchSize = jobProcessContext.getProcessConfiguration().getRead().getBatchSize();
JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
int i = 0;
for (IngestPosition each : inventoryPositions) {
InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
splitDumperContext.getCommonContext().setPosition(each);
splitDumperContext.setShardingItem(i++);
splitDumperContext.setActualTableName(dumperContext.getActualTableName());
splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
splitDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
splitDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
splitDumperContext.setBatchSize(batchSize);
splitDumperContext.setRateLimitAlgorithm(rateLimitAlgorithm);
result.add(splitDumperContext);
for (IngestPosition each : getInventoryPositions(dumperContext, jobItemContext)) {
result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm));
}
return result;
}

private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
final PipelineDataSourceWrapper dataSource) {
private List<PipelineColumnMetaData> getTableUniqueKeys(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
return PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, dumperContext.getActualTableName(), jobItemContext.getSourceMetaDataLoader());
}

private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
if (null != initProgress) {
// Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
@@ -127,7 +115,7 @@
return result;
}
}
long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, sourceDataSource);
jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
if (!dumperContext.hasUniqueKey()) {
return Collections.singleton(new UnsupportedKeyIngestPosition());
@@ -136,7 +124,7 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
if (1 == uniqueKeyColumns.size()) {
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, dataSource);
return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, sourceDataSource);
}
if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
// TODO Support string unique key table splitting. ASCII character ordering differs between database versions.
@@ -179,4 +167,18 @@ private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext job
throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
}
}

private InventoryDumperContext createPrimaryKeySplitDumperContext(final InventoryDumperContext dumperContext, final IngestPosition position,
final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm) {
InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
result.getCommonContext().setPosition(position);
result.setShardingItem(shardingItem);
result.setActualTableName(dumperContext.getActualTableName());
result.setLogicTableName(dumperContext.getLogicTableName());
result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
result.setInsertColumnNames(dumperContext.getInsertColumnNames());
result.setBatchSize(batchSize);
result.setRateLimitAlgorithm(rateLimitAlgorithm);
return result;
}
}
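The core technique in this file is the integer unique-key splitting path (getPositionByIntegerUniqueKeyRange together with IntervalToRangeIterator): the unique key's [min, max] value range is chopped into inclusive, non-overlapping intervals, and each interval becomes the IngestPosition of one split dumper context. Below is a minimal, self-contained sketch of that idea; it is not the production IntervalToRangeIterator, whose exact contract is not shown in this diff, and the class and method names are illustrative.

import java.util.ArrayList;
import java.util.List;

// Sketch only: splits an integer unique-key value range the way the splitter
// divides a table into per-range dumper contexts.
public final class IntegerRangeSplitSketch {

    // Split the inclusive range [min, max] into sub-ranges of at most `width` values each.
    static List<long[]> split(final long min, final long max, final long width) {
        List<long[]> result = new ArrayList<>();
        for (long lower = min; lower <= max; lower += width) {
            result.add(new long[]{lower, Math.min(lower + width - 1L, max)});
        }
        return result;
    }

    public static void main(final String[] args) {
        // A table whose unique key spans 1..100, width 40: prints [1, 40], [41, 80], [81, 100].
        split(1L, 100L, 40L).forEach(each -> System.out.printf("[%d, %d]%n", each[0], each[1]));
    }
}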
File: InventoryTaskSplitter.java
@@ -70,7 +70,7 @@ public List<InventoryTask> split(final TransmissionJobItemContext jobItemContext
result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
}
log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
log.info("Split inventory tasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
return result;
}
}
File: InventoryTask.java
@@ -35,7 +35,7 @@
* Inventory task.
*/
@RequiredArgsConstructor
@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
@ToString(of = {"taskId", "position"})
public final class InventoryTask implements PipelineTask {

@Getter
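The @ToString change above flips an exclusion list into an inclusion list: only taskId and position are rendered, and any field added to InventoryTask later stays out of its string form by default instead of requiring a maintained exclude list. A minimal sketch of the effect, assuming Lombok on the classpath (field names are illustrative):

import lombok.RequiredArgsConstructor;
import lombok.ToString;

@RequiredArgsConstructor
@ToString(of = {"taskId", "position"})
final class ToStringSketch {

    private final String taskId;

    private final Object position;

    // Not listed in `of`, so it never appears in the generated toString().
    private final Runnable dumper = () -> { };
}

// new ToStringSketch("foo_ds.foo_tbl#0", "pos").toString()
// -> "ToStringSketch(taskId=foo_ds.foo_tbl#0, position=pos)"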
File: PipelineTaskUtils.java
@@ -33,14 +33,13 @@
public final class PipelineTaskUtils {

/**
* Generate inventory task id.
* Generate inventory task ID.
*
* @param inventoryDumperContext inventory dumper context
* @return inventory task id
* @param dumperContext inventory dumper context
* @return generated ID
*/
public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
return result + "#" + inventoryDumperContext.getShardingItem();
public static String generateInventoryTaskId(final InventoryDumperContext dumperContext) {
return String.format("%s.%s#%s", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName(), dumperContext.getShardingItem());
}
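The generateInventoryTaskId rewrite above collapses a format-then-concatenate pair into a single format string without changing the produced ID. A quick equivalence check (values mirror the new test; the class name is illustrative):

public final class TaskIdFormatSketch {

    public static void main(final String[] args) {
        String dataSourceName = "foo_ds";
        String actualTableName = "foo_actual_tbl";
        int shardingItem = 1;
        // Old style: format data source and table, then append the sharding item.
        String before = String.format("%s.%s", dataSourceName, actualTableName) + "#" + shardingItem;
        // New style: one format string covering all three parts.
        String after = String.format("%s.%s#%s", dataSourceName, actualTableName, shardingItem);
        // Both print foo_ds.foo_actual_tbl#1
        System.out.println(before);
        System.out.println(after);
    }
}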

/**
File: PipelineTaskUtilsTest.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.task;

import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

class PipelineTaskUtilsTest {

@Test
void assertGenerateInventoryTaskId() {
InventoryDumperContext dumperContext = new InventoryDumperContext(new DumperCommonContext("foo_ds", null, null, null));
dumperContext.setActualTableName("foo_actual_tbl");
dumperContext.setShardingItem(1);
assertThat(PipelineTaskUtils.generateInventoryTaskId(dumperContext), is("foo_ds.foo_actual_tbl#1"));
}
}
