From dd6247213f7a858fff399f80bc412bb8299746a7 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Tue, 18 Apr 2023 17:21:35 +0800 Subject: [PATCH] https://github.com/qlangtech/tis/issues/192 add boolean inSink param on getColmeta form different process on Source or Sink end for oracle Date type --- .../plugin/rdbms/reader/CommonRdbmsReader.java | 2 +- .../datax/plugin/rdbms/util/DBUtil.java | 18 +++++++++--------- .../plugin/rdbms/writer/CommonRdbmsWriter.java | 2 +- .../util/OriginalConfPretreatmentUtil.java | 8 ++++---- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index 057c7514bc..550bd21bae 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -214,7 +214,7 @@ public void startRead(Configuration readerSliceConfig, Map tabCols = null; try { tabCols = ColumnMetaData.toMap(this.readerDataSourceFactoryGetter.getDataSourceFactory() - .getTableMetadata(new DataSourceMeta.JDBCConnection(conn, jdbcUrl), EntityName.parse(table, true))); + .getTableMetadata(new DataSourceMeta.JDBCConnection(conn, jdbcUrl), false, EntityName.parse(table, true))); } catch (TableNotFoundException e) { throw new RuntimeException(e); } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java index f374699678..d75de4d896 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java @@ -523,18 +523,18 @@ public static void closeDBResources(Statement stmt, Connection conn) { // return getTableColumnsByConn(dataBaseType, conn, tableName, "jdbcUrl:" + jdbcUrl); // } - private static List getTableColums(IDataSourceFactoryGetter dataSourceFactoryGetter, SelectTable tableName) { + private static List getTableColums(IDataSourceFactoryGetter dataSourceFactoryGetter, boolean inSink, SelectTable tableName) { try { List tabMeta = dataSourceFactoryGetter - .getDataSourceFactory().getTableMetadata(EntityName.parse(tableName.getUnescapeTabName())); + .getDataSourceFactory().getTableMetadata(inSink, EntityName.parse(tableName.getUnescapeTabName())); return tabMeta.stream().map((c) -> c.getName()).collect(Collectors.toList()); } catch (TableNotFoundException e) { throw new RuntimeException(e); } } - public static List getTableColumnsByConn(DataBaseType dataBaseType, IDataSourceFactoryGetter conn, SelectTable tableName, String basicMsg) { - return getTableColums(conn, tableName); + public static List getTableColumnsByConn(DataBaseType dataBaseType, boolean inSink, IDataSourceFactoryGetter conn, SelectTable tableName, String basicMsg) { + return getTableColums(conn, inSink, tableName); // List columns = new ArrayList(); // Statement statement = null; // ResultSet rs = null; @@ -578,22 +578,22 @@ public static List getTableColumnsByConn(DataBaseType dataBaseType, IDat // //} // } public static List getColumnMetaData( - IDataSourceFactoryGetter dsGetter, SelectTable tableName, SelectCols userConfiguredColumns) { - return getColumnMetaData(Optional.empty(), dsGetter, tableName, userConfiguredColumns); + IDataSourceFactoryGetter dsGetter, boolean inSink, SelectTable tableName, SelectCols userConfiguredColumns) { + return getColumnMetaData(Optional.empty(), inSink, dsGetter, tableName, userConfiguredColumns); } /** * @return Left:ColumnName Middle:ColumnType Right:ColumnTypeName */ public static List getColumnMetaData( - Optional connection, IDataSourceFactoryGetter dsGetter, SelectTable tableName, SelectCols userConfiguredColumns) { + Optional connection, boolean inSink, IDataSourceFactoryGetter dsGetter, SelectTable tableName, SelectCols userConfiguredColumns) { Map colMapper = null; try { EntityName table = EntityName.parse(tableName.getUnescapeTabName()); List tabCols = connection.isPresent() - ? dsGetter.getDataSourceFactory().getTableMetadata((connection.get()), table) - : dsGetter.getDataSourceFactory().getTableMetadata(table); + ? dsGetter.getDataSourceFactory().getTableMetadata((connection.get()), inSink, table) + : dsGetter.getDataSourceFactory().getTableMetadata(inSink, table); colMapper = tabCols.stream().collect(Collectors.toMap((c) -> c.getName(), (c) -> c)); } catch (TableNotFoundException e) { throw new RuntimeException(e); diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java index dc5353f7e9..6c9a0f1fe1 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java @@ -452,7 +452,7 @@ public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCo // this.resultSetMetaData = DBUtil.getColumnMetaData(connection, // this.table, StringUtils.join(this.columns, ",")); - this.resultSetMetaData = DBUtil.getColumnMetaData(Optional.of(connection), this.dataSourceFactoryGetter, this.table, columns) + this.resultSetMetaData = DBUtil.getColumnMetaData(Optional.of(connection), true, this.dataSourceFactoryGetter, this.table, columns) .stream().map((c) -> Pair.of(c, parseColSetter(c))).collect(Collectors.toList()); diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java index c8ee167397..133bb08d2e 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java @@ -103,9 +103,9 @@ public static void dealColumnConf(Configuration originalConfig, ConnectionFactor boolean isPreCheck = originalConfig.getBool(Key.DRYRUN, false); List allColumns; if (isPreCheck) { - allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo()); + allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, true, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo()); } else { - allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo()); + allColumns = DBUtil.getTableColumnsByConn(DATABASE_TYPE, true, connectionFactory.getConnecttionWithoutRetry(), oneTable, connectionFactory.getConnectionInfo()); } LOG.info("table:[{}] all columns:[\n{}\n].", oneTable, @@ -126,7 +126,7 @@ public static void dealColumnConf(Configuration originalConfig, ConnectionFactor // 检查列是否都为数据库表中正确的列(通过执行一次 select column from table 进行判断) String cfgCols = StringUtils.join(userConfiguredColumns, ","); - List cols = DBUtil.getColumnMetaData(connectionFactory.getConnecttionWithoutRetry(), oneTable, userConfiguredColumns); + List cols = DBUtil.getColumnMetaData(connectionFactory.getConnecttionWithoutRetry(), true, oneTable, userConfiguredColumns); if (cols.size() != userConfiguredColumns.size()) { List existCols = cols.stream().map((c) -> c.getName()).collect(Collectors.toList()); @@ -143,7 +143,7 @@ public static void dealColumnConf(IDataSourceFactoryGetter dataSourceFactoryGett String username = originalConfig.getString(Key.USERNAME); String password = originalConfig.getString(Key.PASSWORD); - SelectTable oneTable = SelectTable.create(originalConfig,dataSourceFactoryGetter.getDBReservedKeys()); + SelectTable oneTable = SelectTable.create(originalConfig, dataSourceFactoryGetter.getDBReservedKeys()); // .getString(String.format( // "%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE));