From 93bc02ec240edebf297672a47a0979e1bd08f853 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Wed, 12 Oct 2022 16:01:42 +0800 Subject: [PATCH] refactor: move dataSource escapeChar from 'CreateTableSqlBuilder' to 'DataSourceMeta' --- .../clickhousewriter/ClickhouseWriter.java | 374 ++++++------- .../rdbms/writer/util/EscapeableEntity.java | 8 + .../plugin/rdbms/writer/util/SelectCols.java | 11 +- .../rdbms/reader/CommonRdbmsReader.java | 10 +- .../datax/plugin/rdbms/util/DBUtil.java | 35 +- .../rdbms/writer/CommonRdbmsWriter.java | 509 ++++++++++++------ .../rdbms/writer/util/IStatementSetter.java | 14 + .../rdbmswriter/SubCommonRdbmsWriter.java | 286 +++++----- 8 files changed, 747 insertions(+), 500 deletions(-) create mode 100644 plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/IStatementSetter.java diff --git a/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java b/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java index b4522b7dbc..e051455fc5 100644 --- a/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java +++ b/clickhousewriter/src/main/java/com/alibaba/datax/plugin/writer/clickhousewriter/ClickhouseWriter.java @@ -10,6 +10,7 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.datax.plugin.rdbms.writer.util.IStatementSetter; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.qlangtech.tis.plugin.ds.ColumnMetaData; @@ -64,192 +65,195 @@ public void init() { this.writerSliceConfig = super.getPluginJobConf(); this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) { - @Override - protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, Column column) throws SQLException { - ColumnMetaData cm = this.resultSetMetaData.get(columnIndex); - DataType type = cm.getType(); - try { - if (column.getRawData() == null) { - preparedStatement.setNull(columnIndex + 1, columnSqltype); - return preparedStatement; - } - - java.util.Date utilDate; - switch (columnSqltype) { - case Types.CHAR: - case Types.NCHAR: - case Types.CLOB: - case Types.NCLOB: - case Types.VARCHAR: - case Types.LONGVARCHAR: - case Types.NVARCHAR: - case Types.LONGNVARCHAR: - preparedStatement.setString(columnIndex + 1, column - .asString()); - break; - - case Types.TINYINT: - case Types.SMALLINT: - case Types.INTEGER: - case Types.BIGINT: - case Types.DECIMAL: - case Types.FLOAT: - case Types.REAL: - case Types.DOUBLE: - String strValue = column.asString(); - if (emptyAsNull && "".equals(strValue)) { - preparedStatement.setNull(columnIndex + 1, columnSqltype); - } else { - switch (columnSqltype) { - case Types.TINYINT: - case Types.SMALLINT: - case Types.INTEGER: - preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); - break; - case Types.BIGINT: - preparedStatement.setLong(columnIndex + 1, column.asLong()); - break; - case Types.DECIMAL: - preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal()); - break; - case Types.REAL: - case Types.FLOAT: - preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue()); - break; - case Types.DOUBLE: - preparedStatement.setDouble(columnIndex + 1, column.asDouble()); - break; - } - } - break; - - case Types.DATE: - if (type.typeName - .equalsIgnoreCase("year")) { - if (column.asBigInteger() == null) { - preparedStatement.setString(columnIndex + 1, null); - } else { - preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); - } - } else { - java.sql.Date sqlDate = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "Date 类型转换错误:[%s]", column)); - } - - if (null != utilDate) { - sqlDate = new java.sql.Date(utilDate.getTime()); - } - preparedStatement.setDate(columnIndex + 1, sqlDate); - } - break; - - case Types.TIME: - java.sql.Time sqlTime = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "Date 类型转换错误:[%s]", column)); - } - - if (null != utilDate) { - sqlTime = new java.sql.Time(utilDate.getTime()); - } - preparedStatement.setTime(columnIndex + 1, sqlTime); - break; - - case Types.TIMESTAMP: - Timestamp sqlTimestamp = null; - if (column instanceof StringColumn && column.asString() != null) { - String timeStampStr = column.asString(); - // JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式 - String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+"; - boolean isMatch = Pattern.matches(pattern, timeStampStr); - if (isMatch) { - sqlTimestamp = Timestamp.valueOf(timeStampStr); - preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); - break; - } - } - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "Date 类型转换错误:[%s]", column)); - } - - if (null != utilDate) { - sqlTimestamp = new Timestamp( - utilDate.getTime()); - } - preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); - break; - - case Types.BINARY: - case Types.VARBINARY: - case Types.BLOB: - case Types.LONGVARBINARY: - preparedStatement.setBytes(columnIndex + 1, column - .asBytes()); - break; - - case Types.BOOLEAN: - preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); - break; - - // warn: bit(1) -> Types.BIT 可使用setBoolean - // warn: bit(>1) -> Types.VARBINARY 可使用setBytes - case Types.BIT: - if (this.dataBaseType == DataBaseType.MySql) { - Boolean asBoolean = column.asBoolean(); - if (asBoolean != null) { - preparedStatement.setBoolean(columnIndex + 1, asBoolean); - } else { - preparedStatement.setNull(columnIndex + 1, Types.BIT); - } - } else { - preparedStatement.setString(columnIndex + 1, column.asString()); - } - break; - - default: - boolean isHandled = fillPreparedStatementColumnType4CustomType(preparedStatement, cm, - columnIndex, columnSqltype, column); - if (isHandled) { - break; - } - throw DataXException - .asDataXException( - DBUtilErrorCode.UNSUPPORTED_TYPE, - String.format( - "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", - cm.getName(), - type.type, - type.typeName)); - } - return preparedStatement; - } catch (DataXException e) { - // fix类型转换或者溢出失败时,将具体哪一列打印出来 - if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT || - e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) { - throw DataXException - .asDataXException( - e.getErrorCode(), - String.format( - "类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", - cm.getName(), - type.type, - type.typeName)); - } else { - throw e; - } - } - } +// @Override +// protected PreparedStatement fillPreparedStatementColumnType( +// PreparedStatement preparedStatement, IStatementSetter col, int columnIndex, ColumnMetaData cm, Column column) throws SQLException { +// +// +// DataType type = cm.getType(); +// try { +// +// if (column.getRawData() == null) { +// preparedStatement.setNull(columnIndex + 1, columnSqltype); +// return preparedStatement; +// } +// +// java.util.Date utilDate; +// switch (columnSqltype) { +// case Types.CHAR: +// case Types.NCHAR: +// case Types.CLOB: +// case Types.NCLOB: +// case Types.VARCHAR: +// case Types.LONGVARCHAR: +// case Types.NVARCHAR: +// case Types.LONGNVARCHAR: +// preparedStatement.setString(columnIndex + 1, column +// .asString()); +// break; +// +// case Types.TINYINT: +// case Types.SMALLINT: +// case Types.INTEGER: +// case Types.BIGINT: +// case Types.DECIMAL: +// case Types.FLOAT: +// case Types.REAL: +// case Types.DOUBLE: +// String strValue = column.asString(); +// if (emptyAsNull && "".equals(strValue)) { +// preparedStatement.setNull(columnIndex + 1, columnSqltype); +// } else { +// switch (columnSqltype) { +// case Types.TINYINT: +// case Types.SMALLINT: +// case Types.INTEGER: +// preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); +// break; +// case Types.BIGINT: +// preparedStatement.setLong(columnIndex + 1, column.asLong()); +// break; +// case Types.DECIMAL: +// preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal()); +// break; +// case Types.REAL: +// case Types.FLOAT: +// preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue()); +// break; +// case Types.DOUBLE: +// preparedStatement.setDouble(columnIndex + 1, column.asDouble()); +// break; +// } +// } +// break; +// +// case Types.DATE: +// if (type.typeName +// .equalsIgnoreCase("year")) { +// if (column.asBigInteger() == null) { +// preparedStatement.setString(columnIndex + 1, null); +// } else { +// preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); +// } +// } else { +// java.sql.Date sqlDate = null; +// try { +// utilDate = column.asDate(); +// } catch (DataXException e) { +// throw new SQLException(String.format( +// "Date 类型转换错误:[%s]", column)); +// } +// +// if (null != utilDate) { +// sqlDate = new java.sql.Date(utilDate.getTime()); +// } +// preparedStatement.setDate(columnIndex + 1, sqlDate); +// } +// break; +// +// case Types.TIME: +// java.sql.Time sqlTime = null; +// try { +// utilDate = column.asDate(); +// } catch (DataXException e) { +// throw new SQLException(String.format( +// "Date 类型转换错误:[%s]", column)); +// } +// +// if (null != utilDate) { +// sqlTime = new java.sql.Time(utilDate.getTime()); +// } +// preparedStatement.setTime(columnIndex + 1, sqlTime); +// break; +// +// case Types.TIMESTAMP: +// Timestamp sqlTimestamp = null; +// if (column instanceof StringColumn && column.asString() != null) { +// String timeStampStr = column.asString(); +// // JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式 +// String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+"; +// boolean isMatch = Pattern.matches(pattern, timeStampStr); +// if (isMatch) { +// sqlTimestamp = Timestamp.valueOf(timeStampStr); +// preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); +// break; +// } +// } +// try { +// utilDate = column.asDate(); +// } catch (DataXException e) { +// throw new SQLException(String.format( +// "Date 类型转换错误:[%s]", column)); +// } +// +// if (null != utilDate) { +// sqlTimestamp = new Timestamp( +// utilDate.getTime()); +// } +// preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); +// break; +// +// case Types.BINARY: +// case Types.VARBINARY: +// case Types.BLOB: +// case Types.LONGVARBINARY: +// preparedStatement.setBytes(columnIndex + 1, column +// .asBytes()); +// break; +// +// case Types.BOOLEAN: +// preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); +// break; +// +// // warn: bit(1) -> Types.BIT 可使用setBoolean +// // warn: bit(>1) -> Types.VARBINARY 可使用setBytes +// case Types.BIT: +// if (this.dataBaseType == DataBaseType.MySql) { +// Boolean asBoolean = column.asBoolean(); +// if (asBoolean != null) { +// preparedStatement.setBoolean(columnIndex + 1, asBoolean); +// } else { +// preparedStatement.setNull(columnIndex + 1, Types.BIT); +// } +// } else { +// preparedStatement.setString(columnIndex + 1, column.asString()); +// } +// break; +// +// default: +// boolean isHandled = fillPreparedStatementColumnType4CustomType(preparedStatement, cm, +// columnIndex, columnSqltype, column); +// if (isHandled) { +// break; +// } +// throw DataXException +// .asDataXException( +// DBUtilErrorCode.UNSUPPORTED_TYPE, +// String.format( +// "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", +// cm.getName(), +// type.type, +// type.typeName)); +// } +// return preparedStatement; +// } catch (DataXException e) { +// // fix类型转换或者溢出失败时,将具体哪一列打印出来 +// if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT || +// e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) { +// throw DataXException +// .asDataXException( +// e.getErrorCode(), +// String.format( +// "类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", +// cm.getName(), +// type.type, +// type.typeName)); +// } else { +// throw e; +// } +// } +// } private Object toJavaArray(Object val) { if (null == val) { diff --git a/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/EscapeableEntity.java b/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/EscapeableEntity.java index cf9d2e42ea..0526a60985 100644 --- a/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/EscapeableEntity.java +++ b/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/EscapeableEntity.java @@ -30,4 +30,12 @@ protected String escapeEntity(String val) { return val; } } + + protected String unescapeEntity(String val) { + if (containEscapeChar) { + return StringUtils.remove(val, this.escapeChar); + } else { + return val; + } + } } diff --git a/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectCols.java b/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectCols.java index 55a5ef93ee..87b1728a35 100644 --- a/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectCols.java +++ b/common/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectCols.java @@ -26,8 +26,12 @@ public Iterator iterator() { } public static SelectCols createSelectCols(Configuration conf) { + return createSelectCols(conf, null); + } + + public static SelectCols createSelectCols(Configuration conf, String escapeChar) { return new SelectCols( - conf.getList(Key.COLUMN, String.class), conf.get(Key.ESCAPE_CHAR, String.class)); + conf.getList(Key.COLUMN, String.class), StringUtils.defaultString(escapeChar, conf.get(Key.ESCAPE_CHAR, String.class))); } public static SelectCols createSelectCols(List allColumns) { @@ -36,8 +40,8 @@ public static SelectCols createSelectCols(List allColumns) { private SelectCols(List columns, String escapeChar) { super(escapeChar); - this.columns = columns; - if (columns == null || columns.isEmpty()) { + this.columns = columns.stream().map((c) -> unescapeEntity(c)).collect(Collectors.toList()); + if (this.columns == null || this.columns.isEmpty()) { throw new IllegalArgumentException("param colums can not be empty "); } } @@ -118,4 +122,5 @@ public int size() { public boolean containsCol(String name) { return columns.contains(name); } + } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index 09f728d503..91722f824a 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; +import com.qlangtech.tis.plugin.ds.TableNotFoundException; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -202,8 +203,13 @@ public void startRead(Configuration readerSliceConfig, Connection conn = DBUtil.getConnection(this.readerDataSourceFactoryGetter, jdbcUrl, username, password); - Map tabCols = ColumnMetaData.toMap(this.readerDataSourceFactoryGetter.getDataSourceFactory() - .getTableMetadata(conn, table)); + Map tabCols = null; + try { + tabCols = ColumnMetaData.toMap(this.readerDataSourceFactoryGetter.getDataSourceFactory() + .getTableMetadata(conn, table)); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } if (MapUtils.isEmpty(tabCols)) { throw new IllegalStateException("table:" + table + " relevant tabCols can not be empty"); } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java index 4fa299f955..fd94d9f68e 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DBUtil.java @@ -8,6 +8,7 @@ import com.alibaba.datax.plugin.rdbms.writer.util.SelectTable; import com.alibaba.druid.sql.parser.SQLParserUtils; import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.qlangtech.tis.datax.impl.DataxReader; import com.qlangtech.tis.datax.impl.DataxWriter; @@ -15,6 +16,7 @@ import com.qlangtech.tis.offline.DataxUtils; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; +import com.qlangtech.tis.plugin.ds.TableNotFoundException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -522,8 +524,12 @@ public static List getTableColumns(DataBaseType dataBaseType, IDataSourc } private static List getTableColums(IDataSourceFactoryGetter dataSourceFactoryGetter, SelectTable tableName) { - List tabMeta = dataSourceFactoryGetter.getDataSourceFactory().getTableMetadata(tableName.getUnescapeTabName()); - return tabMeta.stream().map((c) -> c.getName()).collect(Collectors.toList()); + try { + List tabMeta = dataSourceFactoryGetter.getDataSourceFactory().getTableMetadata(tableName.getUnescapeTabName()); + return tabMeta.stream().map((c) -> c.getName()).collect(Collectors.toList()); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } } public static List getTableColumnsByConn(DataBaseType dataBaseType, IDataSourceFactoryGetter conn, SelectTable tableName, String basicMsg) { @@ -580,13 +586,24 @@ public static List getColumnMetaData( */ public static List getColumnMetaData( Optional connection, IDataSourceFactoryGetter dsGetter, SelectTable tableName, SelectCols userConfiguredColumns) { -// if (userConfiguredColumns.) { -// throw new IllegalArgumentException("param userConfiguredColumns can not be empty"); -// } - List tabCols = connection.isPresent() - ? dsGetter.getDataSourceFactory().getTableMetadata(connection.get(), tableName.getUnescapeTabName()) - : dsGetter.getDataSourceFactory().getTableMetadata(tableName.getUnescapeTabName()); - return tabCols.stream().filter((c) -> userConfiguredColumns.containsCol(c.getName())).collect(Collectors.toList()); + + Map colMapper = null; + try { + List tabCols = connection.isPresent() + ? dsGetter.getDataSourceFactory().getTableMetadata(connection.get(), tableName.getUnescapeTabName()) + : dsGetter.getDataSourceFactory().getTableMetadata(tableName.getUnescapeTabName()); + colMapper = tabCols.stream().collect(Collectors.toMap((c) -> c.getName(), (c) -> c)); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + + List result = Lists.newArrayList(); + for (String col : userConfiguredColumns) { + result.add(Objects.requireNonNull( + colMapper.get(col), "col:" + col + " relevant meta can not be null")); + } + return result; + // return tabCols.stream().filter((c) -> userConfiguredColumns.containsCol(c.getName())).collect(Collectors.toList()); // return tabCols; // Statement statement = null; // ResultSet rs = null; diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java index 0b1ff182dd..cf2e1ee6a5 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java @@ -10,13 +10,12 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.util.RdbmsException; -import com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil; -import com.alibaba.datax.plugin.rdbms.writer.util.SelectCols; -import com.alibaba.datax.plugin.rdbms.writer.util.SelectTable; -import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil; +import com.alibaba.datax.plugin.rdbms.writer.util.*; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; +import com.qlangtech.tis.web.start.TisAppLaunch; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; public class CommonRdbmsWriter { @@ -153,7 +153,7 @@ public void post(Configuration originalConfig) { // 已经由 prepare 进行了appendJDBCSuffix处理 String jdbcUrl = originalConfig.getString(Key.JDBC_URL); - SelectTable table = SelectTable.createInTask( originalConfig);//.getString(Key.TABLE); + SelectTable table = SelectTable.createInTask(originalConfig);//.getString(Key.TABLE); List postSqls = originalConfig.getList(Key.POST_SQL, String.class); @@ -209,7 +209,7 @@ public static class Task { protected String writeRecordSql; protected String writeMode; protected boolean emptyAsNull; - protected List resultSetMetaData; + protected List> resultSetMetaData; public Task(DataBaseType dataBaseType) { this.dataBaseType = dataBaseType; @@ -237,10 +237,6 @@ public void init(Configuration writerSliceConfig) { this.table = SelectTable.createInTask(writerSliceConfig); - this.columns = SelectCols.createSelectCols(writerSliceConfig); - - this.columnNumber = this.columns.size(); - this.preSqls = writerSliceConfig.getList(Key.PRE_SQL, String.class); this.postSqls = writerSliceConfig.getList(Key.POST_SQL, String.class); this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE); @@ -253,6 +249,8 @@ public void init(Configuration writerSliceConfig) { BASIC_MESSAGE = String.format("jdbcUrl:[%s], table:[%s]", this.jdbcUrl, this.table); this.dataSourceFactoryGetter = DBUtil.getWriterDataSourceFactoryGetter(writerSliceConfig); + this.columns = SelectCols.createSelectCols(writerSliceConfig, this.dataSourceFactoryGetter.getDataSourceFactory().getEscapeChar()); + this.columnNumber = this.columns.size(); } public void prepare(Configuration writerSliceConfig) { @@ -270,6 +268,175 @@ public void prepare(Configuration writerSliceConfig) { DBUtil.closeDBResources(null, null, connection); } + + protected IStatementSetter parseColSetter(ColumnMetaData cm) { + + switch (cm.getType().type) { + case Types.CHAR: + case Types.NCHAR: + case Types.CLOB: + case Types.NCLOB: + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: +// preparedStatement.setString(columnIndex + 1, column +// .asString()); + return (stat, colIndex, column) -> { + stat.setString(colIndex, column.asString()); + }; + + case Types.SMALLINT: + case Types.INTEGER: + return (stat, colIndex, column) -> { + // long l = 18446744073709551615l; + stat.setLong(colIndex, column.asLong()); + }; + case Types.BIGINT: + return (stat, colIndex, column) -> { + // long l = 18446744073709551615l; + // column.asBigInteger(); + stat.setBigDecimal(colIndex, column.asBigDecimal()); + }; + case Types.NUMERIC: + case Types.DECIMAL: + case Types.REAL: + return (stat, colIndex, column) -> { + + // String strValue = column.asString(); + if (emptyAsNull && column.getRawData() == null) { + stat.setNull(colIndex, cm.getType().type); + } else { + stat.setBigDecimal(colIndex, column.asBigDecimal()); + } + }; + case Types.FLOAT: + case Types.DOUBLE: + return (stat, colIndex, column) -> { + + // String strValue = column.asString(); + if (emptyAsNull && column.getRawData() == null) { + stat.setNull(colIndex, cm.getType().type); + } else { + stat.setDouble(colIndex, column.asDouble()); + } + }; + //tinyint is a little special in some database like mysql {boolean->tinyint(1)} + case Types.TINYINT: + + return (stat, colIndex, column) -> { +// Long longValue = column.asLong(); +// if (null == longValue) { +// stat.setString(colIndex, null); +// } else { + stat.setShort(colIndex, column.asLong().shortValue()); + //} + }; + + // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115 + case Types.DATE: + + return (stat, colIndex, column) -> { +// if (typeName == null) { +// typeName = cm.getType().typeName; //this.resultSetMetaData.getRight().get(columnIndex); +// } + java.util.Date utilDate; + if (cm.getType().typeName.equalsIgnoreCase("year")) { +// if (column.asBigInteger() == null) { +// stat.setString(columnIndex + 1, null); +// } else { + stat.setInt(colIndex, column.asBigInteger().intValue()); + //} + } else { + java.sql.Date sqlDate = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlDate = new java.sql.Date(utilDate.getTime()); + } + stat.setDate(colIndex, sqlDate); + } + }; + // break; + + case Types.TIME: + return (stat, colIndex, column) -> { + java.sql.Time sqlTime = null; + java.util.Date utilDate; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "TIME 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTime = new java.sql.Time(utilDate.getTime()); + } + stat.setTime(colIndex, sqlTime); + }; + + case Types.TIMESTAMP: + return (stat, colIndex, column) -> { + java.sql.Timestamp sqlTimestamp = null; + java.util.Date utilDate; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "TIMESTAMP 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTimestamp = new java.sql.Timestamp( + utilDate.getTime()); + } + stat.setTimestamp(colIndex, sqlTimestamp); + }; + + case Types.BINARY: + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + return (stat, colIndex, column) -> { + stat.setBytes(colIndex, column + .asBytes()); + }; + case Types.BOOLEAN: + return (stat, colIndex, column) -> { + stat.setBoolean(colIndex, column.asBoolean()); + }; + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + case Types.BIT: + return (stat, colIndex, column) -> { + if (this.dataBaseType == DataBaseType.MySql) { + stat.setBoolean(colIndex, column.asBoolean()); + } else { + stat.setString(colIndex, column.asString()); + } + }; + + //break; + default: + throw DataXException + .asDataXException( + DBUtilErrorCode.UNSUPPORTED_TYPE, + String.format( + "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", + cm.getName(), + cm.getType().type, + cm.getType().typeName)); + } + + } + + public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { this.taskPluginCollector = taskPluginCollector; @@ -277,7 +444,9 @@ public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCo // this.resultSetMetaData = DBUtil.getColumnMetaData(connection, // this.table, StringUtils.join(this.columns, ",")); - this.resultSetMetaData = DBUtil.getColumnMetaData(Optional.of(connection), this.dataSourceFactoryGetter, this.table, columns); + this.resultSetMetaData = DBUtil.getColumnMetaData(Optional.of(connection), this.dataSourceFactoryGetter, this.table, columns) + .stream().map((c) -> Pair.of(c, parseColSetter(c))).collect(Collectors.toList()); + // 写数据库的SQL语句 calcWriteRecordSql(); @@ -392,15 +561,36 @@ protected void doOneInsert(Connection connection, List buffer) { PreparedStatement preparedStatement = null; try { connection.setAutoCommit(true); - preparedStatement = connection - .prepareStatement(this.writeRecordSql); + preparedStatement = connection.prepareStatement(this.writeRecordSql); for (Record record : buffer) { try { - preparedStatement = fillPreparedStatement( - preparedStatement, record); + preparedStatement = fillPreparedStatement(preparedStatement, record); preparedStatement.execute(); } catch (SQLException e) { + + if (TisAppLaunch.isTestMock()) { + // 试图找到哪一列有问题 + final int allCols = this.columnNumber; + for (int tryColNumer = allCols; tryColNumer > 0; tryColNumer--) { + try { + preparedStatement = connection + .prepareStatement(this.writeRecordSql); + preparedStatement = fillPreparedStatement( + preparedStatement, record); + for (int startNullIndex = (tryColNumer); startNullIndex < allCols; startNullIndex++) { + preparedStatement.setNull(startNullIndex + 1, this.resultSetMetaData.get(startNullIndex).getLeft().getType().type); + } + preparedStatement.execute(); + LOG.info("success columnNumber:" + columnNumber + "," + this.resultSetMetaData.get(columnNumber).getLeft()); + break; + } catch (Throwable ex) { + // ex.printStackTrace(); + } finally { + this.columnNumber--; + } + } + } LOG.debug(e.toString()); this.taskPluginCollector.collectDirtyRecord(record, e); @@ -420,165 +610,174 @@ protected void doOneInsert(Connection connection, List buffer) { // 直接使用了两个类变量:columnNumber,resultSetMetaData protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record) throws SQLException { - ColumnMetaData col = null; + Pair col = null; + // DataType type = null; for (int i = 0; i < this.columnNumber; i++) { col = this.resultSetMetaData.get(i); + // type = col.getType(); // int columnSqltype = this.resultSetMetaData.getMiddle().get(i); // String typeName = this.resultSetMetaData.getRight().get(i); - preparedStatement = fillPreparedStatementColumnType(preparedStatement, i, col.getType().type, col.getType().typeName, record.getColumn(i)); + preparedStatement = fillPreparedStatementColumnType(preparedStatement, col.getRight(), i, col.getKey(), record.getColumn(i)); } return preparedStatement; } - protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, - int columnSqltype, Column column) throws SQLException { - return fillPreparedStatementColumnType(preparedStatement, columnIndex, columnSqltype, null, column); - } - - protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, - int columnSqltype, String typeName, Column column) throws SQLException { +// protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, IStatementSetter col, ColumnMetaData cm, +// Column column) throws SQLException { +// return fillPreparedStatementColumnType(preparedStatement, col, cm, column); +// } - ColumnMetaData cm = this.resultSetMetaData.get(columnIndex); + protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, IStatementSetter col, int columnIndex, + ColumnMetaData cm, Column column) throws SQLException { - java.util.Date utilDate; - switch (columnSqltype) { - case Types.CHAR: - case Types.NCHAR: - case Types.CLOB: - case Types.NCLOB: - case Types.VARCHAR: - case Types.LONGVARCHAR: - case Types.NVARCHAR: - case Types.LONGNVARCHAR: - preparedStatement.setString(columnIndex + 1, column - .asString()); - break; - - case Types.SMALLINT: - case Types.INTEGER: - case Types.BIGINT: - case Types.NUMERIC: - case Types.DECIMAL: - case Types.FLOAT: - case Types.REAL: - case Types.DOUBLE: - String strValue = column.asString(); - if (emptyAsNull && "".equals(strValue)) { - preparedStatement.setString(columnIndex + 1, null); - } else { - preparedStatement.setString(columnIndex + 1, strValue); - } - break; - - //tinyint is a little special in some database like mysql {boolean->tinyint(1)} - case Types.TINYINT: - Long longValue = column.asLong(); - if (null == longValue) { - preparedStatement.setString(columnIndex + 1, null); - } else { - preparedStatement.setString(columnIndex + 1, longValue.toString()); - } - break; - - // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115 - case Types.DATE: - if (typeName == null) { - typeName = cm.getType().typeName; //this.resultSetMetaData.getRight().get(columnIndex); - } - - if (typeName.equalsIgnoreCase("year")) { - if (column.asBigInteger() == null) { - preparedStatement.setString(columnIndex + 1, null); - } else { - preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); - } - } else { - java.sql.Date sqlDate = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "Date 类型转换错误:[%s]", column)); - } - - if (null != utilDate) { - sqlDate = new java.sql.Date(utilDate.getTime()); - } - preparedStatement.setDate(columnIndex + 1, sqlDate); - } - break; - - case Types.TIME: - java.sql.Time sqlTime = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "TIME 类型转换错误:[%s]", column)); - } - - if (null != utilDate) { - sqlTime = new java.sql.Time(utilDate.getTime()); - } - preparedStatement.setTime(columnIndex + 1, sqlTime); - break; - - case Types.TIMESTAMP: - java.sql.Timestamp sqlTimestamp = null; - try { - utilDate = column.asDate(); - } catch (DataXException e) { - throw new SQLException(String.format( - "TIMESTAMP 类型转换错误:[%s]", column)); - } - - if (null != utilDate) { - sqlTimestamp = new java.sql.Timestamp( - utilDate.getTime()); - } - preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); - break; - - case Types.BINARY: - case Types.VARBINARY: - case Types.BLOB: - case Types.LONGVARBINARY: - preparedStatement.setBytes(columnIndex + 1, column - .asBytes()); - break; - - case Types.BOOLEAN: - preparedStatement.setString(columnIndex + 1, column.asString()); - break; - - // warn: bit(1) -> Types.BIT 可使用setBoolean - // warn: bit(>1) -> Types.VARBINARY 可使用setBytes - case Types.BIT: - if (this.dataBaseType == DataBaseType.MySql) { - preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); - } else { - preparedStatement.setString(columnIndex + 1, column.asString()); - } - break; - default: - throw DataXException - .asDataXException( - DBUtilErrorCode.UNSUPPORTED_TYPE, - String.format( - "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", - cm.getName(), - cm.getType().type, - cm.getType().typeName)); + if (column.getRawData() == null) { + preparedStatement.setNull(columnIndex + 1, cm.getType().type); + } else { + col.set(preparedStatement, columnIndex + 1, column); } return preparedStatement; + +// ColumnMetaData cm = this.resultSetMetaData.get(columnIndex); +// +// java.util.Date utilDate; +// switch (columnSqltype) { +// case Types.CHAR: +// case Types.NCHAR: +// case Types.CLOB: +// case Types.NCLOB: +// case Types.VARCHAR: +// case Types.LONGVARCHAR: +// case Types.NVARCHAR: +// case Types.LONGNVARCHAR: +// preparedStatement.setString(columnIndex + 1, column +// .asString()); +// break; +// +// case Types.SMALLINT: +// case Types.INTEGER: +// case Types.BIGINT: +// case Types.NUMERIC: +// case Types.DECIMAL: +// case Types.FLOAT: +// case Types.REAL: +// case Types.DOUBLE: +// String strValue = column.asString(); +// if (emptyAsNull && "".equals(strValue)) { +// preparedStatement.setString(columnIndex + 1, null); +// } else { +// preparedStatement.setString(columnIndex + 1, strValue); +// } +// break; +// +// //tinyint is a little special in some database like mysql {boolean->tinyint(1)} +// case Types.TINYINT: +// Long longValue = column.asLong(); +// if (null == longValue) { +// preparedStatement.setString(columnIndex + 1, null); +// } else { +// preparedStatement.setString(columnIndex + 1, longValue.toString()); +// } +// break; +// +// // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115 +// case Types.DATE: +// if (typeName == null) { +// typeName = cm.getType().typeName; //this.resultSetMetaData.getRight().get(columnIndex); +// } +// +// if (typeName.equalsIgnoreCase("year")) { +// if (column.asBigInteger() == null) { +// preparedStatement.setString(columnIndex + 1, null); +// } else { +// preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); +// } +// } else { +// java.sql.Date sqlDate = null; +// try { +// utilDate = column.asDate(); +// } catch (DataXException e) { +// throw new SQLException(String.format( +// "Date 类型转换错误:[%s]", column)); +// } +// +// if (null != utilDate) { +// sqlDate = new java.sql.Date(utilDate.getTime()); +// } +// preparedStatement.setDate(columnIndex + 1, sqlDate); +// } +// break; +// +// case Types.TIME: +// java.sql.Time sqlTime = null; +// try { +// utilDate = column.asDate(); +// } catch (DataXException e) { +// throw new SQLException(String.format( +// "TIME 类型转换错误:[%s]", column)); +// } +// +// if (null != utilDate) { +// sqlTime = new java.sql.Time(utilDate.getTime()); +// } +// preparedStatement.setTime(columnIndex + 1, sqlTime); +// break; +// +// case Types.TIMESTAMP: +// java.sql.Timestamp sqlTimestamp = null; +// try { +// utilDate = column.asDate(); +// } catch (DataXException e) { +// throw new SQLException(String.format( +// "TIMESTAMP 类型转换错误:[%s]", column)); +// } +// +// if (null != utilDate) { +// sqlTimestamp = new java.sql.Timestamp( +// utilDate.getTime()); +// } +// preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); +// break; +// +// case Types.BINARY: +// case Types.VARBINARY: +// case Types.BLOB: +// case Types.LONGVARBINARY: +// preparedStatement.setBytes(columnIndex + 1, column +// .asBytes()); +// break; +// +// case Types.BOOLEAN: +// preparedStatement.setString(columnIndex + 1, column.asString()); +// break; +// +// // warn: bit(1) -> Types.BIT 可使用setBoolean +// // warn: bit(>1) -> Types.VARBINARY 可使用setBytes +// case Types.BIT: +// if (this.dataBaseType == DataBaseType.MySql) { +// preparedStatement.setBoolean(columnIndex + 1, column.asBoolean()); +// } else { +// preparedStatement.setString(columnIndex + 1, column.asString()); +// } +// break; +// default: +// throw DataXException +// .asDataXException( +// DBUtilErrorCode.UNSUPPORTED_TYPE, +// String.format( +// "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", +// cm.getName(), +// cm.getType().type, +// cm.getType().typeName)); +// } +// return preparedStatement; } private void calcWriteRecordSql() { if (!VALUE_HOLDER.equals(calcValueHolder(""))) { List valueHolders = new ArrayList(columnNumber); for (int i = 0; i < columns.size(); i++) { - String type = resultSetMetaData.get(i).getType().typeName; + String type = resultSetMetaData.get(i).getLeft().getType().typeName; valueHolders.add(calcValueHolder(type)); } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/IStatementSetter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/IStatementSetter.java new file mode 100644 index 0000000000..f50ccaa623 --- /dev/null +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/IStatementSetter.java @@ -0,0 +1,14 @@ +package com.alibaba.datax.plugin.rdbms.writer.util; + +import com.alibaba.datax.common.element.Column; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2022-10-04 13:14 + **/ +public interface IStatementSetter { + void set(PreparedStatement statement, int colIndex, Column column) throws SQLException; +} diff --git a/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java b/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java index ede13db749..417c184e25 100755 --- a/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java +++ b/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java @@ -1,15 +1,8 @@ package com.alibaba.datax.plugin.reader.rdbmswriter; -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; -import com.qlangtech.tis.plugin.ds.ColumnMetaData; - -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Types; public class SubCommonRdbmsWriter extends CommonRdbmsWriter { static { @@ -27,145 +20,146 @@ public Task(DataBaseType dataBaseType) { super(dataBaseType); } - @Override - protected PreparedStatement fillPreparedStatementColumnType( - PreparedStatement preparedStatement, int columnIndex, - int columnSqltype, Column column) throws SQLException { - java.util.Date utilDate; - try { - switch (columnSqltype) { - case Types.CHAR: - case Types.NCHAR: - case Types.CLOB: - case Types.NCLOB: - case Types.VARCHAR: - case Types.LONGVARCHAR: - case Types.NVARCHAR: - case Types.LONGNVARCHAR: - if (null == column.getRawData()) { - preparedStatement.setObject(columnIndex + 1, null); - } else { - preparedStatement.setString(columnIndex + 1, - column.asString()); - } - break; - - case Types.SMALLINT: - case Types.INTEGER: - case Types.BIGINT: - case Types.TINYINT: - String strLongValue = column.asString(); - if (emptyAsNull && "".equals(strLongValue)) { - preparedStatement.setObject(columnIndex + 1, null); - } else if (null == column.getRawData()) { - preparedStatement.setObject(columnIndex + 1, null); - } else { - preparedStatement.setLong(columnIndex + 1, - column.asLong()); - } - break; - case Types.NUMERIC: - case Types.DECIMAL: - case Types.FLOAT: - case Types.REAL: - case Types.DOUBLE: - String strValue = column.asString(); - if (emptyAsNull && "".equals(strValue)) { - preparedStatement.setObject(columnIndex + 1, null); - } else if (null == column.getRawData()) { - preparedStatement.setObject(columnIndex + 1, null); - } else { - preparedStatement.setDouble(columnIndex + 1, - column.asDouble()); - } - break; - - case Types.DATE: - java.sql.Date sqlDate = null; - utilDate = column.asDate(); - if (null != utilDate) { - sqlDate = new java.sql.Date(utilDate.getTime()); - preparedStatement.setDate(columnIndex + 1, sqlDate); - } else { - preparedStatement.setNull(columnIndex + 1, Types.DATE); - } - break; - - case Types.TIME: - java.sql.Time sqlTime = null; - utilDate = column.asDate(); - if (null != utilDate) { - sqlTime = new java.sql.Time(utilDate.getTime()); - preparedStatement.setTime(columnIndex + 1, sqlTime); - } else { - preparedStatement.setNull(columnIndex + 1, Types.TIME); - } - break; - case Types.TIMESTAMP: - java.sql.Timestamp sqlTimestamp = null; - utilDate = column.asDate(); - if (null != utilDate) { - sqlTimestamp = new java.sql.Timestamp( - utilDate.getTime()); - preparedStatement.setTimestamp(columnIndex + 1, - sqlTimestamp); - } else { - preparedStatement.setNull(columnIndex + 1, - Types.TIMESTAMP); - } - break; - - case Types.BINARY: - case Types.VARBINARY: - case Types.BLOB: - case Types.LONGVARBINARY: - if (null == column.getRawData()) { - preparedStatement.setObject(columnIndex + 1, null); - } else { - preparedStatement.setBytes(columnIndex + 1, - column.asBytes()); - } - break; - - case Types.BOOLEAN: - if (null == column.getRawData()) { - preparedStatement.setNull(columnIndex + 1, - Types.BOOLEAN); - } else { - preparedStatement.setBoolean(columnIndex + 1, - column.asBoolean()); - } - break; - - // warn: bit(1) -> Types.BIT 可使用setBoolean - // warn: bit(>1) -> Types.VARBINARY 可使用setBytes - case Types.BIT: - if (null == column.getRawData()) { - preparedStatement.setObject(columnIndex + 1, null); - } else if (this.dataBaseType == DataBaseType.MySql) { - preparedStatement.setBoolean(columnIndex + 1, - column.asBoolean()); - } else { - preparedStatement.setString(columnIndex + 1, - column.asString()); - } - break; - default: - preparedStatement.setObject(columnIndex + 1, - column.getRawData()); - break; - } - } catch (DataXException e) { - ColumnMetaData meta = this.resultSetMetaData.get(columnIndex); - throw new SQLException(String.format( - "类型转换错误:[%s] 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s].", - column, - meta.getName(), - meta.getType().type, - meta.getType().typeName)); - } - return preparedStatement; - } +// @Override +// protected PreparedStatement fillPreparedStatementColumnType( +// PreparedStatement preparedStatement, IStatementSetter col, int columnIndex, +// ColumnMetaData cm, Column column) throws SQLException { +// java.util.Date utilDate; +// try { +// switch (columnSqltype) { +// case Types.CHAR: +// case Types.NCHAR: +// case Types.CLOB: +// case Types.NCLOB: +// case Types.VARCHAR: +// case Types.LONGVARCHAR: +// case Types.NVARCHAR: +// case Types.LONGNVARCHAR: +// if (null == column.getRawData()) { +// preparedStatement.setObject(columnIndex + 1, null); +// } else { +// preparedStatement.setString(columnIndex + 1, +// column.asString()); +// } +// break; +// +// case Types.SMALLINT: +// case Types.INTEGER: +// case Types.BIGINT: +// case Types.TINYINT: +// String strLongValue = column.asString(); +// if (emptyAsNull && "".equals(strLongValue)) { +// preparedStatement.setObject(columnIndex + 1, null); +// } else if (null == column.getRawData()) { +// preparedStatement.setObject(columnIndex + 1, null); +// } else { +// preparedStatement.setLong(columnIndex + 1, +// column.asLong()); +// } +// break; +// case Types.NUMERIC: +// case Types.DECIMAL: +// case Types.FLOAT: +// case Types.REAL: +// case Types.DOUBLE: +// String strValue = column.asString(); +// if (emptyAsNull && "".equals(strValue)) { +// preparedStatement.setObject(columnIndex + 1, null); +// } else if (null == column.getRawData()) { +// preparedStatement.setObject(columnIndex + 1, null); +// } else { +// preparedStatement.setDouble(columnIndex + 1, +// column.asDouble()); +// } +// break; +// +// case Types.DATE: +// java.sql.Date sqlDate = null; +// utilDate = column.asDate(); +// if (null != utilDate) { +// sqlDate = new java.sql.Date(utilDate.getTime()); +// preparedStatement.setDate(columnIndex + 1, sqlDate); +// } else { +// preparedStatement.setNull(columnIndex + 1, Types.DATE); +// } +// break; +// +// case Types.TIME: +// java.sql.Time sqlTime = null; +// utilDate = column.asDate(); +// if (null != utilDate) { +// sqlTime = new java.sql.Time(utilDate.getTime()); +// preparedStatement.setTime(columnIndex + 1, sqlTime); +// } else { +// preparedStatement.setNull(columnIndex + 1, Types.TIME); +// } +// break; +// +// case Types.TIMESTAMP: +// java.sql.Timestamp sqlTimestamp = null; +// utilDate = column.asDate(); +// if (null != utilDate) { +// sqlTimestamp = new java.sql.Timestamp( +// utilDate.getTime()); +// preparedStatement.setTimestamp(columnIndex + 1, +// sqlTimestamp); +// } else { +// preparedStatement.setNull(columnIndex + 1, +// Types.TIMESTAMP); +// } +// break; +// +// case Types.BINARY: +// case Types.VARBINARY: +// case Types.BLOB: +// case Types.LONGVARBINARY: +// if (null == column.getRawData()) { +// preparedStatement.setObject(columnIndex + 1, null); +// } else { +// preparedStatement.setBytes(columnIndex + 1, +// column.asBytes()); +// } +// break; +// +// case Types.BOOLEAN: +// if (null == column.getRawData()) { +// preparedStatement.setNull(columnIndex + 1, +// Types.BOOLEAN); +// } else { +// preparedStatement.setBoolean(columnIndex + 1, +// column.asBoolean()); +// } +// break; +// +// // warn: bit(1) -> Types.BIT 可使用setBoolean +// // warn: bit(>1) -> Types.VARBINARY 可使用setBytes +// case Types.BIT: +// if (null == column.getRawData()) { +// preparedStatement.setObject(columnIndex + 1, null); +// } else if (this.dataBaseType == DataBaseType.MySql) { +// preparedStatement.setBoolean(columnIndex + 1, +// column.asBoolean()); +// } else { +// preparedStatement.setString(columnIndex + 1, +// column.asString()); +// } +// break; +// default: +// preparedStatement.setObject(columnIndex + 1, +// column.getRawData()); +// break; +// } +// } catch (DataXException e) { +// ColumnMetaData meta = this.resultSetMetaData.get(columnIndex); +// throw new SQLException(String.format( +// "类型转换错误:[%s] 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s].", +// column, +// meta.getName(), +// meta.getType().type, +// meta.getType().typeName)); +// } +// return preparedStatement; +// } } }