From 58b7bf2123c5f89f479dd2b44e776601bd3bfc15 Mon Sep 17 00:00:00 2001
From: mozhenghua
Date: Thu, 9 Feb 2023 14:25:55 +0800
Subject: [PATCH] remove hive dependency from hdfswriter

---
 hdfswriter/pom.xml                               |  50 +--
 .../writer/hdfswriter/FileFormatUtils.java       | 308 ------------------
 .../plugin/writer/hdfswriter/HdfsWriter.java     |  13 +-
 3 files changed, 36 insertions(+), 335 deletions(-)
 delete mode 100644 hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/FileFormatUtils.java

diff --git a/hdfswriter/pom.xml b/hdfswriter/pom.xml
index 428067c509..71d1a923c0 100644
--- a/hdfswriter/pom.xml
+++ b/hdfswriter/pom.xml
@@ -60,31 +60,31 @@
             <version>${hadoop.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-            <version>${hive.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-serde</artifactId>
-            <version>${hive.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-service</artifactId>
-            <version>${hive.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-common</artifactId>
-            <version>${hive.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive.hcatalog</groupId>
-            <artifactId>hive-hcatalog-core</artifactId>
-            <version>${hive.version}</version>
-        </dependency>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 
         <dependency>
             <groupId>com.alibaba.datax</groupId>
diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/FileFormatUtils.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/FileFormatUtils.java
deleted file mode 100644
index 5ed9e5f3ad..0000000000
--- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/FileFormatUtils.java
+++ /dev/null
@@ -1,308 +0,0 @@
-package com.alibaba.datax.plugin.writer.hdfswriter;
-
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.datax.common.exception.DataXException;
-import com.alibaba.datax.common.plugin.RecordReceiver;
-import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.alibaba.datax.common.util.Configuration;
-import com.google.common.collect.Lists;
-import com.qlangtech.tis.plugin.ds.DataType;
-import com.qlangtech.tis.plugin.ds.IColMetaGetter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.*;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * @author: 百岁(baisui@qlangtech.com)
- * @create: 2022-03-11 14:15
- **/
-public class FileFormatUtils {
-    /**
-     * Write an ORC format file.
-     *
-     * @param lineReceiver
-     * @param config
-     * @param fileName
-     * @param taskPluginCollector
-     */
-    public static void orcFileStartWrite(
-            FileSystem fileSystem, JobConf conf, RecordReceiver lineReceiver, Configuration config, String fileName,
-            TaskPluginCollector taskPluginCollector) {
-        List<IColMetaGetter> colsMeta = HdfsColMeta.getColsMeta(config);
-        // List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
-        String compress = config.getString(Key.COMPRESS, null);
-        List<String> columnNames = colsMeta.stream().map((c) -> c.getName()).collect(Collectors.toList());
-        List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(colsMeta);
-        StructObjectInspector inspector = ObjectInspectorFactory
-                .getStandardStructObjectInspector(columnNames, columnTypeInspectors);
-
-        OrcSerde orcSerde = new OrcSerde();
-
-        FileOutputFormat outFormat = new OrcOutputFormat();
-        if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
-            Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
-            if (null != codecClass) {
-                outFormat.setOutputCompressorClass(conf, codecClass);
-            }
-        }
-        try {
-            RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
-            Record record = null;
-            while ((record = lineReceiver.getFromReader()) != null) {
-                MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record, colsMeta, taskPluginCollector);
-                if (!transportResult.getRight()) {
-                    writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
-                }
-            }
-            writer.close(Reporter.NULL);
-        } catch (Exception e) {
-            String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
-            HdfsHelper.LOG.error(message);
-            Path path = new Path(fileName);
-            HdfsHelper.deleteDir(fileSystem, path.getParent());
-            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
-        }
-    }
-
-    public static MutablePair<List<Object>, Boolean> transportOneRecord(
-            Record record, List<IColMetaGetter> columnsConfiguration,
-            TaskPluginCollector taskPluginCollector) {
-
-        MutablePair<List<Object>, Boolean> transportResult = new MutablePair<List<Object>, Boolean>();
-        transportResult.setRight(false);
-        List<Object> recordList = Lists.newArrayList();
-        int recordLength = record.getColumnNumber();
-        IColMetaGetter colMeta = null;
-        if (0 != recordLength) {
-            Column column;
-            for (int i = 0; i < recordLength; i++) {
-                column = record.getColumn(i);
-
-                //todo as method
-                if (null != column.getRawData()) {
-                    colMeta = columnsConfiguration.get(i);
-                    String rowData = column.getRawData().toString();
-                    SupportHiveDataType columnType = DataType.convert2HiveType(colMeta.getType());
-                    // Convert according to the column type configured on the writer side
-                    try {
-                        switch (columnType) {
-                            case TINYINT:
-                                recordList.add(Byte.valueOf(rowData));
-                                break;
-                            case SMALLINT:
-                                recordList.add(Short.valueOf(rowData));
-                                break;
-                            case INT:
-                                recordList.add(Integer.valueOf(rowData));
-                                break;
-                            case BIGINT:
-                                recordList.add(column.asLong());
-                                break;
-                            case FLOAT:
-                                recordList.add(Float.valueOf(rowData));
-                                break;
-                            case DOUBLE:
-                                recordList.add(column.asDouble());
-                                break;
-                            case STRING:
-                            case VARCHAR:
-                            case CHAR:
-                                recordList.add(column.asString());
-                                break;
-                            case BOOLEAN:
-                                recordList.add(column.asBoolean());
-                                break;
-                            case DATE:
-                                recordList.add(new java.sql.Date(column.asDate().getTime()));
-                                break;
-                            case TIMESTAMP:
-                                recordList.add(new java.sql.Timestamp(column.asDate().getTime()));
-                                break;
-                            default:
-                                throw DataXException
-                                        .asDataXException(
-                                                HdfsWriterErrorCode.ILLEGAL_VALUE,
-                                                String.format(
-                                                        "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
-                                                        colMeta.getName(),
-                                                        colMeta.getType()));
-                        }
-                    } catch (Exception e) {
-                        // warn: treated as dirty data here
-                        String message = String.format(
-                                "字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].",
-                                colMeta.getType(), column.getRawData().toString());
-                        taskPluginCollector.collectDirtyRecord(record, message);
-                        transportResult.setRight(true);
-                        break;
-                    }
-                } else {
-                    // warn: it's all ok if nullFormat is null
-                    recordList.add(null);
-                }
-            }
-        }
-        transportResult.setLeft(recordList);
-        return transportResult;
-    }
-
-    /**
-     * Build ObjectInspectors from the column types configured on the writer side.
-     *
-     * @param
-     * @return
-     */
-    public static List<ObjectInspector> getColumnTypeInspectors(List<IColMetaGetter> colsMeta) {
-        List<ObjectInspector> columnTypeInspectors = Lists.newArrayList();
-        for (IColMetaGetter eachColumnConf : colsMeta) {
-            SupportHiveDataType columnType = DataType.convert2HiveType(eachColumnConf.getType());//SupportHiveDataType.valueOf(eachColumnConf.getString(Key.TYPE).toUpperCase());
-            ObjectInspector objectInspector = null;
-            switch (columnType) {
-                case TINYINT:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Byte.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case SMALLINT:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Short.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case INT:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case BIGINT:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case FLOAT:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Float.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case DOUBLE:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Double.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case TIMESTAMP:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Timestamp.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case DATE:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Date.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case STRING:
-                case VARCHAR:
-                case CHAR:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(String.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                case BOOLEAN:
-                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Boolean.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-                    break;
-                default:
-                    throw DataXException
-                            .asDataXException(
-                                    HdfsWriterErrorCode.ILLEGAL_VALUE,
-                                    String.format(
-                                            "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
-                                            eachColumnConf.getName(),
-                                            eachColumnConf.getType()));
-            }
-
-            columnTypeInspectors.add(objectInspector);
-        }
-        return columnTypeInspectors;
-    }
-
-    /**
-     * Write a text format file.
-     *
-     * @param lineReceiver
-     * @param config
-     * @param fileName
-     * @param taskPluginCollector
-     */
-    public static void textFileStartWrite(HdfsHelper fsHelper, RecordReceiver lineReceiver, Configuration config, String fileName,
-                                          TaskPluginCollector taskPluginCollector) {
-        char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);
-        // List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
-        List<IColMetaGetter> colsMeta = HdfsColMeta.getColsMeta(config);
-        String compress = config.getString(Key.COMPRESS, null);
-
-        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
-        String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";
-        Path outputPath = new Path(fileName);
-        //todo TASK_ATTEMPT_ID still needs to be confirmed
-        fsHelper.conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
-        FileOutputFormat outFormat = new TextOutputFormat();
-        outFormat.setOutputPath(fsHelper.conf, outputPath);
-        outFormat.setWorkOutputPath(fsHelper.conf, outputPath);
-        if (null != compress) {
-            Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
-            if (null != codecClass) {
-                outFormat.setOutputCompressorClass(fsHelper.conf, codecClass);
-            }
-        }
-        try {
-            RecordWriter writer = outFormat.getRecordWriter(fsHelper.fileSystem, fsHelper.conf, outputPath.toString(), Reporter.NULL);
-            Record record = null;
-            while ((record = lineReceiver.getFromReader()) != null) {
-                MutablePair<Text, Boolean> transportResult
-                        = transportOneRecord(record, fieldDelimiter, colsMeta, taskPluginCollector);
-                if (!transportResult.getRight()) {
-                    writer.write(NullWritable.get(), transportResult.getLeft());
-                }
-            }
-            writer.close(Reporter.NULL);
-        } catch (Exception e) {
-            String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
-            HdfsHelper.LOG.error(message);
-            Path path = new Path(fileName);
-            HdfsHelper.deleteDir(fsHelper.fileSystem, path.getParent());
-            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
-        }
-    }
-
-    public static MutablePair<Text, Boolean> transportOneRecord(
-            Record record, char fieldDelimiter
-            , List<IColMetaGetter> columnsConfiguration, TaskPluginCollector taskPluginCollector) {
-        MutablePair<List<Object>, Boolean> transportResultList
-                = transportOneRecord(record, columnsConfiguration, taskPluginCollector);
-        // Holds <the converted data, whether the record is dirty>
-        MutablePair<Text, Boolean> transportResult = new MutablePair<Text, Boolean>();
-        transportResult.setRight(false);
-        if (null != transportResultList) {
-            Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
-            transportResult.setRight(transportResultList.getRight());
-            transportResult.setLeft(recordResult);
-        }
-        return transportResult;
-    }
-
-    public static Class<? extends CompressionCodec> getCompressCodec(String compress) {
-        Class<? extends CompressionCodec> codecClass = null;
-        if (null == compress) {
-            codecClass = null;
-        } else if ("GZIP".equalsIgnoreCase(compress)) {
-            codecClass = org.apache.hadoop.io.compress.GzipCodec.class;
-        } else if ("BZIP2".equalsIgnoreCase(compress)) {
-            codecClass = org.apache.hadoop.io.compress.BZip2Codec.class;
-        } else if ("SNAPPY".equalsIgnoreCase(compress)) {
-            //todo support this once the requirement is clarified; users need to install SnappyCodec
-            codecClass = org.apache.hadoop.io.compress.SnappyCodec.class;
-            // org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class not public
-            //codecClass = org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class;
-        } else {
-            throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
-                    String.format("目前不支持您配置的 compress 模式 : [%s]", compress));
[%s]", compress)); - } - return codecClass; - } -} diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java index 2117b5d639..51028520b2 100644 --- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java @@ -11,7 +11,9 @@ import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -396,11 +398,11 @@ public void startWrite(RecordReceiver lineReceiver) { LOG.info(String.format("write to file : [%s]", this.fileName)); if (fileType.equalsIgnoreCase("TEXT")) { //写TEXT FILE - FileFormatUtils.textFileStartWrite(hdfsHelper, lineReceiver, this.writerSliceConfig, this.fileName, + startTextWrite(hdfsHelper, lineReceiver, this.writerSliceConfig, this.fileName, this.getTaskPluginCollector()); } else if (fileType.equalsIgnoreCase("ORC")) { //写ORC FILE - FileFormatUtils.orcFileStartWrite(hdfsHelper.fileSystem, hdfsHelper.conf, lineReceiver, this.writerSliceConfig, this.fileName, + orcFileStartWrite(hdfsHelper.fileSystem, hdfsHelper.conf, lineReceiver, this.writerSliceConfig, this.fileName, this.getTaskPluginCollector()); } else if (fileType.equalsIgnoreCase("CSV")) { this.csvFileStartWrite(lineReceiver @@ -415,6 +417,13 @@ public void startWrite(RecordReceiver lineReceiver) { LOG.info("end do write"); } + protected abstract void orcFileStartWrite( + FileSystem fileSystem, JobConf conf, RecordReceiver lineReceiver, Configuration config, String fileName, + TaskPluginCollector taskPluginCollector); + + protected abstract void startTextWrite(HdfsHelper fsHelper, RecordReceiver lineReceiver, Configuration config, String fileName, + TaskPluginCollector taskPluginCollector); + protected abstract void avroFileStartWrite( RecordReceiver lineReceiver, Configuration writerSliceConfig, String fileName, TaskPluginCollector taskPluginCollector);