Skip to content

Commit

Permalink
datavane/tis#192 add boolean inSink param on getColmeta form differe…
Browse files Browse the repository at this point in the history
…nt process on Source or Sink end for oracle Date type
  • Loading branch information
baisui1981 committed Apr 18, 2023
1 parent e480e37 commit dd62472
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void startRead(Configuration readerSliceConfig,
Map<String, ColumnMetaData> tabCols = null;
try {
tabCols = ColumnMetaData.toMap(this.readerDataSourceFactoryGetter.getDataSourceFactory()
.getTableMetadata(new DataSourceMeta.JDBCConnection(conn, jdbcUrl), EntityName.parse(table, true)));
.getTableMetadata(new DataSourceMeta.JDBCConnection(conn, jdbcUrl), false, EntityName.parse(table, true)));
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,18 +523,18 @@ public static void closeDBResources(Statement stmt, Connection conn) {
// return getTableColumnsByConn(dataBaseType, conn, tableName, "jdbcUrl:" + jdbcUrl);
// }

private static List<String> getTableColums(IDataSourceFactoryGetter dataSourceFactoryGetter, SelectTable tableName) {
private static List<String> getTableColums(IDataSourceFactoryGetter dataSourceFactoryGetter, boolean inSink, SelectTable tableName) {
try {
List<ColumnMetaData> tabMeta = dataSourceFactoryGetter
.getDataSourceFactory().getTableMetadata(EntityName.parse(tableName.getUnescapeTabName()));
.getDataSourceFactory().getTableMetadata(inSink, EntityName.parse(tableName.getUnescapeTabName()));
return tabMeta.stream().map((c) -> c.getName()).collect(Collectors.toList());
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
}

public static List<String> getTableColumnsByConn(DataBaseType dataBaseType, IDataSourceFactoryGetter conn, SelectTable tableName, String basicMsg) {
return getTableColums(conn, tableName);
public static List<String> getTableColumnsByConn(DataBaseType dataBaseType, boolean inSink, IDataSourceFactoryGetter conn, SelectTable tableName, String basicMsg) {
return getTableColums(conn, inSink, tableName);
// List<String> columns = new ArrayList<String>();
// Statement statement = null;
// ResultSet rs = null;
Expand Down Expand Up @@ -578,22 +578,22 @@ public static List<String> getTableColumnsByConn(DataBaseType dataBaseType, IDat
// //}
// }
public static List<ColumnMetaData> getColumnMetaData(
IDataSourceFactoryGetter dsGetter, SelectTable tableName, SelectCols userConfiguredColumns) {
return getColumnMetaData(Optional.empty(), dsGetter, tableName, userConfiguredColumns);
IDataSourceFactoryGetter dsGetter, boolean inSink, SelectTable tableName, SelectCols userConfiguredColumns) {
return getColumnMetaData(Optional.empty(), inSink, dsGetter, tableName, userConfiguredColumns);
}

/**
* @return Left:ColumnName Middle:ColumnType Right:ColumnTypeName
*/
public static List<ColumnMetaData> getColumnMetaData(
Optional<DataSourceMeta.JDBCConnection> connection, IDataSourceFactoryGetter dsGetter, SelectTable tableName, SelectCols userConfiguredColumns) {
Optional<DataSourceMeta.JDBCConnection> connection, boolean inSink, IDataSourceFactoryGetter dsGetter, SelectTable tableName, SelectCols userConfiguredColumns) {

Map<String, ColumnMetaData> colMapper = null;
try {
EntityName table = EntityName.parse(tableName.getUnescapeTabName());
List<ColumnMetaData> tabCols = connection.isPresent()
? dsGetter.getDataSourceFactory().getTableMetadata((connection.get()), table)
: dsGetter.getDataSourceFactory().getTableMetadata(table);
? dsGetter.getDataSourceFactory().getTableMetadata((connection.get()), inSink, table)
: dsGetter.getDataSourceFactory().getTableMetadata(inSink, table);
colMapper = tabCols.stream().collect(Collectors.toMap((c) -> c.getName(), (c) -> c));
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCo
// this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
// this.table, StringUtils.join(this.columns, ","));

this.resultSetMetaData = DBUtil.getColumnMetaData(Optional.of(connection), this.dataSourceFactoryGetter, this.table, columns)
this.resultSetMetaData = DBUtil.getColumnMetaData(Optional.of(connection), true, this.dataSourceFactoryGetter, this.table, columns)
.stream().map((c) -> Pair.of(c, parseColSetter(c))).collect(Collectors.toList());


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public static void dealColumnConf(Configuration originalConfig, ConnectionFactor
boolean isPreCheck = originalConfig.getBool(Key.DRYRUN, false);
List<String> allColumns;
if (isPreCheck) {
allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo());
allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, true, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo());
} else {
allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo());
allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, true, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo());
}

LOG.info("table:[{}] all columns:[\n{}\n].", oneTable,
Expand All @@ -126,7 +126,7 @@ public static void dealColumnConf(Configuration originalConfig, ConnectionFactor

// 检查列是否都为数据库表中正确的列(通过执行一次 select column from table 进行判断)
String cfgCols = StringUtils.join(userConfiguredColumns, ",");
List<ColumnMetaData> cols = DBUtil.getColumnMetaData(connectionFactory.getConnecttionWithoutRetry(), oneTable, userConfiguredColumns);
List<ColumnMetaData> cols = DBUtil.getColumnMetaData(connectionFactory.getConnecttionWithoutRetry(), true, oneTable, userConfiguredColumns);

if (cols.size() != userConfiguredColumns.size()) {
List<String> existCols = cols.stream().map((c) -> c.getName()).collect(Collectors.toList());
Expand All @@ -143,7 +143,7 @@ public static void dealColumnConf(IDataSourceFactoryGetter dataSourceFactoryGett

String username = originalConfig.getString(Key.USERNAME);
String password = originalConfig.getString(Key.PASSWORD);
SelectTable oneTable = SelectTable.create(originalConfig,dataSourceFactoryGetter.getDBReservedKeys());
SelectTable oneTable = SelectTable.create(originalConfig, dataSourceFactoryGetter.getDBReservedKeys());
// .getString(String.format(
// "%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE));

Expand Down

0 comments on commit dd62472

Please sign in to comment.