From 29d218bcfd5c99322853ec4052a7d064bce8e8d6 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Thu, 5 Oct 2023 15:42:25 +0800 Subject: [PATCH] refactor:extract method getDateStr from dataESWriter.Task for issue https://github.com/datavane/tis/issues/272 --- .../elasticsearchwriter/DataConvertUtils.java | 32 ++ .../writer/elasticsearchwriter/ESWriter.java | 323 +++++++++--------- install.sh | 2 +- 3 files changed, 188 insertions(+), 169 deletions(-) create mode 100644 elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/DataConvertUtils.java diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/DataConvertUtils.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/DataConvertUtils.java new file mode 100644 index 0000000000..4b95154f22 --- /dev/null +++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/DataConvertUtils.java @@ -0,0 +1,32 @@ +package com.alibaba.datax.plugin.writer.elasticsearchwriter; + +import com.alibaba.datax.common.element.Column; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +/** + * @author 百岁 (baisui@qlangtech.com) + * @date 2023/10/5 + */ +public class DataConvertUtils { + public static String getDateStr(ESColumn esColumn, Column column) { + DateTime date = null; + DateTimeZone dtz = DateTimeZone.getDefault(); + if (esColumn.getTimezone() != null) { + // 所有时区参考 http://www.joda.org/joda-time/timezones.html + dtz = DateTimeZone.forID(esColumn.getTimezone()); + } + if (column.getType() != Column.Type.DATE && esColumn.getFormat() != null) { + DateTimeFormatter formatter = DateTimeFormat.forPattern(esColumn.getFormat()); + date = formatter.withZone(dtz).parseDateTime(column.asString()); + return date.toString(); + } else if (column.getType() == Column.Type.DATE) { + date = new DateTime(column.asLong(), dtz); + return date.toString(); + } else { + return column.asString(); + } + } +} diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ESWriter.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ESWriter.java index 0d55f7f9a5..a75fe59c75 100644 --- a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ESWriter.java +++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ESWriter.java @@ -15,10 +15,6 @@ import io.searchbox.core.Bulk; import io.searchbox.core.BulkResult; import io.searchbox.core.Index; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,13 +45,13 @@ public void prepare() { * 最佳实践:如果 Job 中有需要进行数据同步之前的处理,可以在此处完成,如果没有必要则可以直接去掉。 */ ESClient esClient = new ESClient(ESInitialization.create(conf)); -// esClient.es.create(Key.getEndpoint(conf), -// Key.getAccessID(conf), -// Key.getAccessKey(conf), -// false, -// 300000, -// false, -// false, esClient); + // esClient.es.create(Key.getEndpoint(conf), + // Key.getAccessID(conf), + // Key.getAccessKey(conf), + // false, + // 300000, + // false, + // false, esClient); String indexName = Key.getIndexName(conf); String typeName = Key.getTypeName(conf); @@ -63,9 +59,7 @@ public void prepare() { String mappings = esClient.genMappings((JSONArray) conf.getList("column"), typeName, (columnList) -> { conf.set(WRITE_COLUMNS, JSON.toJSONString(columnList)); }); - String settings = JSONObject.toJSONString( - Key.getSettings(conf) - ); + String settings = JSONObject.toJSONString(Key.getSettings(conf)); log.info(String.format("index:[%s], type:[%s], mappings:[%s]", indexName, typeName, mappings)); try { @@ -83,115 +77,120 @@ public void prepare() { esClient.closeJestClient(); } -// /** -// * https://www.elastic.co/guide/en/elasticsearch/reference/current/explicit-mapping.html -// * -// * @param typeName -// * @return -// */ -// public String genMappings(JSONArray column, String typeName) { -// String mappings = null; -// Map propMap = new HashMap(); -// List columnList = new ArrayList(); -// -// // JSONArray column = (JSONArray) conf.getList("column"); -// if (column != null) { -// for (Object col : column) { -// JSONObject jo = (JSONObject) col; -// String colName = jo.getString("name"); -// String colTypeStr = jo.getString("type"); -// if (colTypeStr == null) { -// throw DataXException.asDataXException(ESWriterErrorCode.BAD_CONFIG_VALUE, col.toString() + " column must have type"); -// } -// ESFieldType colType = ESFieldType.getESFieldType(colTypeStr); -// if (colType == null) { -// throw DataXException.asDataXException(ESWriterErrorCode.BAD_CONFIG_VALUE, col.toString() + " unsupported type"); -// } -// -// ESColumn columnItem = new ESColumn(); -// -// if (colName.equals(Key.PRIMARY_KEY_COLUMN_NAME)) { -// // 兼容已有版本 -// colType = ESFieldType.ID; -// colTypeStr = "id"; -// } -// -// columnItem.setName(colName); -// columnItem.setType(colTypeStr); -// -// if (colType == ESFieldType.ID) { -// columnList.add(columnItem); -// // 如果是id,则properties为空 -// continue; -// } -// -// Boolean array = jo.getBoolean("array"); -// if (array != null) { -// columnItem.setArray(array); -// } -// Map field = new HashMap(); -// field.put("type", colTypeStr); -// //https://www.elastic.co/guide/en/elasticsearch/reference/5.2/breaking_50_mapping_changes.html#_literal_index_literal_property -// // https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_deep_dive_on_doc_values.html#_disabling_doc_values -// field.put("doc_values", jo.getBoolean("doc_values")); -// field.put("ignore_above", jo.getInteger("ignore_above")); -// field.put("index", jo.getBoolean("index")); -// -// switch (colType) { -// case STRING: -// // 兼容string类型,ES5之前版本 -// break; -// case KEYWORD: -// // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.html#_warm_up_global_ordinals -// field.put("eager_global_ordinals", jo.getBoolean("eager_global_ordinals")); -// case TEXT: -// field.put("analyzer", jo.getString("analyzer")); -// // 优化disk使用,也同步会提高index性能 -// // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-disk-usage.html -// field.put("norms", jo.getBoolean("norms")); -// field.put("index_options", jo.getBoolean("index_options")); -// break; -// case DATE: -// columnItem.setTimeZone(jo.getString("timezone")); -// columnItem.setFormat(jo.getString("format")); -// // 后面时间会处理为带时区的标准时间,所以不需要给ES指定格式 -// /* -// if (jo.getString("format") != null) { -// field.put("format", jo.getString("format")); -// } else { -// //field.put("format", "strict_date_optional_time||epoch_millis||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"); -// } -// */ -// break; -// case GEO_SHAPE: -// field.put("tree", jo.getString("tree")); -// field.put("precision", jo.getString("precision")); -// default: -// break; -// } -// propMap.put(colName, field); -// columnList.add(columnItem); -// } -// } else { -// throw new IllegalStateException("conf.getList(\"column\") can not be empty"); -// } -// -// conf.set(WRITE_COLUMNS, JSON.toJSONString(columnList)); -// -// -// Map rootMappings = new HashMap(); -// Map typeMappings = new HashMap(); -// typeMappings.put("properties", propMap); -// rootMappings.put(typeName, typeMappings); -// -// mappings = StringUtils.isNotBlank(typeName) ? JSON.toJSONString(rootMappings) : JSON.toJSONString(typeMappings); -// -// if (StringUtils.isEmpty(mappings)) { -// throw DataXException.asDataXException(ESWriterErrorCode.BAD_CONFIG_VALUE, "must have mappings"); -// } -// log.info(mappings); -// return mappings; -// } + // /** + // * https://www.elastic.co/guide/en/elasticsearch/reference/current/explicit-mapping.html + // * + // * @param typeName + // * @return + // */ + // public String genMappings(JSONArray column, String typeName) { + // String mappings = null; + // Map propMap = new HashMap(); + // List columnList = new ArrayList(); + // + // // JSONArray column = (JSONArray) conf.getList("column"); + // if (column != null) { + // for (Object col : column) { + // JSONObject jo = (JSONObject) col; + // String colName = jo.getString("name"); + // String colTypeStr = jo.getString("type"); + // if (colTypeStr == null) { + // throw DataXException.asDataXException(ESWriterErrorCode.BAD_CONFIG_VALUE, col + // .toString() + " column must have type"); + // } + // ESFieldType colType = ESFieldType.getESFieldType(colTypeStr); + // if (colType == null) { + // throw DataXException.asDataXException(ESWriterErrorCode.BAD_CONFIG_VALUE, col + // .toString() + " unsupported type"); + // } + // + // ESColumn columnItem = new ESColumn(); + // + // if (colName.equals(Key.PRIMARY_KEY_COLUMN_NAME)) { + // // 兼容已有版本 + // colType = ESFieldType.ID; + // colTypeStr = "id"; + // } + // + // columnItem.setName(colName); + // columnItem.setType(colTypeStr); + // + // if (colType == ESFieldType.ID) { + // columnList.add(columnItem); + // // 如果是id,则properties为空 + // continue; + // } + // + // Boolean array = jo.getBoolean("array"); + // if (array != null) { + // columnItem.setArray(array); + // } + // Map field = new HashMap(); + // field.put("type", colTypeStr); + // //https://www.elastic.co/guide/en/elasticsearch/reference/5.2/breaking_50_mapping_changes.html#_literal_index_literal_property + // // https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_deep_dive_on_doc_values.html#_disabling_doc_values + // field.put("doc_values", jo.getBoolean("doc_values")); + // field.put("ignore_above", jo.getInteger("ignore_above")); + // field.put("index", jo.getBoolean("index")); + // + // switch (colType) { + // case STRING: + // // 兼容string类型,ES5之前版本 + // break; + // case KEYWORD: + // // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.html#_warm_up_global_ordinals + // field.put("eager_global_ordinals", jo.getBoolean("eager_global_ordinals")); + // case TEXT: + // field.put("analyzer", jo.getString("analyzer")); + // // 优化disk使用,也同步会提高index性能 + // // https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-disk-usage.html + // field.put("norms", jo.getBoolean("norms")); + // field.put("index_options", jo.getBoolean("index_options")); + // break; + // case DATE: + // columnItem.setTimeZone(jo.getString("timezone")); + // columnItem.setFormat(jo.getString("format")); + // // 后面时间会处理为带时区的标准时间,所以不需要给ES指定格式 + // /* + // if (jo.getString("format") != null) { + // field.put("format", jo.getString("format")); + // } else { + // //field.put("format", "strict_date_optional_time||epoch_millis||yyyy-MM-dd + // HH:mm:ss||yyyy-MM-dd"); + // } + // */ + // break; + // case GEO_SHAPE: + // field.put("tree", jo.getString("tree")); + // field.put("precision", jo.getString("precision")); + // default: + // break; + // } + // propMap.put(colName, field); + // columnList.add(columnItem); + // } + // } else { + // throw new IllegalStateException("conf.getList(\"column\") can not be empty"); + // } + // + // conf.set(WRITE_COLUMNS, JSON.toJSONString(columnList)); + // + // + // Map rootMappings = new HashMap(); + // Map typeMappings = new HashMap(); + // typeMappings.put("properties", propMap); + // rootMappings.put(typeName, typeMappings); + // + // mappings = StringUtils.isNotBlank(typeName) ? JSON.toJSONString(rootMappings) : JSON + // .toJSONString(typeMappings); + // + // if (StringUtils.isEmpty(mappings)) { + // throw DataXException.asDataXException(ESWriterErrorCode.BAD_CONFIG_VALUE, "must have + // mappings"); + // } + // log.info(mappings); + // return mappings; + // } @Override public List split(int mandatoryNumber) { @@ -205,13 +204,13 @@ public List split(int mandatoryNumber) { @Override public void post() { ESClient esClient = new ESClient(ESInitialization.create(conf)); -// esClient.es.create(Key.getEndpoint(conf), -// Key.getAccessID(conf), -// Key.getAccessKey(conf), -// false, -// 300000, -// false, -// false, esClient); + // esClient.es.create(Key.getEndpoint(conf), + // Key.getAccessID(conf), + // Key.getAccessKey(conf), + // false, + // 300000, + // false, + // false, esClient); String alias = Key.getAlias(conf); if (!"".equals(alias)) { log.info(String.format("alias [%s] to [%s]", alias, Key.getIndexName(conf))); @@ -264,19 +263,19 @@ public void init() { typeList.add(ESFieldType.getESFieldType(col.getType())); } - esClient = new ESClient( - ESInitialization.create(conf, Key.isMultiThread(conf), Key.getTimeout(conf), Key.isCompression(conf), Key.isDiscovery(conf))); + esClient = new ESClient(ESInitialization.create(conf, Key.isMultiThread(conf), Key.getTimeout(conf), + Key.isCompression(conf), Key.isDiscovery(conf))); } @Override public void prepare() { -// esClient.es.create(Key.getEndpoint(conf), -// Key.getAccessID(conf), -// Key.getAccessKey(conf), -// , -// , -// , -// , esClient); + // esClient.es.create(Key.getEndpoint(conf), + // Key.getAccessID(conf), + // Key.getAccessKey(conf), + // , + // , + // , + // , esClient); } @Override @@ -303,25 +302,6 @@ public void startWrite(RecordReceiver recordReceiver) { esClient.closeJestClient(); } - private String getDateStr(ESColumn esColumn, Column column) { - DateTime date = null; - DateTimeZone dtz = DateTimeZone.getDefault(); - if (esColumn.getTimezone() != null) { - // 所有时区参考 http://www.joda.org/joda-time/timezones.html - dtz = DateTimeZone.forID(esColumn.getTimezone()); - } - if (column.getType() != Column.Type.DATE && esColumn.getFormat() != null) { - DateTimeFormatter formatter = DateTimeFormat.forPattern(esColumn.getFormat()); - date = formatter.withZone(dtz).parseDateTime(column.asString()); - return date.toString(); - } else if (column.getType() == Column.Type.DATE) { - date = new DateTime(column.asLong(), dtz); - return date.toString(); - } else { - return column.asString(); - } - } - private long doBatchInsert(final List writerBuffer) { Map data = null; final Bulk.Builder bulkaction = new Bulk.Builder().defaultIndex(this.index).defaultType(this.type); @@ -339,7 +319,7 @@ private long doBatchInsert(final List writerBuffer) { data.put(columnName, dataList); } else { for (int pos = 0; pos < dataList.length; pos++) { - dataList[pos] = getDateStr(columnList.get(i), column); + dataList[pos] = DataConvertUtils.getDateStr(columnList.get(i), column); } data.put(columnName, dataList); } @@ -354,14 +334,15 @@ private long doBatchInsert(final List writerBuffer) { break; case DATE: try { - String dateStr = getDateStr(columnList.get(i), column); + String dateStr = DataConvertUtils.getDateStr(columnList.get(i), column); data.put(columnName, dateStr); } catch (Exception e) { - getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e.toString())); + getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 " + + "[%s:%s] exception: %s", columnName, column.toString(), e.toString())); } break; case KEYWORD: - // case STRING: + // case STRING: case TEXT: case IP: case GEO_POINT: @@ -393,7 +374,8 @@ private long doBatchInsert(final List writerBuffer) { data.put(columnName, JSON.parse(column.asString())); break; default: - getTaskPluginCollector().collectDirtyRecord(record, "类型错误:不支持的类型:" + columnType + " " + columnName); + getTaskPluginCollector().collectDirtyRecord(record, + "类型错误:不支持的类型:" + columnType + " " + columnName); } } } @@ -415,7 +397,8 @@ public Integer call() throws Exception { return writerBuffer.size(); } - String msg = String.format("response code: [%d] error :[%s]", jestResult.getResponseCode(), jestResult.getErrorMessage()); + String msg = String.format("response code: [%d] error :[%s]", jestResult.getResponseCode(), + jestResult.getErrorMessage()); log.warn(msg); if (esClient.isBulkResult(jestResult)) { BulkResult brst = (BulkResult) jestResult; @@ -423,11 +406,13 @@ public Integer call() throws Exception { for (BulkResult.BulkResultItem item : failedItems) { if (item.status != 400) { // 400 BAD_REQUEST 如果非数据异常,请求异常,则不允许忽略 - throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_INSERT, String.format("status:[%d], error: %s", item.status, item.error)); + throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_INSERT, + String.format("status:[%d], error: %s", item.status, item.error)); } else { // 如果用户选择不忽略解析错误,则抛异常,默认为忽略 if (!Key.isIgnoreParseError(conf)) { - throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_INSERT, String.format("status:[%d], error: %s, config not ignoreParseError so throw this error", item.status, item.error)); + throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_INSERT, + String.format("status:[%d], error: %s, config not ignoreParseError " + "so" + " throw this error", item.status, item.error)); } } } @@ -436,7 +421,8 @@ public Integer call() throws Exception { for (int idx = 0; idx < items.size(); ++idx) { BulkResult.BulkResultItem item = items.get(idx); if (item.error != null && !"".equals(item.error)) { - getTaskPluginCollector().collectDirtyRecord(writerBuffer.get(idx), String.format("status:[%d], error: %s", item.status, item.error)); + getTaskPluginCollector().collectDirtyRecord(writerBuffer.get(idx), String.format( + "status:[%d], error: %s", item.status, item.error)); } } return writerBuffer.size() - brst.getFailedItems().size(); @@ -447,7 +433,8 @@ public Integer call() throws Exception { log.warn("server response too many requests, so auto reduce speed"); break; } - throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_INSERT, jestResult.getErrorMessage()); + throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_INSERT, + jestResult.getErrorMessage()); } } }, trySize, 60000L, true); diff --git a/install.sh b/install.sh index cd7527109e..2c17ca3f78 100644 --- a/install.sh +++ b/install.sh @@ -1,4 +1,4 @@ -mvn clean install -Dmaven.test.skip=true \ +mvn clean deploy -Dmaven.test.skip=true \ -Ptis-repo \ -pl \ tis-datax-executor\