Skip to content

Commit

Permalink
Refactor InventoryDumper (apache#32644)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Aug 23, 2024
1 parent 0af0b6b commit f3fb901
Showing 1 changed file with 87 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,60 +132,6 @@ private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
return position instanceof PrimaryKeyIngestPosition && null == ((PrimaryKeyIngestPosition<?>) position).getBeginValue() && null == ((PrimaryKeyIngestPosition<?>) position).getEndValue();
}

@SuppressWarnings("MagicConstant")
private void dumpWithStreamingQuery(final Connection connection) throws SQLException {
int batchSize = dumperContext.getBatchSize();
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQLWithStreamingQuery())) {
runningStatement.set(preparedStatement);
if (!DatabaseTypeUtils.isMySQL(databaseType)) {
preparedStatement.setFetchSize(batchSize);
}
PrimaryKeyIngestPosition<?> primaryPosition = (PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
InventoryQueryParameter queryParam = InventoryQueryParameter.buildForRangeQuery(new QueryRange(primaryPosition.getBeginValue(), true, primaryPosition.getEndValue()));
setParameters(preparedStatement, queryParam, true);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
int rowCount = 0;
JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<Record> dataRecords = new LinkedList<>();
while (resultSet.next()) {
if (dataRecords.size() >= batchSize) {
channel.push(dataRecords);
dataRecords = new LinkedList<>();
}
dataRecords.add(loadDataRecord(resultSet, resultSetMetaData));
++rowCount;
if (!isRunning()) {
log.info("Broke because of inventory dump is not running.");
break;
}
if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
}
}
dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
channel.push(dataRecords);
log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}",
rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
} finally {
runningStatement.set(null);
}
}
}

private String buildInventoryDumpSQLWithStreamingQuery() {
if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
return dumperContext.getQuerySQL();
}
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
List<String> columnNames = getQueryColumnNames();
return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
}

@SuppressWarnings("MagicConstant")
private void dumpPageByPage(final Connection connection) throws SQLException {
if (null != dumperContext.getTransactionIsolation()) {
Expand All @@ -195,20 +141,20 @@ private void dumpPageByPage(final Connection connection) throws SQLException {
AtomicLong rowCount = new AtomicLong();
IngestPosition position = dumperContext.getCommonContext().getPosition();
while (true) {
PrimaryKeyIngestPosition<?> primaryPosition = (PrimaryKeyIngestPosition<?>) position;
InventoryQueryParameter queryParam = InventoryQueryParameter.buildForRangeQuery(new QueryRange(primaryPosition.getBeginValue(), firstQuery, primaryPosition.getEndValue()));
List<Record> dataRecords = dumpPageByPage0(connection, queryParam, rowCount);
QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
InventoryQueryParameter queryParam = InventoryQueryParameter.buildForRangeQuery(queryRange);
List<Record> dataRecords = dumpPageByPage(connection, queryParam, rowCount);
if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
queryParam = InventoryQueryParameter.buildForPointQuery(getFirstUniqueKeyValue(dataRecords, 0));
dataRecords = dumpPageByPage0(connection, queryParam, rowCount);
dataRecords = dumpPageByPage(connection, queryParam, rowCount);
}
firstQuery = false;
if (dataRecords.isEmpty()) {
position = new IngestFinishedPosition();
dataRecords.add(new FinishedRecord(position));
log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
} else {
position = PrimaryKeyIngestPositionFactory.newInstance(getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1), primaryPosition.getEndValue());
position = PrimaryKeyIngestPositionFactory.newInstance(getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1), queryRange.getUpper());
}
channel.push(dataRecords);
dumperContext.getCommonContext().setPosition(position);
Expand All @@ -218,11 +164,7 @@ private void dumpPageByPage(final Connection connection) throws SQLException {
}
}

private Object getFirstUniqueKeyValue(final List<Record> dataRecords, final int index) {
return ((DataRecord) dataRecords.get(index)).getUniqueKeyValue().iterator().next();
}

private List<Record> dumpPageByPage0(final Connection connection, final InventoryQueryParameter queryParam, final AtomicLong rowCount) throws SQLException {
private List<Record> dumpPageByPage(final Connection connection, final InventoryQueryParameter queryParam, final AtomicLong rowCount) throws SQLException {
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
int batchSize = dumperContext.getBatchSize();
try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpPageByPageSQL(queryParam))) {
Expand Down Expand Up @@ -259,28 +201,6 @@ private List<Record> dumpPageByPage0(final Connection connection, final Inventor
}
}

private String buildInventoryDumpPageByPageSQL(final InventoryQueryParameter queryParam) {
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
List<String> columnNames = getQueryColumnNames();
if (QueryType.POINT_QUERY == queryParam.getQueryType()) {
return inventoryDumpSQLBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
}
QueryRange queryRange = queryParam.getUniqueKeyValueRange();
boolean lowerInclusive = queryRange.isLowerInclusive();
if (null != queryRange.getLower() && null != queryRange.getUpper()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true);
}
if (null != queryRange.getLower()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false);
}
throw new PipelineInternalException("Primary key position is invalid.");
}

private List<String> getQueryColumnNames() {
return Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.singletonList("*"));
}

private void setParameters(final PreparedStatement preparedStatement, final InventoryQueryParameter queryParam, final boolean streamingQuery) throws SQLException {
if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
for (int i = 0; i < dumperContext.getQueryParams().size(); i++) {
Expand Down Expand Up @@ -320,8 +240,7 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta
for (int i = 1; i <= columnCount; i++) {
String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName)));
boolean isUniqueKey = tableMetaData.getColumnMetaData(columnName).isUniqueKey();
result.addColumn(new Column(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, isUniqueKey));
result.addColumn(new Column(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
}
result.setActualTableName(dumperContext.getActualTableName());
return result;
Expand All @@ -334,6 +253,86 @@ protected IngestPosition newDataRecordPosition(final ResultSet resultSet) throws
: new IngestPlaceholderPosition();
}

private String buildInventoryDumpPageByPageSQL(final InventoryQueryParameter queryParam) {
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
List<String> columnNames = getQueryColumnNames();
if (QueryType.POINT_QUERY == queryParam.getQueryType()) {
return inventoryDumpSQLBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
}
QueryRange queryRange = queryParam.getUniqueKeyValueRange();
boolean lowerInclusive = queryRange.isLowerInclusive();
if (null != queryRange.getLower() && null != queryRange.getUpper()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true);
}
if (null != queryRange.getLower()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false);
}
throw new PipelineInternalException("Primary key position is invalid.");
}

private List<String> getQueryColumnNames() {
return Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.singletonList("*"));
}

private Object getFirstUniqueKeyValue(final List<Record> dataRecords, final int index) {
return ((DataRecord) dataRecords.get(index)).getUniqueKeyValue().iterator().next();
}

@SuppressWarnings("MagicConstant")
private void dumpWithStreamingQuery(final Connection connection) throws SQLException {
int batchSize = dumperContext.getBatchSize();
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQLWithStreamingQuery())) {
runningStatement.set(preparedStatement);
if (!DatabaseTypeUtils.isMySQL(databaseType)) {
preparedStatement.setFetchSize(batchSize);
}
PrimaryKeyIngestPosition<?> primaryPosition = (PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
InventoryQueryParameter queryParam = InventoryQueryParameter.buildForRangeQuery(new QueryRange(primaryPosition.getBeginValue(), true, primaryPosition.getEndValue()));
setParameters(preparedStatement, queryParam, true);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
int rowCount = 0;
JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<Record> dataRecords = new LinkedList<>();
while (resultSet.next()) {
if (dataRecords.size() >= batchSize) {
channel.push(dataRecords);
dataRecords = new LinkedList<>();
}
dataRecords.add(loadDataRecord(resultSet, resultSetMetaData));
++rowCount;
if (!isRunning()) {
log.info("Broke because of inventory dump is not running.");
break;
}
if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
}
}
dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
channel.push(dataRecords);
log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}",
rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
} finally {
runningStatement.set(null);
}
}
}

private String buildInventoryDumpSQLWithStreamingQuery() {
if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
return dumperContext.getQuerySQL();
}
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
List<String> columnNames = getQueryColumnNames();
return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
}

@Override
protected void doStop() {
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
Expand Down

0 comments on commit f3fb901

Please sign in to comment.