Skip to content

Commit

Permalink
Refactor DumperConfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 2, 2023
1 parent 0de4f9b commit ee1c67b
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.api.config.ingest;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Base dumper configuration shared by incremental and inventory dumpers.
 *
 * <p>Holds the data source, ingest position, and the actual-to-logic table name
 * mapping used to resolve schema and column information during dumping.</p>
 */
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
public abstract class BaseDumperConfiguration {
    
    private String dataSourceName;
    
    private PipelineDataSourceConfiguration dataSourceConfig;
    
    private IngestPosition position;
    
    private Map<ActualTableName, LogicTableName> tableNameMap;
    
    private TableNameSchemaNameMapping tableNameSchemaNameMapping;
    
    // values must preserve insertion order, so callers are expected to supply LinkedHashSet instances
    private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap = new HashMap<>();
    
    /**
     * Get logic table name.
     *
     * @param actualTableName actual table name
     * @return logic table name
     */
    public LogicTableName getLogicTableName(final String actualTableName) {
        // Delegate to the key-typed overload after wrapping the raw name.
        return getLogicTableName(new ActualTableName(actualTableName));
    }
    
    private LogicTableName getLogicTableName(final ActualTableName actualTableName) {
        return tableNameMap.get(actualTableName);
    }
    
    /**
     * Whether contains table.
     *
     * @param actualTableName actual table name
     * @return contains or not
     */
    public boolean containsTable(final String actualTableName) {
        ActualTableName key = new ActualTableName(actualTableName);
        return tableNameMap.containsKey(key);
    }
    
    /**
     * Get schema name.
     *
     * @param logicTableName logic table name
     * @return schema name. nullable
     */
    public String getSchemaName(final LogicTableName logicTableName) {
        return tableNameSchemaNameMapping.getSchemaName(logicTableName);
    }
    
    /**
     * Get schema name.
     *
     * @param actualTableName actual table name
     * @return schema name. nullable
     */
    public String getSchemaName(final ActualTableName actualTableName) {
        // Resolve the logic table first, then look up its schema.
        LogicTableName logicTableName = getLogicTableName(actualTableName);
        return tableNameSchemaNameMapping.getSchemaName(logicTableName);
    }
    
    /**
     * Get column names.
     *
     * @param logicTableName logic table name
     * @return column names
     */
    public Collection<String> getColumnNames(final LogicTableName logicTableName) {
        if (targetTableColumnsMap.containsKey(logicTableName)) {
            return targetTableColumnsMap.get(logicTableName).stream().map(ColumnName::getOriginal).collect(Collectors.toList());
        }
        // No explicit column list configured: "*" means all columns.
        return Collections.singleton("*");
    }
    
    /**
     * Get column names.
     *
     * @param actualTableName actual table name
     * @return column names
     */
    public Collection<ColumnName> getColumnNames(final String actualTableName) {
        LogicTableName logicTableName = getLogicTableName(actualTableName);
        return targetTableColumnsMap.getOrDefault(logicTableName, Collections.emptySet());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,114 +17,19 @@

package org.apache.shardingsphere.data.pipeline.api.config.ingest;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Dumper configuration.
*/
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
// TODO it should be final and not extends by sub-class
// TODO fields final
public class DumperConfiguration {
@ToString(callSuper = true)
public class DumperConfiguration extends BaseDumperConfiguration {

private String jobId;

private String dataSourceName;

private PipelineDataSourceConfiguration dataSourceConfig;

private IngestPosition position;

private Map<ActualTableName, LogicTableName> tableNameMap;

private TableNameSchemaNameMapping tableNameSchemaNameMapping;

// LinkedHashSet is required
@Getter(AccessLevel.PROTECTED)
private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap = new HashMap<>();

private boolean decodeWithTX;

/**
* Get logic table name.
*
* @param actualTableName actual table name
* @return logic table name
*/
public LogicTableName getLogicTableName(final String actualTableName) {
return tableNameMap.get(new ActualTableName(actualTableName));
}

private LogicTableName getLogicTableName(final ActualTableName actualTableName) {
return tableNameMap.get(actualTableName);
}

/**
* Whether contains table.
*
* @param actualTableName actual table name
* @return contains or not
*/
public boolean containsTable(final String actualTableName) {
return tableNameMap.containsKey(new ActualTableName(actualTableName));
}

/**
* Get schema name.
*
* @param logicTableName logic table name
* @return schema name. nullable
*/
public String getSchemaName(final LogicTableName logicTableName) {
return tableNameSchemaNameMapping.getSchemaName(logicTableName);
}

/**
* Get schema name.
*
* @param actualTableName actual table name
* @return schema name. nullable
*/
public String getSchemaName(final ActualTableName actualTableName) {
return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
}

/**
* Get column names.
*
* @param logicTableName logic table name
* @return column names
*/
public Collection<String> getColumnNames(final LogicTableName logicTableName) {
return targetTableColumnsMap.containsKey(logicTableName)
? targetTableColumnsMap.get(logicTableName).stream().map(ColumnName::getOriginal).collect(Collectors.toList())
: Collections.singleton("*");
}

/**
* Get column names.
*
* @param actualTableName actual table name
* @return column names
*/
public Collection<ColumnName> getColumnNames(final String actualTableName) {
return targetTableColumnsMap.getOrDefault(getLogicTableName(actualTableName), Collections.emptySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
@Getter
@Setter
@ToString(callSuper = true)
// TODO fields final
public final class InventoryDumperConfiguration extends DumperConfiguration {
public final class InventoryDumperConfiguration extends BaseDumperConfiguration {

private String actualTableName;

Expand All @@ -52,7 +51,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {

private JobRateLimitAlgorithm rateLimitAlgorithm;

public InventoryDumperConfiguration(final DumperConfiguration dumperConfig) {
public InventoryDumperConfiguration(final BaseDumperConfiguration dumperConfig) {
setDataSourceName(dumperConfig.getDataSourceName());
setDataSourceConfig(dumperConfig.getDataSourceConfig());
setTableNameMap(dumperConfig.getTableNameMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.test.it.data.pipeline.core.prepare;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.BaseDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
Expand Down Expand Up @@ -143,7 +143,7 @@ void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException
}
}

private void initEmptyTablePrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
private void initEmptyTablePrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Expand All @@ -153,7 +153,7 @@ private void initEmptyTablePrimaryEnvironment(final DumperConfiguration dumperCo
}
}

private void initIntPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
private void initIntPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Expand All @@ -166,7 +166,7 @@ private void initIntPrimaryEnvironment(final DumperConfiguration dumperConfig) t
}
}

private void initCharPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
private void initCharPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Expand All @@ -177,7 +177,7 @@ private void initCharPrimaryEnvironment(final DumperConfiguration dumperConfig)
}
}

private void initUnionPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
private void initUnionPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Expand All @@ -188,7 +188,7 @@ private void initUnionPrimaryEnvironment(final DumperConfiguration dumperConfig)
}
}

private void initNoPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
private void initNoPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Expand All @@ -199,7 +199,7 @@ private void initNoPrimaryEnvironment(final DumperConfiguration dumperConfig) th
}
}

private void initUniqueIndexOnNotNullColumnEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
private void initUniqueIndexOnNotNullColumnEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Expand Down

0 comments on commit ee1c67b

Please sign in to comment.