emptyList(), String.class);
+ StringCast.timeZone = configuration.getString("common.column.timeZone",
+ StringCast.timeZone);
+ StringCast.timeZoner = TimeZone.getTimeZone(StringCast.timeZone);
+ StringCast.datetimeFormatter = FastDateFormat.getInstance(
+ StringCast.datetimeFormat, StringCast.timeZoner);
+ StringCast.dateFormatter = FastDateFormat.getInstance(
+ StringCast.dateFormat, StringCast.timeZoner);
+ StringCast.timeFormatter = FastDateFormat.getInstance(
+ StringCast.timeFormat, StringCast.timeZoner);
+ StringCast.encoding = configuration.getString("common.column.encoding",
+ StringCast.encoding);
+ }
+ static Date asDate(final StringColumn column) throws ParseException {
+ if (null == column.asString()) {
+ return null;
+ }
+ try {
+ return StringCast.datetimeFormatter.parse(column.asString());
+ } catch (ParseException ignored) {
+ }
+ try {
+ return StringCast.dateFormatter.parse(column.asString());
+ } catch (ParseException ignored) {
+ }
+ ParseException e;
+ try {
+ return StringCast.timeFormatter.parse(column.asString());
+ } catch (ParseException ignored) {
+ e = ignored;
+ }
+ for (String format : StringCast.extraFormats) {
+ try {
+ return FastDateFormat.getInstance(format, StringCast.timeZoner).parse(column.asString());
+ } catch (ParseException ignored) {
+ e = ignored;
+ }
+ }
+ throw e;
+ }
+ static Date asDate(final StringColumn column, String dateFormat) throws ParseException {
+ ParseException e;
+ try {
+ return FastDateFormat.getInstance(dateFormat, StringCast.timeZoner).parse(column.asString());
+ } catch (ParseException ignored) {
+ e = ignored;
+ }
+ throw e;
+ }
+ static byte[] asBytes(final StringColumn column)
+ throws UnsupportedEncodingException {
+ if (null == column.asString()) {
+ return null;
+ }
+ return column.asString().getBytes(StringCast.encoding);
+ }
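+/**
+ * For maintainability, consider using Apache's DateFormatUtils directly in the future.
+ *
+ * Chi Nan has already fixed the issue, but for maintainability we still use Apache's built-in functions directly.
+ */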
+class DateCast {
+ // Default output patterns and time-zone id; init(Configuration) below may overwrite them.
+ static String datetimeFormat = "yyyy-MM-dd HH:mm:ss";
+ static String dateFormat = "yyyy-MM-dd";
+ static String timeFormat = "HH:mm:ss";
+ static String timeZone = "GMT+8";
+ // TimeZone object resolved from the timeZone id above.
+ static TimeZone timeZoner = TimeZone.getTimeZone(DateCast.timeZone);
+ /**
+  * Loads the date/time patterns and the time zone from the
+  * {@code common.column.*} configuration keys, keeping the current value
+  * as the default for each key, then rebuilds the TimeZone object.
+  *
+  * @param configuration the configuration to read from
+  */
+ static void init(final Configuration configuration) {
+     datetimeFormat = configuration.getString("common.column.datetimeFormat", datetimeFormat);
+     timeFormat = configuration.getString("common.column.timeFormat", timeFormat);
+     dateFormat = configuration.getString("common.column.dateFormat", dateFormat);
+     timeZone = configuration.getString("common.column.timeZone", timeZone);
+     timeZoner = TimeZone.getTimeZone(timeZone);
+ }
+ static String asString(final DateColumn column) {
+ if (null == column.asDate()) {
+ return null;
+ }
+ switch (column.getSubType()) {
+ case DATE:
+ return DateFormatUtils.format(column.asDate(), DateCast.dateFormat,
+ DateCast.timeZoner);
+ case TIME:
+ return DateFormatUtils.format(column.asDate(), DateCast.timeFormat,
+ DateCast.timeZoner);
+ case DATETIME:
+ return DateFormatUtils.format(column.asDate(),
+ DateCast.datetimeFormat, DateCast.timeZoner);
+ default:
+ throw DataXException
+ .asDataXException(CommonErrorCode.CONVERT_NOT_SUPPORT,
+ "时间类型出现不支持类型,目前仅支持DATE/TIME/DATETIME。该类型属于编程错误,请反馈给DataX开发团队 .");
+ }
+ }
+class BytesCast {
+ // Charset name used to decode bytes; init(Configuration) below may overwrite it.
+ static String encoding = "utf-8";
+ /**
+  * Reads the charset name from {@code common.column.encoding}, keeping the
+  * current value when the key is absent.
+  *
+  * @param configuration the configuration to read from
+  */
+ static void init(final Configuration configuration) {
+     encoding = configuration.getString("common.column.encoding", encoding);
+ }
+ static String asString(final BytesColumn column)
+ throws UnsupportedEncodingException {
+ if (null == column.asBytes()) {
+ return null;
+ }
+ return new String(column.asBytes(), encoding);
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/element/DateColumn.java b/common-scala/src/main/java/com/alibaba/datax/common/element/DateColumn.java
new file mode 100755
index 0000000000..404332f460
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/element/DateColumn.java
@@ -0,0 +1,135 @@
+package com.alibaba.datax.common.element;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+/**
+ * Created by jingxing on 14-8-24.
+ */
+public class DateColumn extends Column {
+ private DateType subType = DateType.DATETIME;
+ public static enum DateType {
+ }
+ /**
+ * Builds a DateColumn whose value is null; the date sub-type is DATETIME.
+ * */
+ public DateColumn() {
+ this((Long)null);
+ }
+ /**
+ * Builds a DateColumn from stamp (a Unix timestamp); the date sub-type is DATETIME.
+ * The value is stored as a long in milliseconds instead of a Date, to save storage.
+ * The byte size is recorded as 0 for a null stamp and 8 otherwise.
+ * */
+ public DateColumn(final Long stamp) {
+ super(stamp, Column.Type.DATE, (null == stamp ? 0 : 8));
+ }
+ /**
+ * Builds a DateColumn from date (java.util.Date); the date sub-type is DATETIME.
+ * */
+ public DateColumn(final Date date) {
+ this(date == null ? null : date.getTime());
+ }
+ /**
+ * Builds a DateColumn from date (java.sql.Date); the date sub-type is DATE: date only, no time.
+ * */
+ public DateColumn(final java.sql.Date date) {
+ this(date == null ? null : date.getTime());
+ this.setSubType(DateType.DATE);
+ }
+ /**
+ * Builds a DateColumn from time (java.sql.Time); the date sub-type is TIME: time only, no date.
+ * */
+ public DateColumn(final java.sql.Time time) {
+ this(time == null ? null : time.getTime());
+ this.setSubType(DateType.TIME);
+ }
+ /**
+ * Builds a DateColumn from ts (java.sql.Timestamp); the date sub-type is DATETIME.
+ * */
+ public DateColumn(final java.sql.Timestamp ts) {
+ this(ts == null ? null : ts.getTime());
+ this.setSubType(DateType.DATETIME);
+ }
+ /** Returns the stored millisecond timestamp, or {@code null} when unset. */
+ @Override
+ public Long asLong() {
+     final Object raw = this.getRawData();
+     return (Long) raw;
+ }
+ /**
+  * Formats this column through {@code ColumnCast.date2String}; any failure
+  * is reported as a DataXException.
+  */
+ @Override
+ public String asString() {
+     final String text;
+     try {
+         text = ColumnCast.date2String(this);
+     } catch (Exception e) {
+         throw DataXException.asDataXException(
+                 String.format("Date[%s]类型不能转为String .", this.toString()));
+     }
+     return text;
+ }
+ /** Returns a new Date built from the stored timestamp, or {@code null} when unset. */
+ @Override
+ public Date asDate() {
+     final Long millis = (Long) this.getRawData();
+     return millis == null ? null : new Date(millis);
+ }
+ /** Same as {@link #asDate()}; the format argument is not used. */
+ @Override
+ public Date asDate(String dateFormat) {
+     return this.asDate();
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public byte[] asBytes() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Date类型不能转为Bytes .");
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public Boolean asBoolean() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Date类型不能转为Boolean .");
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public Double asDouble() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Date类型不能转为Double .");
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public BigInteger asBigInteger() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Date类型不能转为BigInteger .");
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public BigDecimal asBigDecimal() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Date类型不能转为BigDecimal .");
+ }
+ /** Returns the date sub-type of this column. */
+ public DateType getSubType() {
+ return subType;
+ }
+ /** Replaces the date sub-type of this column. */
+ public void setSubType(DateType subType) {
+ this.subType = subType;
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/element/DoubleColumn.java b/common-scala/src/main/java/com/alibaba/datax/common/element/DoubleColumn.java
new file mode 100755
index 0000000000..217f4d4b13
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/element/DoubleColumn.java
@@ -0,0 +1,167 @@
+package com.alibaba.datax.common.element;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+public class DoubleColumn extends Column {
+ /**
+ * Builds a DoubleColumn from a decimal string. The byte size is the string
+ * length (0 for null) and the text is checked by validate(String).
+ */
+ public DoubleColumn(final String data) {
+ this(data, null == data ? 0 : data.length());
+ this.validate(data);
+ }
+ /** Builds a DoubleColumn from the decimal text of a Long; null stays null. */
+ public DoubleColumn(Long data) {
+ this(data == null ? (String) null : String.valueOf(data));
+ }
+ /** Builds a DoubleColumn from the decimal text of an Integer; null stays null. */
+ public DoubleColumn(Integer data) {
+ this(data == null ? (String) null : String.valueOf(data));
+ }
+ /**
+ * Double cannot represent decimal values exactly; storing Double data through this
+ * constructor is not recommended. Prefer the String constructor.
+ *
+ * NOTE(review): for NaN or infinite input, new BigDecimal(...) throws a
+ * NumberFormatException before validate(String) is reached.
+ * */
+ public DoubleColumn(final Double data) {
+ this(data == null ? (String) null
+ : new BigDecimal(String.valueOf(data)).toPlainString());
+ }
+ /**
+ * Float cannot represent decimal values exactly; storing Float data through this
+ * constructor is not recommended. Prefer the String constructor.
+ *
+ * */
+ public DoubleColumn(final Float data) {
+ this(data == null ? (String) null
+ : new BigDecimal(String.valueOf(data)).toPlainString());
+ }
+ /** Builds a DoubleColumn from the plain (non-scientific) text of a BigDecimal. */
+ public DoubleColumn(final BigDecimal data) {
+ this(null == data ? (String) null : data.toPlainString());
+ }
+ /** Builds a DoubleColumn from the decimal text of a BigInteger. */
+ public DoubleColumn(final BigInteger data) {
+ this(null == data ? (String) null : data.toString());
+ }
+ /** Builds a DoubleColumn whose value is null. */
+ public DoubleColumn() {
+ this((String) null);
+ }
+ // Stores the text as raw data with the given byte size; performs no validation.
+ private DoubleColumn(final String data, int byteSize) {
+ super(data, Column.Type.DOUBLE, byteSize);
+ }
+ /**
+  * Parses the stored text as a BigDecimal.
+  *
+  * @return the parsed value, or {@code null} when the column is unset
+  * @throws DataXException when the text is not a valid decimal number
+  */
+ @Override
+ public BigDecimal asBigDecimal() {
+     final Object raw = this.getRawData();
+     if (raw == null) {
+         return null;
+     }
+     try {
+         return new BigDecimal((String) raw);
+     } catch (NumberFormatException e) {
+         final String message = String.format("String[%s] 无法转换为Double类型 .",
+                 (String) this.getRawData());
+         throw DataXException.asDataXException(message);
+     }
+ }
+ /**
+  * Converts the stored text to a Double.
+  *
+  * The special values NaN, Infinity and -Infinity are matched without regard
+  * to case, the same way validate(String) accepts them at construction, and
+  * are mapped straight to the corresponding Double constants. Any other
+  * text is parsed as a BigDecimal and range-checked.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException when the text is not numeric or is outside the double range
+  */
+ @Override
+ public Double asDouble() {
+     final String raw = (String) this.getRawData();
+     if (null == raw) {
+         return null;
+     }
+     if (raw.equalsIgnoreCase("NaN")) {
+         return Double.NaN;
+     }
+     // "+Infinity" is kept for backward compatibility with the previous check.
+     if (raw.equalsIgnoreCase("Infinity") || raw.equalsIgnoreCase("+Infinity")) {
+         return Double.POSITIVE_INFINITY;
+     }
+     if (raw.equalsIgnoreCase("-Infinity")) {
+         return Double.NEGATIVE_INFINITY;
+     }
+     BigDecimal result = this.asBigDecimal();
+     OverFlowUtil.validateDoubleNotOverFlow(result);
+     return result.doubleValue();
+ }
+ /**
+  * Converts the value to a Long by dropping the fractional part.
+  *
+  * @return the integral value, or {@code null} when the column is unset
+  * @throws DataXException when the integral part does not fit in a long
+  */
+ @Override
+ public Long asLong() {
+     if (this.getRawData() == null) {
+         return null;
+     }
+     final BigInteger integral = this.asBigDecimal().toBigInteger();
+     OverFlowUtil.validateLongNotOverFlow(integral);
+     return integral.longValue();
+ }
+ /** Returns the integral part of the value, or {@code null} when the column is unset. */
+ @Override
+ public BigInteger asBigInteger() {
+     return this.getRawData() == null ? null : this.asBigDecimal().toBigInteger();
+ }
+ /** Returns the stored text unchanged, or {@code null} when the column is unset. */
+ @Override
+ public String asString() {
+     return (String) this.getRawData();
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public Boolean asBoolean() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Double类型无法转为Bool .");
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public Date asDate() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Double类型无法转为Date类型 .");
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public Date asDate(String dateFormat) {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Double类型无法转为Date类型 .");
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public byte[] asBytes() {
+ throw DataXException.asDataXException(
+ CommonErrorCode.CONVERT_NOT_SUPPORT, "Double类型无法转为Bytes类型 .");
+ }
+ private void validate(final String data) {
+ if (null == data) {
+ return;
+ }
+ if (data.equalsIgnoreCase("NaN") || data.equalsIgnoreCase("-Infinity")
+ || data.equalsIgnoreCase("Infinity")) {
+ return;
+ }
+ try {
+ new BigDecimal(data);
+ } catch (Exception e) {
+ throw DataXException.asDataXException(
+ String.format("String[%s]无法转为Double类型 .", data));
+ }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/element/LongColumn.java b/common-scala/src/main/java/com/alibaba/datax/common/element/LongColumn.java
new file mode 100755
index 0000000000..0b6f1e4867
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/element/LongColumn.java
@@ -0,0 +1,140 @@
+package com.alibaba.datax.common.element;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import org.apache.commons.lang3.math.NumberUtils;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+public class LongColumn extends Column {
+ /**
+ * 从整形字符串表示转为LongColumn,支持Java科学计数法
+ *
+ * NOTE:
+ * 如果data为浮点类型的字符串表示,数据将会失真,请使用DoubleColumn对接浮点字符串
+ *
+ * */
+ public LongColumn(final String data) {
+ super(null, Column.Type.LONG, 0);
+ if (null == data) {
+ return;
+ }
+ try {
+ BigInteger rawData = NumberUtils.createBigDecimal(data)
+ .toBigInteger();
+ super.setRawData(rawData);
+ // 当 rawData 为[0-127]时,rawData.bitLength() < 8,导致其 byteSize = 0,简单起见,直接认为其长度为 data.length()
+ // super.setByteSize(rawData.bitLength() / 8);
+ super.setByteSize(data.length());
+ } catch (Exception e) {
+ throw DataXException.asDataXException(
+ String.format("String[%s]不能转为Long .", data));
+ }
+ }
+ /** Builds a LongColumn from a Long; null stays null. */
+ public LongColumn(Long data) {
+ this(null == data ? (BigInteger) null : BigInteger.valueOf(data));
+ }
+ /** Builds a LongColumn from an Integer; null stays null. */
+ public LongColumn(Integer data) {
+ this(null == data ? (BigInteger) null : BigInteger.valueOf(data));
+ }
+ /** Builds a LongColumn from a BigInteger; byte size is 0 for null and 8 otherwise. */
+ public LongColumn(BigInteger data) {
+ this(data, null == data ? 0 : 8);
+ }
+ // Stores the value as raw data with the given byte size.
+ private LongColumn(BigInteger data, int byteSize) {
+ super(data, Column.Type.LONG, byteSize);
+ }
+ /** Builds a LongColumn whose value is null. */
+ public LongColumn() {
+ this((BigInteger) null);
+ }
+ /** Returns the stored BigInteger, or {@code null} when the column is unset. */
+ @Override
+ public BigInteger asBigInteger() {
+     return (BigInteger) this.getRawData();
+ }
+ /**
+  * Returns the value as a Long.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException when the value does not fit in a long
+  */
+ @Override
+ public Long asLong() {
+     final BigInteger value = (BigInteger) this.getRawData();
+     if (value != null) {
+         OverFlowUtil.validateLongNotOverFlow(value);
+         return value.longValue();
+     }
+     return null;
+ }
+ /**
+  * Returns the value as a Double.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException when the value is outside the double range
+  */
+ @Override
+ public Double asDouble() {
+     if (this.getRawData() == null) {
+         return null;
+     }
+     final BigDecimal asDecimal = this.asBigDecimal();
+     OverFlowUtil.validateDoubleNotOverFlow(asDecimal);
+     return asDecimal.doubleValue();
+ }
+ /** Returns false for zero and true for any other value; {@code null} when unset. */
+ @Override
+ public Boolean asBoolean() {
+     if (this.getRawData() == null) {
+         return null;
+     }
+     return this.asBigInteger().signum() != 0;
+ }
+ /** Returns the value as a BigDecimal, or {@code null} when unset. */
+ @Override
+ public BigDecimal asBigDecimal() {
+     if (this.getRawData() == null) {
+         return null;
+     }
+     final BigInteger value = this.asBigInteger();
+     return new BigDecimal(value);
+ }
+ /** Returns the decimal text of the value, or {@code null} when unset. */
+ @Override
+ public String asString() {
+     final Object raw = this.getRawData();
+     return raw == null ? null : ((BigInteger) raw).toString();
+ }
+ /**
+  * Interprets the value as a millisecond timestamp.
+  *
+  * @return the date, or {@code null} when the column is unset
+  * @throws DataXException when the value does not fit in a long
+  */
+ @Override
+ public Date asDate() {
+     if (this.getRawData() == null) {
+         return null;
+     }
+     final Long millis = this.asLong();
+     return new Date(millis);
+ }
+ /** Same as {@link #asDate()}; the format argument is not used. */
+ @Override
+ public Date asDate(String dateFormat) {
+     return asDate();
+ }
+ /** Not supported: always throws DataXException with CONVERT_NOT_SUPPORT. */
+ @Override
+ public byte[] asBytes() {
+     throw DataXException.asDataXException(
+             CommonErrorCode.CONVERT_NOT_SUPPORT, "Long类型不能转为Bytes .");
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/element/OverFlowUtil.java b/common-scala/src/main/java/com/alibaba/datax/common/element/OverFlowUtil.java
new file mode 100755
index 0000000000..39460c7ebc
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/element/OverFlowUtil.java
@@ -0,0 +1,62 @@
+package com.alibaba.datax.common.element;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+public final class OverFlowUtil {
+ // Bounds of the long range as BigInteger, used by the long overflow checks.
+ public static final BigInteger MAX_LONG = BigInteger
+ .valueOf(Long.MAX_VALUE);
+ public static final BigInteger MIN_LONG = BigInteger
+ .valueOf(Long.MIN_VALUE);
+ // Smallest and largest positive double magnitudes (Double.MIN_VALUE / Double.MAX_VALUE) as BigDecimal.
+ public static final BigDecimal MIN_DOUBLE_POSITIVE = new BigDecimal(
+ String.valueOf(Double.MIN_VALUE));
+ public static final BigDecimal MAX_DOUBLE_POSITIVE = new BigDecimal(
+ String.valueOf(Double.MAX_VALUE));
+ /**
+  * Tells whether the value lies outside [Long.MIN_VALUE, Long.MAX_VALUE].
+  *
+  * @param integer the value to test
+  * @return {@code true} when the value does not fit in a long
+  */
+ public static boolean isLongOverflow(final BigInteger integer) {
+     if (integer.compareTo(OverFlowUtil.MAX_LONG) > 0) {
+         return true;
+     }
+     return integer.compareTo(OverFlowUtil.MIN_LONG) < 0;
+ }
+ /**
+  * Throws when the value does not fit in a long.
+  *
+  * @param integer the value to check
+  * @throws DataXException (CONVERT_OVER_FLOW) when the value is out of range
+  */
+ public static void validateLongNotOverFlow(final BigInteger integer) {
+     if (!OverFlowUtil.isLongOverflow(integer)) {
+         return;
+     }
+     throw DataXException.asDataXException(
+             CommonErrorCode.CONVERT_OVER_FLOW,
+             String.format("[%s] 转为Long类型出现溢出 .", integer.toString()));
+ }
+ /**
+  * Tells whether the magnitude of a non-zero value lies outside
+  * [Double.MIN_VALUE, Double.MAX_VALUE]. Zero is always in range.
+  *
+  * @param decimal the value to test
+  * @return {@code true} when the magnitude is too small or too large for a double
+  */
+ public static boolean isDoubleOverFlow(final BigDecimal decimal) {
+     if (decimal.signum() == 0) {
+         return false;
+     }
+     final BigDecimal magnitude = decimal.abs();
+     final boolean tooSmall = magnitude.compareTo(MIN_DOUBLE_POSITIVE) < 0;
+     final boolean tooLarge = magnitude.compareTo(MAX_DOUBLE_POSITIVE) > 0;
+     return tooSmall || tooLarge;
+ }
+ /**
+  * Throws when the value cannot be represented within the double range.
+  *
+  * @param decimal the value to check
+  * @throws DataXException (CONVERT_OVER_FLOW) when the value is out of range
+  */
+ public static void validateDoubleNotOverFlow(final BigDecimal decimal) {
+     if (!OverFlowUtil.isDoubleOverFlow(decimal)) {
+         return;
+     }
+     throw DataXException.asDataXException(
+             CommonErrorCode.CONVERT_OVER_FLOW,
+             String.format("[%s]转为Double类型出现溢出 .",
+                     decimal.toPlainString()));
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/element/Record.java b/common-scala/src/main/java/com/alibaba/datax/common/element/Record.java
new file mode 100755
index 0000000000..d06d80aafb
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/element/Record.java
@@ -0,0 +1,23 @@
+package com.alibaba.datax.common.element;
+/**
+ * Created by jingxing on 14-8-24.
+ */
+public interface Record {
+ // Appends a column to this record.
+ public void addColumn(Column column);
+ // Sets the column at index i.
+ public void setColumn(int i, final Column column);
+ // Returns the column at index i.
+ public Column getColumn(int i);
+ public String toString();
+ // Returns the number of columns.
+ public int getColumnNumber();
+ // NOTE(review): how byte size differs from memory size is defined by implementations not visible here.
+ public int getByteSize();
+ public int getMemorySize();
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/element/StringColumn.java b/common-scala/src/main/java/com/alibaba/datax/common/element/StringColumn.java
new file mode 100755
index 0000000000..7fc68cc7b5
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/element/StringColumn.java
@@ -0,0 +1,173 @@
+package com.alibaba.datax.common.element;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+/**
+ * Created by jingxing on 14-8-24.
+ */
+public class StringColumn extends Column {
+ /** Builds a StringColumn whose value is null. */
+ public StringColumn() {
+ this((String) null);
+ }
+ /**
+ * Builds a StringColumn holding rawData. The byte size is recorded as
+ * rawData.length() (character count, not an encoded byte count), or 0 for null.
+ */
+ public StringColumn(final String rawData) {
+ super(rawData, Column.Type.STRING, (null == rawData ? 0 : rawData
+ .length()));
+ }
+ /** Returns the stored string, or null when the column is unset. */
+ @Override
+ public String asString() {
+ if (null == this.getRawData()) {
+ return null;
+ }
+ return (String) this.getRawData();
+ }
+ private void validateDoubleSpecific(final String data) {
+ if ("NaN".equals(data) || "Infinity".equals(data)
+ || "-Infinity".equals(data)) {
+ throw DataXException.asDataXException(
+ String.format("String[\"%s\"]属于Double特殊类型,不能转为其他类型 .", data));
+ }
+ return;
+ }
+ /**
+  * Parses the string as a decimal number and returns its integral part.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException when the string is a special double name or not numeric
+  */
+ @Override
+ public BigInteger asBigInteger() {
+     final String raw = (String) this.getRawData();
+     if (raw == null) {
+         return null;
+     }
+     this.validateDoubleSpecific(raw);
+     try {
+         final BigDecimal decimal = this.asBigDecimal();
+         return decimal.toBigInteger();
+     } catch (Exception e) {
+         throw DataXException.asDataXException(
+                 CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
+                         "String[\"%s\"]不能转为BigInteger .", this.asString()));
+     }
+ }
+ /**
+  * Parses the string as a number and returns it as a Long.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException when the string is a special double name, not
+  *         numeric, or does not fit in a long
+  */
+ @Override
+ public Long asLong() {
+     final String raw = (String) this.getRawData();
+     if (raw == null) {
+         return null;
+     }
+     this.validateDoubleSpecific(raw);
+     try {
+         final BigInteger integral = this.asBigInteger();
+         OverFlowUtil.validateLongNotOverFlow(integral);
+         return integral.longValue();
+     } catch (Exception e) {
+         throw DataXException.asDataXException(
+                 String.format("String[\"%s\"]不能转为Long .", this.asString()));
+     }
+ }
+ /**
+  * Parses the string with the BigDecimal(String) constructor.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException when the string is a special double name or not numeric
+  */
+ @Override
+ public BigDecimal asBigDecimal() {
+     final String raw = (String) this.getRawData();
+     if (raw == null) {
+         return null;
+     }
+     this.validateDoubleSpecific(raw);
+     try {
+         return new BigDecimal(this.asString());
+     } catch (Exception e) {
+         throw DataXException.asDataXException(
+                 CommonErrorCode.CONVERT_NOT_SUPPORT, String.format(
+                         "String [\"%s\"] 不能转为BigDecimal .", this.asString()));
+     }
+ }
+ /**
+  * Converts the string to a Double. The names NaN, Infinity and -Infinity
+  * (exact case) map to the matching Double constants; any other text is
+  * parsed as a BigDecimal and range-checked.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException when the string is not numeric or is outside the double range
+  */
+ @Override
+ public Double asDouble() {
+     final String text = (String) this.getRawData();
+     if (text == null) {
+         return null;
+     }
+     if (text.equals("NaN")) {
+         return Double.NaN;
+     } else if (text.equals("Infinity")) {
+         return Double.POSITIVE_INFINITY;
+     } else if (text.equals("-Infinity")) {
+         return Double.NEGATIVE_INFINITY;
+     }
+     final BigDecimal parsed = this.asBigDecimal();
+     OverFlowUtil.validateDoubleNotOverFlow(parsed);
+     return parsed.doubleValue();
+ }
+ /**
+  * Converts the string to a Boolean; only "true" and "false" (any case)
+  * are accepted.
+  *
+  * @return the value, or {@code null} when the column is unset
+  * @throws DataXException for any other text
+  */
+ @Override
+ public Boolean asBoolean() {
+     if (this.getRawData() == null) {
+         return null;
+     }
+     final String text = this.asString();
+     if ("true".equalsIgnoreCase(text)) {
+         return Boolean.TRUE;
+     } else if ("false".equalsIgnoreCase(text)) {
+         return Boolean.FALSE;
+     }
+     throw DataXException.asDataXException(
+             String.format("String[\"%s\"]不能转为Bool .", this.asString()));
+ }
+ /**
+  * Converts the string to a Date through {@code ColumnCast.string2Date};
+  * any failure is reported as a DataXException.
+  */
+ @Override
+ public Date asDate() {
+     final Date parsed;
+     try {
+         parsed = ColumnCast.string2Date(this);
+     } catch (Exception e) {
+         throw DataXException.asDataXException(
+                 String.format("String[\"%s\"]不能转为Date .", this.asString()));
+     }
+     return parsed;
+ }
+ /**
+  * Converts the string to a Date with an explicit pattern through
+  * {@code ColumnCast.string2Date}; any failure is reported as a
+  * DataXException with CONVERT_NOT_SUPPORT.
+  */
+ @Override
+ public Date asDate(String dateFormat) {
+     final Date parsed;
+     try {
+         parsed = ColumnCast.string2Date(this, dateFormat);
+     } catch (Exception e) {
+         throw DataXException.asDataXException(CommonErrorCode.CONVERT_NOT_SUPPORT,
+                 String.format("String[\"%s\"]不能转为Date .", this.asString()));
+     }
+     return parsed;
+ }
+ /**
+  * Converts the string to bytes through {@code ColumnCast.string2Bytes};
+  * any failure is reported as a DataXException.
+  */
+ @Override
+ public byte[] asBytes() {
+     final byte[] encoded;
+     try {
+         encoded = ColumnCast.string2Bytes(this);
+     } catch (Exception e) {
+         throw DataXException.asDataXException(
+                 String.format("String[\"%s\"]不能转为Bytes .", this.asString()));
+     }
+     return encoded;
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/exception/DataXException.java b/common-scala/src/main/java/com/alibaba/datax/common/exception/DataXException.java
new file mode 100755
index 0000000000..09d00adcf1
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/exception/DataXException.java
@@ -0,0 +1,70 @@
+package com.alibaba.datax.common.exception;
+import com.alibaba.datax.common.spi.ErrorCode;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+public class DataXException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ // Error code attached to this exception; stays null when built from a message only.
+ private ErrorCode errorCode;
+ /** Builds an exception whose message is "errorCode - errorMessage". */
+ public DataXException(ErrorCode errorCode, String errorMessage) {
+ super(errorCode.toString() + " - " + errorMessage);
+ this.errorCode = errorCode;
+ }
+ /** Builds an exception with a plain message and no error code. */
+ public DataXException(String errorMessage) {
+ super(errorMessage);
+ }
+ // Message is "errorCode - errorMessage - <stack trace text of cause>"; the cause is also chained.
+ private DataXException(ErrorCode errorCode, String errorMessage, Throwable cause) {
+ super(errorCode.toString() + " - " + getMessage(errorMessage) + " - " + getMessage(cause), cause);
+ this.errorCode = errorCode;
+ }
+ /** Creates a DataXException from an error code and a message. */
+ public static DataXException asDataXException(ErrorCode errorCode, String message) {
+ return new DataXException(errorCode, message);
+ }
+ /** Creates a DataXException from a message only (no error code). */
+ public static DataXException asDataXException(String message) {
+ return new DataXException(message);
+ }
+ /**
+ * Wraps cause in a DataXException. When cause already is a DataXException it is
+ * returned as is, and errorCode and message are not used.
+ */
+ public static DataXException asDataXException(ErrorCode errorCode, String message, Throwable cause) {
+ if (cause instanceof DataXException) {
+ return (DataXException) cause;
+ }
+ return new DataXException(errorCode, message, cause);
+ }
+ /**
+ * Wraps cause in a DataXException, using the stack trace text of cause as the message.
+ * When cause already is a DataXException it is returned as is.
+ */
+ public static DataXException asDataXException(ErrorCode errorCode, Throwable cause) {
+ if (cause instanceof DataXException) {
+ return (DataXException) cause;
+ }
+ return new DataXException(errorCode, getMessage(cause), cause);
+ }
+ /** Returns the error code of this exception; may be null. */
+ public ErrorCode getErrorCode() {
+ return this.errorCode;
+ }
+ private static String getMessage(Object obj) {
+ if (obj == null) {
+ return "";
+ }
+ if (obj instanceof Throwable) {
+ StringWriter str = new StringWriter();
+ PrintWriter pw = new PrintWriter(str);
+ ((Throwable) obj).printStackTrace(pw);
+ return str.toString();
+ // return ((Throwable) obj).getMessage();
+ } else {
+ return obj.toString();
+ }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/exception/ExceptionTracker.java b/common-scala/src/main/java/com/alibaba/datax/common/exception/ExceptionTracker.java
new file mode 100644
index 0000000000..f6d3732e2a
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/exception/ExceptionTracker.java
@@ -0,0 +1,15 @@
+package com.alibaba.datax.common.exception;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+public final class ExceptionTracker {
+ public static final int STRING_BUFFER = 1024;
+ /**
+  * Returns the printed stack trace of an exception as a string.
+  *
+  * @param ex the exception to render
+  * @return the text written by {@code ex.printStackTrace}
+  */
+ public static String trace(Throwable ex) {
+     final StringWriter buffer = new StringWriter(STRING_BUFFER);
+     final PrintWriter printer = new PrintWriter(buffer);
+     ex.printStackTrace(printer);
+     return buffer.toString();
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractJobPlugin.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractJobPlugin.java
new file mode 100755
index 0000000000..ef8eb509f0
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractJobPlugin.java
@@ -0,0 +1,26 @@
+package com.alibaba.datax.common.plugin;
+/**
+ * Created by jingxing on 14-8-24.
+ */
+public abstract class AbstractJobPlugin extends AbstractPlugin {
+ /**
+ * Returns the collector attached to this job plugin.
+ *
+ * @return the jobPluginCollector
+ */
+ public JobPluginCollector getJobPluginCollector() {
+ return jobPluginCollector;
+ }
+ /**
+ * Attaches a collector to this job plugin.
+ *
+ * @param jobPluginCollector the jobPluginCollector to set
+ */
+ public void setJobPluginCollector(
+ JobPluginCollector jobPluginCollector) {
+ this.jobPluginCollector = jobPluginCollector;
+ }
+ // Collector attached to this job plugin; null until the setter is called.
+ private JobPluginCollector jobPluginCollector;
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractPlugin.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractPlugin.java
new file mode 100755
index 0000000000..ef375c994f
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractPlugin.java
@@ -0,0 +1,97 @@
+package com.alibaba.datax.common.plugin;
+import com.alibaba.datax.common.base.BaseObject;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.core.job.IJobContainerContext;
+public abstract class AbstractPlugin extends BaseObject implements Pluginable {
+ // Configuration of the job
+ private Configuration pluginJobConf;
+ // Configuration of the plugin itself
+ private Configuration pluginConf;
+ // by qiangsi.lq: changed to the job configuration of the peer side
+ private Configuration peerPluginJobConf;
+ private String peerPluginName;
+ protected IJobContainerContext containerContext;
+ /** Stores the container context for use by subclasses. */
+ @Override
+ public final void setContainerContext(IJobContainerContext containerContext) {
+ this.containerContext = containerContext;
+ }
+ /** Returns the "name" entry of the plugin configuration; pluginConf must already be set. */
+ @Override
+ public String getPluginName() {
+ assert null != this.pluginConf;
+ return this.pluginConf.getString("name");
+ }
+ /** Returns the "developer" entry of the plugin configuration; pluginConf must already be set. */
+ @Override
+ public String getDeveloper() {
+ assert null != this.pluginConf;
+ return this.pluginConf.getString("developer");
+ }
+ /** Returns the "description" entry of the plugin configuration; pluginConf must already be set. */
+ @Override
+ public String getDescription() {
+ assert null != this.pluginConf;
+ return this.pluginConf.getString("description");
+ }
+ /** Returns the job configuration of this plugin. */
+ @Override
+ public Configuration getPluginJobConf() {
+ return pluginJobConf;
+ }
+ /** Sets the job configuration of this plugin. */
+ @Override
+ public void setPluginJobConf(Configuration pluginJobConf) {
+ this.pluginJobConf = pluginJobConf;
+ }
+ /** Sets the plugin's own configuration (read by getPluginName/getDeveloper/getDescription). */
+ @Override
+ public void setPluginConf(Configuration pluginConf) {
+ this.pluginConf = pluginConf;
+ }
+ /** Returns the job configuration of the peer plugin. */
+ @Override
+ public Configuration getPeerPluginJobConf() {
+ return peerPluginJobConf;
+ }
+ /** Sets the job configuration of the peer plugin. */
+ @Override
+ public void setPeerPluginJobConf(Configuration peerPluginJobConf) {
+ this.peerPluginJobConf = peerPluginJobConf;
+ }
+ /** Returns the name of the peer plugin. */
+ @Override
+ public String getPeerPluginName() {
+ return peerPluginName;
+ }
+ /** Sets the name of the peer plugin. */
+ @Override
+ public void setPeerPluginName(String peerPluginName) {
+ this.peerPluginName = peerPluginName;
+ }
+ // The methods below are empty by default; subclasses may override them.
+ // NOTE(review): when and in which order the framework calls them is not visible in this file.
+ public void preCheck() {
+ }
+ public void prepare() {
+ }
+ public void post() {
+ }
+ public void preHandler(Configuration jobConfiguration) {
+ }
+ public void postHandler(Configuration jobConfiguration) {
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractTaskPlugin.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractTaskPlugin.java
new file mode 100755
index 0000000000..9a3f54e34a
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/AbstractTaskPlugin.java
@@ -0,0 +1,41 @@
+package com.alibaba.datax.common.plugin;
+import com.alibaba.datax.core.job.IJobContainerContext;
+/**
+ * Created by jingxing on 14-8-24.
+ */
+public abstract class AbstractTaskPlugin extends AbstractPlugin {
+ // A TaskPlugin should have a taskId
+ private int taskGroupId;
+ private int taskId;
+ private TaskPluginCollector taskPluginCollector;
+ /** Returns the collector attached to this task plugin. */
+ public TaskPluginCollector getTaskPluginCollector() {
+ return taskPluginCollector;
+ }
+ /** Attaches a collector to this task plugin. */
+ public void setTaskPluginCollector(
+ TaskPluginCollector taskPluginCollector) {
+ this.taskPluginCollector = taskPluginCollector;
+ }
+ /** Returns the id of this task. */
+ public int getTaskId() {
+ return taskId;
+ }
+ /** Sets the id of this task. */
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ }
+ /** Returns the id of the task group this task belongs to. */
+ public int getTaskGroupId() {
+ return taskGroupId;
+ }
+ /** Sets the id of the task group this task belongs to. */
+ public void setTaskGroupId(int taskGroupId) {
+ this.taskGroupId = taskGroupId;
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/JobPluginCollector.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/JobPluginCollector.java
new file mode 100755
index 0000000000..6eb02ab4e7
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/JobPluginCollector.java
@@ -0,0 +1,22 @@
+package com.alibaba.datax.common.plugin;
+import java.util.List;
+import java.util.Map;
+/**
+ * Created by jingxing on 14-9-9.
+ */
+public interface JobPluginCollector extends PluginCollector {
+ /**
+  * Gets the custom messages collected from the tasks.
+  *
+  * @return the collected messages, keyed by message key
+  */
+ Map<String, List<String>> getMessage();
+ /**
+  * Gets the custom messages collected from the tasks for a single key.
+  *
+  * @param key the message key to look up
+  * @return the messages recorded under {@code key}
+  */
+ List<String> getMessage(String key);
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/PluginCollector.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/PluginCollector.java
new file mode 100755
index 0000000000..f2af398dd3
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/PluginCollector.java
@@ -0,0 +1,9 @@
+package com.alibaba.datax.common.plugin;
+/**
+ * This is only a marker interface.
+ * */
+public interface PluginCollector {
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/Pluginable.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/Pluginable.java
new file mode 100755
index 0000000000..69bd34f992
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/Pluginable.java
@@ -0,0 +1,33 @@
+package com.alibaba.datax.common.plugin;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.core.job.IJobContainerContext;
+// Contract implemented by AbstractPlugin: plugin metadata getters, configuration
+// accessors for the plugin and its peer, and init/destroy hooks.
+public interface Pluginable {
+ String getDeveloper();
+ String getDescription();
+ // Sets the plugin's own configuration.
+ void setPluginConf(Configuration pluginConf);
+ // NOTE(review): the point at which init() and destroy() are invoked is not visible in this file.
+ void init();
+ void destroy();
+ String getPluginName();
+ // Job configuration of this plugin.
+ Configuration getPluginJobConf();
+ // Job configuration of the peer plugin.
+ Configuration getPeerPluginJobConf();
+ public String getPeerPluginName();
+ void setContainerContext(IJobContainerContext containerContext);
+ void setPluginJobConf(Configuration jobConf);
+ void setPeerPluginJobConf(Configuration peerPluginJobConf);
+ public void setPeerPluginName(String peerPluginName);
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/RecordReceiver.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/RecordReceiver.java
new file mode 100755
index 0000000000..74f236f371
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/RecordReceiver.java
@@ -0,0 +1,26 @@
+/**
+ * (C) 2010-2013 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.datax.common.plugin;
+import com.alibaba.datax.common.element.Record;
+// Source of Record objects.
+// NOTE(review): presumably used on the writer side to pull records produced by a reader — confirm with callers.
+public interface RecordReceiver {
+ // Returns the next Record; what is returned when no record is left is not visible here.
+ public Record getFromReader();
+ public void shutdown();
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java
new file mode 100755
index 0000000000..0d6926098f
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java
@@ -0,0 +1,32 @@
+ * (C) 2010-2013 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.datax.common.plugin;
+import com.alibaba.datax.common.element.Record;
+/**
+ * Creates {@link Record}s and passes them on towards the writer side.
+ */
+public interface RecordSender {
+ // Returns a Record instance for the caller to fill in.
+ public Record createRecord();
+ // Submits a record for delivery to the writer.
+ public void sendToWriter(Record record);
+ // NOTE(review): flush/terminate/shutdown semantics (buffering, end-of-stream marker,
+ // forced stop) are not visible in this file - confirm against implementations.
+ public void flush();
+ public void terminate();
+ public void shutdown();
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/plugin/TaskPluginCollector.java b/common-scala/src/main/java/com/alibaba/datax/common/plugin/TaskPluginCollector.java
new file mode 100755
index 0000000000..f0c85fe6ce
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/plugin/TaskPluginCollector.java
@@ -0,0 +1,57 @@
+package com.alibaba.datax.common.plugin;
+import com.alibaba.datax.common.element.Record;
+ *
+ * 该接口提供给Task Plugin用来记录脏数据和自定义信息。
+ *
+ * 1. 脏数据记录,TaskPluginCollector提供多种脏数据记录的适配,包括本地输出、集中式汇报等等
+ * 2. 自定义信息,所有的task插件运行过程中可以通过TaskPluginCollector收集信息,
+ * Job的插件在POST过程中通过getMessage()接口获取信息
+ */
+public abstract class TaskPluginCollector implements PluginCollector {
+ /**
+ * Collects a dirty record.
+ *
+ * @param dirtyRecord
+ * the dirty record
+ * @param t
+ * the exception information
+ * @param errorMessage
+ * the error message describing the problem
+ */
+ public abstract void collectDirtyRecord(final Record dirtyRecord,
+ final Throwable t, final String errorMessage);
+ /**
+ * Collects a dirty record without an exception; delegates to the three-argument
+ * overload with a null Throwable.
+ *
+ * @param dirtyRecord
+ * the dirty record
+ * @param errorMessage
+ * the error message describing the problem
+ */
+ public void collectDirtyRecord(final Record dirtyRecord,
+ final String errorMessage) {
+ this.collectDirtyRecord(dirtyRecord, null, errorMessage);
+ }
+ /**
+ * Collects a dirty record without a message; delegates to the three-argument
+ * overload with an empty error message.
+ *
+ * @param dirtyRecord
+ * the dirty record
+ * @param t
+ * the exception information
+ */
+ public void collectDirtyRecord(final Record dirtyRecord, final Throwable t) {
+ this.collectDirtyRecord(dirtyRecord, t, "");
+ }
+ /**
+ * Collects custom information; Job plugins can obtain it through getMessage.
+ * If the same key is collected several times, a List is used internally to keep
+ * every value recorded for that key.
+ * */
+ public abstract void collectMessage(final String key, final String value);
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/spi/Hook.java b/common-scala/src/main/java/com/alibaba/datax/common/spi/Hook.java
new file mode 100755
index 0000000000..d510f57c18
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/spi/Hook.java
@@ -0,0 +1,27 @@
+package com.alibaba.datax.common.spi;
+import com.alibaba.datax.common.util.Configuration;
+import java.util.Map;
+ * Created by xiafei.qiuxf on 14/12/17.
+ */
+public interface Hook {
+ /**
+ * Returns the name.
+ *
+ * @return the hook's name
+ */
+ public String getName();
+ /**
+ * TODO: document.
+ *
+ * @param jobConf the job configuration passed to the hook
+ * @param msg a map passed to the hook (NOTE(review): key/value types are not visible here)
+ */
+ public void invoke(Configuration jobConf, Map msg);
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/spi/Reader.java b/common-scala/src/main/java/com/alibaba/datax/common/spi/Reader.java
new file mode 100755
index 0000000000..fec41a9f03
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/spi/Reader.java
@@ -0,0 +1,52 @@
+package com.alibaba.datax.common.spi;
+import java.util.List;
+import com.alibaba.datax.common.base.BaseObject;
+import com.alibaba.datax.common.plugin.AbstractJobPlugin;
+import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.plugin.RecordSender;
+ * 每个Reader插件在其内部内部实现Job、Task两个内部类。
+ *
+ *
+ * */
+public abstract class Reader extends BaseObject {
+ /**
+ * Every Reader plugin must implement the Job inner class.
+ *
+ * */
+ public static abstract class Job extends AbstractJobPlugin {
+ /**
+ * Splits the job into tasks.
+ *
+ * @param adviceNumber
+ *
+ * adviceNumber is the number of tasks the framework advises the plugin to split into;
+ * plugin developers should preferably produce a task count >= adviceNumber.
+ *
+ * The advice exists to give users the best result. For example, if the framework
+ * calculates that the user's data store can support 100 concurrent connections and
+ * the user wants 100-way concurrency, a plugin that splits by this rule into >= 100
+ * pieces of connection information lets DataX start 100 Channels at once, which
+ * gives the user the best throughput.
+ * For example, when a user synchronizes a single MySQL table but expects 10-way
+ * concurrent throughput, the plugin developer should split that table, e.g. by
+ * primary-key range; if the final task count is >= 10 the maximum throughput can be
+ * offered to the user.
+ *
+ * This is only a suggested value: a Reader plugin may split by its own rules, but
+ * splitting according to the framework's suggestion is recommended.
+ *
+ * For ODPS-to-OTS writes, if pre-sorting/pre-splitting is involved it may only be
+ * possible to split by partition information and not at a finer granularity; such
+ * cases can only follow the splitting rules dictated by the source's physical layout.
+ *
+ *
+ *
+ * */
+ public abstract List split(int adviceNumber);
+ }
+ /**
+ * Task-side part of a Reader plugin; startRead receives the RecordSender to use.
+ */
+ public static abstract class Task extends AbstractTaskPlugin {
+ public abstract void startRead(RecordSender recordSender);
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/spi/Writer.java b/common-scala/src/main/java/com/alibaba/datax/common/spi/Writer.java
new file mode 100755
index 0000000000..457eb6860c
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/spi/Writer.java
@@ -0,0 +1,40 @@
+package com.alibaba.datax.common.spi;
+import com.alibaba.datax.common.base.BaseObject;
+import com.alibaba.datax.common.plugin.AbstractJobPlugin;
+import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import java.util.List;
+ * 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。
+ *
+ *
+ * */
+public abstract class Writer extends BaseObject {
+ /**
+ * Every Writer plugin must implement the Job inner class.
+ */
+ public abstract static class Job extends AbstractJobPlugin {
+ /**
+ * Splits the job into tasks.
+ *
+ * @param mandatoryNumber
+ * To keep the Reader and Writer task counts equal, the Writer plugin must split into
+ * exactly the number of tasks produced on the source side; otherwise the framework
+ * reports an error.
+ *
+ * */
+ public abstract List split(int mandatoryNumber);
+ }
+ /**
+ * Every Writer plugin must implement the Task inner class.
+ */
+ public abstract static class Task extends AbstractTaskPlugin {
+ public abstract void startWrite(RecordReceiver lineReceiver);
+ // Defaults to false; subclasses override to opt in to fail-over.
+ public boolean supportFailOver(){return false;}
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/statistics/PerfRecord.java b/common-scala/src/main/java/com/alibaba/datax/common/statistics/PerfRecord.java
new file mode 100644
index 0000000000..74b26eeb60
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/statistics/PerfRecord.java
@@ -0,0 +1,258 @@
+package com.alibaba.datax.common.statistics;
+import com.alibaba.datax.common.util.HostUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Date;
+ * Created by liqiang on 15/8/23.
+ */
+public class PerfRecord implements Comparable {
+ // Logger that receives one line (the record's toString()) on each start/end.
+ private static Logger perf = LoggerFactory.getLogger(PerfRecord.class);
+ // Pattern used when rendering startTime in toString() and getDatetime().
+ private static String datetimeFormat = "yyyy-MM-dd HH:mm:ss";
+ // Phase a PerfRecord measures; each constant carries an int code exposed via toInt().
+ // NOTE(review): only SQL_QUERY is visible in this excerpt although the comments below
+ // describe further phases - check the complete constant list in the real file.
+ public enum PHASE {
+ /**
+ * Total run time of a task; the first 10 are framework statistics, the rest are
+ * plugin-specific statistics.
+ */
+ /**
+ * SQL_QUERY: the SQL query phase, a statistic specific to some readers.
+ */
+ SQL_QUERY(100),
+ /**
+ * All data has been read out of the SQL result.
+ */
+ /**
+ * only odps block close
+ */
+ private int val;
+ PHASE(int val) {
+ this.val = val;
+ }
+ public int toInt(){
+ return val;
+ }
+ }
+ // State of a record: `start` is set by start(), `end` by end()/end(long)/addPerfRecord().
+ public enum ACTION{
+ start,
+ end
+ }
+ // Identity of the measured phase; fixed at construction.
+ private final int taskGroupId;
+ private final int taskId;
+ private final PHASE phase;
+ // Mutable measurement state, written by start()/end() and read by PerfTrace.
+ private volatile ACTION action;
+ // Wall-clock start; stays null until start() runs or addPerfRecord()/copy() sets it.
+ private volatile Date startTime;
+ // -1 means "not finished yet" (SumPerf4Report.add checks for this value).
+ private volatile long elapsedTimeInNs = -1;
+ private volatile long count = 0;
+ private volatile long size = 0;
+ // System.nanoTime() captured by start(); used to compute the elapsed time in end().
+ private volatile long startTimeInNs;
+ private volatile boolean isReport = false;
+ /**
+ * Creates a record for one phase of one task; no timing starts until start() is called.
+ *
+ * @param taskGroupId id of the task group the task belongs to
+ * @param taskId id of the task
+ * @param phase phase being measured
+ */
+ public PerfRecord(int taskGroupId, int taskId, PHASE phase) {
+ this.taskGroupId = taskGroupId;
+ this.taskId = taskId;
+ this.phase = phase;
+ }
+ /**
+ * Registers an already-finished measurement in one step: when tracing is enabled, builds a
+ * record in the `end` state with the given start time and elapsed time, hands it to
+ * PerfTrace and logs it. Does nothing when tracing is disabled.
+ *
+ * @param taskGroupId id of the task group
+ * @param taskId id of the task
+ * @param phase phase that was measured
+ * @param startTime start time, passed to {@code new Date(long)}
+ * @param elapsedTimeInNs elapsed time in nanoseconds
+ */
+ public static void addPerfRecord(int taskGroupId, int taskId, PHASE phase, long startTime,long elapsedTimeInNs) {
+     PerfTrace trace = PerfTrace.getInstance();
+     if (!trace.isEnable()) {
+         return;
+     }
+     PerfRecord finished = new PerfRecord(taskGroupId, taskId, phase);
+     finished.elapsedTimeInNs = elapsedTimeInNs;
+     finished.action = ACTION.end;
+     finished.startTime = new Date(startTime);
+     // register with PerfTrace
+     trace.tracePerfRecord(finished);
+     perf.info(finished.toString());
+ }
+ public void start() {
+ if(PerfTrace.getInstance().isEnable()) {
+ this.startTime = new Date();
+ this.startTimeInNs = System.nanoTime();
+ this.action = ACTION.start;
+ //在PerfTrace里注册
+ PerfTrace.getInstance().tracePerfRecord(this);
+ perf.info(toString());
+ }
+ }
+ // Adds to the record counter. NOTE: `+=` on a volatile long is a read-modify-write and is
+ // not atomic, so concurrent callers on the same record can lose updates.
+ public void addCount(long count) {
+ this.count += count;
+ }
+ // Adds to the byte-size counter; same non-atomic caveat as addCount.
+ public void addSize(long size) {
+ this.size += size;
+ }
+ public void end() {
+ if(PerfTrace.getInstance().isEnable()) {
+ this.elapsedTimeInNs = System.nanoTime() - startTimeInNs;
+ this.action = ACTION.end;
+ PerfTrace.getInstance().tracePerfRecord(this);
+ perf.info(toString());
+ }
+ }
+ public void end(long elapsedTimeInNs) {
+ if(PerfTrace.getInstance().isEnable()) {
+ this.elapsedTimeInNs = elapsedTimeInNs;
+ this.action = ACTION.end;
+ PerfTrace.getInstance().tracePerfRecord(this);
+ perf.info(toString());
+ }
+ }
+ /**
+ * Renders the record as one comma-separated line: instance id, task group id, task id,
+ * phase, action, formatted start time, elapsed nanoseconds, count, size and host IP.
+ * <p>
+ * The start time is rendered through getDatetime(), so a record whose startTime is still
+ * null (for example end(long) called without a prior start()) prints "null time" instead
+ * of failing with a NullPointerException inside the date formatter.
+ *
+ * @return the comma-separated representation
+ */
+ @Override
+ public String toString() {
+     return String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s"
+             , getInstId(), taskGroupId, taskId, phase, action,
+             getDatetime(), elapsedTimeInNs, count, size, getHostIP());
+ }
+ /**
+ * Orders records by elapsed time, ascending. A null argument sorts before this record.
+ *
+ * @return 1, 0 or -1
+ */
+ @Override
+ public int compareTo(PerfRecord o) {
+     if (o == null) {
+         return 1;
+     }
+     if (this.elapsedTimeInNs > o.elapsedTimeInNs) {
+         return 1;
+     }
+     if (this.elapsedTimeInNs == o.elapsedTimeInNs) {
+         return 0;
+     }
+     return -1;
+ }
+ /**
+ * Hash over the same fields equals() compares: instance id, task group id, task id,
+ * phase code and start time (0 when startTime is null).
+ */
+ @Override
+ public int hashCode() {
+ long jobId = getInstId();
+ // fold the 64-bit id into 32 bits
+ int result = (int) (jobId ^ (jobId >>> 32));
+ result = 31 * result + taskGroupId;
+ result = 31 * result + taskId;
+ result = 31 * result + phase.toInt();
+ result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
+ return result;
+ }
+ /**
+ * Two records are equal when they share instance id, task group id, task id, phase and
+ * start time (null-safe for phase and start time). Elapsed time, count and size are not
+ * part of the identity.
+ */
+ @Override
+ public boolean equals(Object o) {
+     if (this == o) {
+         return true;
+     }
+     if (!(o instanceof PerfRecord)) {
+         return false;
+     }
+     PerfRecord other = (PerfRecord) o;
+     return this.getInstId() == other.getInstId()
+             && this.taskGroupId == other.taskGroupId
+             && this.taskId == other.taskId
+             && (phase != null ? phase.equals(other.phase) : other.phase == null)
+             && (startTime != null ? startTime.equals(other.startTime) : other.startTime == null);
+ }
+ /**
+ * Returns a new record with the same task group id, task id, phase, action, start time,
+ * elapsed time, count and size. startTimeInNs and isReport are not copied and keep their
+ * defaults on the copy.
+ */
+ public PerfRecord copy() {
+ PerfRecord copy = new PerfRecord(this.taskGroupId, this.getTaskId(), this.phase);
+ copy.action = this.action;
+ copy.startTime = this.startTime;
+ copy.elapsedTimeInNs = this.elapsedTimeInNs;
+ copy.count = this.count;
+ copy.size = this.size;
+ return copy;
+ }
+ // ---- plain accessors ----
+ public int getTaskGroupId() {
+ return taskGroupId;
+ }
+ public int getTaskId() {
+ return taskId;
+ }
+ public PHASE getPhase() {
+ return phase;
+ }
+ public ACTION getAction() {
+ return action;
+ }
+ // -1 until one of the end methods (or addPerfRecord) has stored a value.
+ public long getElapsedTimeInNs() {
+ return elapsedTimeInNs;
+ }
+ public long getCount() {
+ return count;
+ }
+ public long getSize() {
+ return size;
+ }
+ // Instance id is not stored on the record; it is read from the PerfTrace singleton.
+ public long getInstId(){
+ return PerfTrace.getInstance().getInstId();
+ }
+ public String getHostIP(){
+ return HostUtils.IP;
+ }
+ public String getHostName(){
+ return HostUtils.HOSTNAME;
+ }
+ // May be null when the record was never started.
+ public Date getStartTime() {
+ return startTime;
+ }
+ // NOTE: throws NullPointerException when startTime is null.
+ public long getStartTimeInMs() {
+ return startTime.getTime();
+ }
+ public long getStartTimeInNs() {
+ return startTimeInNs;
+ }
+ /**
+ * Returns the start time formatted with the class-level datetime pattern, or the literal
+ * "null time" when no start time has been recorded.
+ */
+ public String getDatetime(){
+     return startTime == null
+             ? "null time"
+             : DateFormatUtils.format(startTime, datetimeFormat);
+ }
+ // Accessors for the isReport flag; nothing in this class reads the flag itself.
+ public boolean isReport() {
+ return isReport;
+ }
+ public void setIsReport(boolean isReport) {
+ this.isReport = isReport;
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java b/common-scala/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java
new file mode 100644
index 0000000000..ea9aa42110
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java
@@ -0,0 +1,907 @@
+package com.alibaba.datax.common.statistics;
+import com.alibaba.datax.common.statistics.PerfRecord.PHASE;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.util.HostUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+ * PerfTrace 记录 job(local模式),taskGroup(distribute模式),因为这2种都是jvm,即一个jvm里只需要有1个PerfTrace。
+ */
+public class PerfTrace {
+ private static Logger LOG = LoggerFactory.getLogger(PerfTrace.class);
+ // Process-wide singleton. volatile because both getInstance() overloads read this field
+ // outside the lock before synchronizing; without volatile another thread could observe a
+ // non-null reference to a PerfTrace whose non-final fields are not yet visible.
+ private static volatile PerfTrace instance;
+ private static final Object lock = new Object();
+ private String perfTraceId;
+ private volatile boolean enable;
+ private volatile boolean isJob;
+ private long instId;
+ private long jobId;
+ private long jobVersion;
+ private int taskGroupId;
+ private int channelNumber;
+ private int priority;
+ // Finished records are folded into the print summary once more than batchSize * 10 are buffered.
+ private int batchSize = 500;
+ private volatile boolean perfReportEnable = true;
+ //jobid_jobversion,instanceid,taskid, src_mark, dst_mark,
+ private Map taskDetails = new ConcurrentHashMap();
+ //PHASE => PerfRecord
+ private ConcurrentHashMap perfRecordMaps4print = new ConcurrentHashMap();
+ // job_phase => SumPerf4Report
+ private SumPerf4Report sumPerf4Report = new SumPerf4Report();
+ private SumPerf4Report sumPerf4Report4NotEnd;
+ private Configuration jobInfo;
+ // Records that have started but not yet ended; guarded by its own monitor.
+ private final Set needReportPool4NotEnd = new HashSet();
+ // Buffer of finished records awaiting summarisation; guarded by its own monitor.
+ private final List totalEndReport = new ArrayList();
+ /**
+ * Singleton accessor: creates the instance from the given arguments on the first call.
+ * When an instance already exists the arguments are ignored and that instance is returned.
+ *
+ * @param isJob whether this JVM is the job (as opposed to a task group)
+ * @param jobId id stored as the instance id
+ * @param taskGroupId task group id
+ * @param priority job priority
+ * @param enable whether tracing is enabled
+ * @return the shared PerfTrace instance
+ */
+ public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
+ if (instance == null) {
+ synchronized (lock) {
+ if (instance == null) {
+ instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable);
+ }
+ }
+ }
+ return instance;
+ }
+ /**
+ * There is only one instance per JVM, so once getInstance(isJob, jobId, taskGroupId, ...)
+ * has created it, later callers can use this overload to fetch it directly.
+ * If no instance exists yet, an error is logged and a disabled placeholder instance
+ * (ids -1111, enable = false) is created and returned.
+ *
+ * @return the shared PerfTrace instance
+ */
+ public static PerfTrace getInstance() {
+ if (instance == null) {
+ LOG.error("PerfTrace instance not be init! must have some error! ");
+ synchronized (lock) {
+ if (instance == null) {
+ instance = new PerfTrace(false, -1111, -1111, 0, false);
+ }
+ }
+ }
+ return instance;
+ }
+ /**
+ * Initialises the trace and logs its id, enable flag and priority. Note that the jobId
+ * argument is stored in instId; the jobId field itself is only set by setJobInfo().
+ * Any exception during initialisation disables tracing instead of propagating.
+ */
+ private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
+ try {
+ this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId);
+ this.enable = enable;
+ this.isJob = isJob;
+ this.taskGroupId = taskGroupId;
+ this.instId = jobId;
+ this.priority = priority;
+ LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
+ } catch (Exception e) {
+ // do nothing
+ this.enable = false;
+ }
+ }
+ /**
+ * Appends a detail string to the comma-separated details kept for a task. Everything from
+ * the first '?' onwards is dropped; if the remaining text contains '[' a closing ']' is
+ * appended. Does nothing when tracing is disabled.
+ *
+ * @param taskId task the detail belongs to
+ * @param detail detail text
+ */
+ public void addTaskDetails(int taskId, String detail) {
+     if (!enable) {
+         return;
+     }
+     int queryPos = detail.indexOf("?");
+     String current = detail.substring(0, queryPos == -1 ? detail.length() : queryPos);
+     if (current.indexOf("[") >= 0) {
+         current += "]";
+     }
+     String before = "";
+     if (taskDetails.containsKey(taskId)) {
+         before = taskDetails.get(taskId).trim();
+     }
+     String prefix = StringUtils.isEmpty(before) ? "" : before + ",";
+     this.taskDetails.put(taskId, prefix + current);
+ }
+ /**
+ * Registers a record state change. When tracing is enabled:
+ * - `end`: the record is buffered in totalEndReport (and the buffer is folded into the
+ * print summary once it exceeds batchSize * 10); if reporting is on and the phase is
+ * reportable, its time is added to sumPerf4Report and it is removed from the not-ended pool.
+ * - `start`: if reporting is on and the phase is reportable, the record is added to the
+ * not-ended pool.
+ * Any exception is swallowed, so tracing never fails the caller.
+ */
+ public void tracePerfRecord(PerfRecord perfRecord) {
+ try {
+ if (enable) {
+ long curNanoTime = System.nanoTime();
+ // ArrayList is not thread-safe, hence the explicit locking below
+ switch (perfRecord.getAction()) {
+ case end:
+ synchronized (totalEndReport) {
+ totalEndReport.add(perfRecord);
+ if (totalEndReport.size() > batchSize * 10) {
+ sumPerf4EndPrint(totalEndReport);
+ }
+ }
+ if (perfReportEnable && needReport(perfRecord)) {
+ synchronized (needReportPool4NotEnd) {
+ sumPerf4Report.add(curNanoTime,perfRecord);
+ needReportPool4NotEnd.remove(perfRecord);
+ }
+ }
+ break;
+ case start:
+ if (perfReportEnable && needReport(perfRecord)) {
+ synchronized (needReportPool4NotEnd) {
+ needReportPool4NotEnd.add(perfRecord);
+ }
+ }
+ break;
+ }
+ }
+ } catch (Exception e) {
+ // do nothing
+ }
+ }
+ /**
+ * Tells whether a record's phase takes part in reporting: true for TASK_TOTAL and
+ * SQL_QUERY, false for every other phase.
+ */
+ private boolean needReport(PerfRecord perfRecord) {
+     switch (perfRecord.getPhase()) {
+         case TASK_TOTAL:
+         case SQL_QUERY:
+             return true;
+         default:
+             return false;
+     }
+ }
+ /**
+ * Exception-safe wrapper around summarize(): returns the summary text, or a message
+ * containing the exception's message if summarising fails.
+ */
+ public String summarizeNoException() {
+     try {
+         return summarize();
+     } catch (Exception e) {
+         return "PerfTrace summarize has Exception " + e.getMessage();
+     }
+ }
+ // At the end of the job, build the overall summary of the perf data collected so far.
+ // Section 1 lists, per phase (sorted by phase code), the average time, record count and the
+ // slowest task; section 2 lists record/byte averages and maxima for READ_TASK_DATA.
+ private synchronized String summarize() {
+ if (!enable) {
+ return "PerfTrace not enable!";
+ }
+ // fold any still-buffered finished records into the per-phase summary first
+ if (totalEndReport.size() > 0) {
+ sumPerf4EndPrint(totalEndReport);
+ }
+ StringBuilder info = new StringBuilder();
+ info.append("\n === total summarize info === \n");
+ info.append("\n 1. all phase average time info and max time task info: \n\n");
+ info.append(String.format("%-20s | %18s | %18s | %18s | %18s | %-100s\n", "PHASE", "AVERAGE USED TIME", "ALL TASK NUM", "MAX USED TIME", "MAX TASK ID", "MAX TASK INFO"));
+ List keys = new ArrayList(perfRecordMaps4print.keySet());
+ Collections.sort(keys, new Comparator() {
+ @Override
+ public int compare(PHASE o1, PHASE o2) {
+ return o1.toInt() - o2.toInt();
+ }
+ });
+ for (PHASE phase : keys) {
+ SumPerfRecord4Print sumPerfRecord = perfRecordMaps4print.get(phase);
+ if (sumPerfRecord == null) {
+ continue;
+ }
+ long averageTime = sumPerfRecord.getAverageTime();
+ long maxTime = sumPerfRecord.getMaxTime();
+ int maxTaskId = sumPerfRecord.maxTaskId;
+ int maxTaskGroupId = sumPerfRecord.getMaxTaskGroupId();
+ info.append(String.format("%-20s | %18s | %18s | %18s | %18s | %-100s\n",
+ phase, unitTime(averageTime), sumPerfRecord.totalCount, unitTime(maxTime), jobId + "-" + maxTaskGroupId + "-" + maxTaskId, taskDetails.get(maxTaskId)));
+ }
+ //SumPerfRecord4Print countSumPerf = Optional.fromNullable(perfRecordMaps4print.get(PHASE.READ_TASK_DATA)).or(new SumPerfRecord4Print());
+ SumPerfRecord4Print countSumPerf = perfRecordMaps4print.get(PHASE.READ_TASK_DATA);
+ // fall back to an empty summary (all zeros, ids -1) when no READ_TASK_DATA record exists
+ if(countSumPerf == null){
+ countSumPerf = new SumPerfRecord4Print();
+ }
+ long averageRecords = countSumPerf.getAverageRecords();
+ long averageBytes = countSumPerf.getAverageBytes();
+ long maxRecord = countSumPerf.getMaxRecord();
+ long maxByte = countSumPerf.getMaxByte();
+ int maxTaskId4Records = countSumPerf.getMaxTaskId4Records();
+ int maxTGID4Records = countSumPerf.getMaxTGID4Records();
+ info.append("\n\n 2. record average count and max count task info :\n\n");
+ info.append(String.format("%-20s | %18s | %18s | %18s | %18s | %18s | %-100s\n", "PHASE", "AVERAGE RECORDS", "AVERAGE BYTES", "MAX RECORDS", "MAX RECORD`S BYTES", "MAX TASK ID", "MAX TASK INFO"));
+ // -1 is the "nothing recorded" default, so the data row is only printed when a task was seen
+ if (maxTaskId4Records > -1) {
+ info.append(String.format("%-20s | %18s | %18s | %18s | %18s | %18s | %-100s\n"
+ , PHASE.READ_TASK_DATA, averageRecords, unitSize(averageBytes), maxRecord, unitSize(maxByte), jobId + "-" + maxTGID4Records + "-" + maxTaskId4Records, taskDetails.get(maxTaskId4Records)));
+ }
+ return info.toString();
+ }
+ // By default the time passed in is in nanoseconds; delegates to unitTime(long, TimeUnit).
+ public static String unitTime(long time) {
+ return unitTime(time, TimeUnit.NANOSECONDS);
+ }
+ /**
+ * Formats a duration as seconds with three decimals and a trailing "s", e.g. "1.500s".
+ *
+ * @param time the duration value
+ * @param timeUnit unit in which {@code time} is expressed
+ * @return the duration in seconds, formatted with pattern "%,.3fs"
+ */
+ public static String unitTime(long time, TimeUnit timeUnit) {
+     // Divide in double precision: a float carries only about 7 significant digits, so the
+     // millisecond digits become wrong once the duration exceeds roughly 16,000 seconds.
+     return String.format("%,.3fs", timeUnit.toNanos(time) / 1000000000.0);
+ }
+ /**
+ * Formats a byte count using decimal (1000-based) units. Values up to and including 1000
+ * are printed as plain bytes ("B"); larger values are printed with two decimals as K, M
+ * or G, where each unit is used only for values strictly above its threshold.
+ *
+ * @param size size in bytes
+ * @return the human-readable size
+ */
+ public static String unitSize(long size) {
+     if (size <= 1000) {
+         return size + "B";
+     }
+     if (size <= 1000000) {
+         return String.format("%,.2fK", (float) size / 1000);
+     }
+     if (size <= 1000000000) {
+         return String.format("%,.2fM", (float) size / 1000000);
+     }
+     return String.format("%,.2fG", (float) size / 1000000000);
+ }
+ // Returns the per-phase print summary after first folding any buffered finished records
+ // into it. The returned map is the internal one, not a copy.
+ public synchronized ConcurrentHashMap getPerfRecordMaps4print() {
+ if (totalEndReport.size() > 0) {
+ sumPerf4EndPrint(totalEndReport);
+ }
+ return perfRecordMaps4print;
+ }
+ // ---- plain accessors; the collection getters expose the internal mutable objects ----
+ public SumPerf4Report getSumPerf4Report() {
+ return sumPerf4Report;
+ }
+ public Set getNeedReportPool4NotEnd() {
+ return needReportPool4NotEnd;
+ }
+ public List getTotalEndReport() {
+ return totalEndReport;
+ }
+ public Map getTaskDetails() {
+ return taskDetails;
+ }
+ public boolean isEnable() {
+ return enable;
+ }
+ public boolean isJob() {
+ return isJob;
+ }
+ // Job metadata filled in by setJobInfo() and copied into the DTO built by getReports().
+ private String cluster;
+ private String jobDomain;
+ private String srcType;
+ private String dstType;
+ private String srcGuid;
+ private String dstGuid;
+ private Date windowStart;
+ private Date windowEnd;
+ // Set to "now" by setJobInfo(); stays null if setJobInfo() never ran with reporting enabled.
+ private Date jobStartTime;
+ /**
+ * Stores the job configuration and, when it is non-null and reporting is requested,
+ * extracts the job metadata (cluster, domains, types, guids, window, job id/version) and
+ * stamps the job start time. If anything throws, reporting is switched off.
+ *
+ * @param jobInfo job configuration to read from; may be null
+ * @param perfReportEnable whether perf reporting should be enabled
+ * @param channelNumber channel count to record
+ */
+ public void setJobInfo(Configuration jobInfo, boolean perfReportEnable, int channelNumber) {
+ try {
+ this.jobInfo = jobInfo;
+ if (jobInfo != null && perfReportEnable) {
+ cluster = jobInfo.getString("cluster");
+ String srcDomain = jobInfo.getString("srcDomain", "null");
+ String dstDomain = jobInfo.getString("dstDomain", "null");
+ jobDomain = srcDomain + "|" + dstDomain;
+ srcType = jobInfo.getString("srcType");
+ dstType = jobInfo.getString("dstType");
+ srcGuid = jobInfo.getString("srcGuid");
+ dstGuid = jobInfo.getString("dstGuid");
+ windowStart = getWindow(jobInfo.getString("windowStart"), true);
+ windowEnd = getWindow(jobInfo.getString("windowEnd"), false);
+ // missing ids fall back to the sentinels -5 (jobId) and -4 (jobVersion)
+ String jobIdStr = jobInfo.getString("jobId");
+ jobId = StringUtils.isEmpty(jobIdStr) ? (long) -5 : Long.parseLong(jobIdStr);
+ String jobVersionStr = jobInfo.getString("jobVersion");
+ jobVersion = StringUtils.isEmpty(jobVersionStr) ? (long) -4 : Long.parseLong(jobVersionStr);
+ jobStartTime = new Date();
+ }
+ this.perfReportEnable = perfReportEnable;
+ this.channelNumber = channelNumber;
+ } catch (Exception e) {
+ this.perfReportEnable = false;
+ }
+ }
+ /**
+ * Resolves a window boundary. A non-empty string that parses as "yyyy-MM-dd HH:mm:ss" is
+ * returned as that date. Otherwise, for the start of the window, today at 00:00:00 is
+ * returned; for the end of the window (or if the fallback itself fails) null is returned.
+ *
+ * @param windowStr textual window boundary, may be empty or null
+ * @param startWindow true when resolving the window start, false for the window end
+ * @return the resolved date, or null
+ */
+ private Date getWindow(String windowStr, boolean startWindow) {
+     if (StringUtils.isNotEmpty(windowStr)) {
+         try {
+             return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(windowStr);
+         } catch (ParseException ignored) {
+             // unparsable input: fall through to the default handling below
+         }
+     }
+     if (!startWindow) {
+         return null;
+     }
+     SimpleDateFormat startOfDay = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
+     try {
+         return startOfDay.parse(startOfDay.format(new Date()));
+     } catch (ParseException ignored) {
+         return null;
+     }
+ }
+ // Instance id; this is the jobId value that was passed to the constructor.
+ public long getInstId() {
+ return instId;
+ }
+ public Configuration getJobInfo() {
+ return jobInfo;
+ }
+ // Changes the flush threshold base used by tracePerfRecord (flush at more than batchSize * 10).
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+ /**
+ * Builds a statistics snapshot for reporting. Returns null when tracing or reporting is
+ * disabled, when the mode ("job"/"tg", case-insensitive) does not match this instance, or
+ * when any exception occurs while building the snapshot. Note that jobStartTime is
+ * dereferenced below, so a null jobStartTime also ends in the catch block and yields null.
+ * Time totals are the sum of finished records plus the running time, as of now, of
+ * records that have started but not ended.
+ *
+ * @param mode "job" or "tg"
+ * @return the snapshot, or null
+ */
+ public synchronized JobStatisticsDto2 getReports(String mode) {
+ try {
+ if (!enable || !perfReportEnable) {
+ return null;
+ }
+ if (("job".equalsIgnoreCase(mode) && !isJob) || "tg".equalsIgnoreCase(mode) && isJob) {
+ return null;
+ }
+ // reset the statistics of not-yet-finished tasks on every call
+ sumPerf4Report4NotEnd = new SumPerf4Report();
+ Set needReportPool4NotEndTmp = null;
+ // copy under the pool's monitor so the iteration below runs on a private snapshot
+ synchronized (needReportPool4NotEnd) {
+ needReportPool4NotEndTmp = new HashSet(needReportPool4NotEnd);
+ }
+ long curNanoTime = System.nanoTime();
+ for (PerfRecord perfRecord : needReportPool4NotEndTmp) {
+ sumPerf4Report4NotEnd.add(curNanoTime, perfRecord);
+ }
+ JobStatisticsDto2 jdo = new JobStatisticsDto2();
+ jdo.setInstId(this.instId);
+ // -6 is the task group id reported for the job-level instance
+ if (isJob) {
+ jdo.setTaskGroupId(-6);
+ } else {
+ jdo.setTaskGroupId(this.taskGroupId);
+ }
+ jdo.setJobId(this.jobId);
+ jdo.setJobVersion(this.jobVersion);
+ jdo.setWindowStart(this.windowStart);
+ jdo.setWindowEnd(this.windowEnd);
+ jdo.setJobStartTime(jobStartTime);
+ jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime());
+ jdo.setJobPriority(this.priority);
+ jdo.setChannelNum(this.channelNumber);
+ jdo.setCluster(this.cluster);
+ jdo.setJobDomain(this.jobDomain);
+ jdo.setSrcType(this.srcType);
+ jdo.setDstType(this.dstType);
+ jdo.setSrcGuid(this.srcGuid);
+ jdo.setDstGuid(this.dstGuid);
+ jdo.setHostAddress(HostUtils.IP);
+ //sum
+ jdo.setTaskTotalTimeMs(sumPerf4Report4NotEnd.totalTaskRunTimeInMs + sumPerf4Report.totalTaskRunTimeInMs);
+ jdo.setOdpsBlockCloseTimeMs(sumPerf4Report4NotEnd.odpsCloseTimeInMs + sumPerf4Report.odpsCloseTimeInMs);
+ jdo.setSqlQueryTimeMs(sumPerf4Report4NotEnd.sqlQueryTimeInMs + sumPerf4Report.sqlQueryTimeInMs);
+ jdo.setResultNextTimeMs(sumPerf4Report4NotEnd.resultNextTimeInMs + sumPerf4Report.resultNextTimeInMs);
+ return jdo;
+ } catch (Exception e) {
+ // do nothing
+ }
+ return null;
+ }
+ /**
+ * Folds every record in the given list into the per-phase print summary and then empties
+ * the list. Does nothing when tracing is disabled or the list is null.
+ *
+ * @param totalEndReport buffer of finished records to fold and clear
+ */
+ private void sumPerf4EndPrint(List totalEndReport) {
+     if (!enable || totalEndReport == null) {
+         return;
+     }
+     // tracePerfRecord() appends to this list while holding the list's own monitor, but
+     // summarize() and getPerfRecordMaps4print() arrive here holding only the PerfTrace
+     // monitor. Taking the list monitor here (re-entrant for the tracePerfRecord() caller)
+     // stops the iterate-and-clear below from racing with a concurrent add.
+     synchronized (totalEndReport) {
+         for (PerfRecord perfRecord : totalEndReport) {
+             perfRecordMaps4print.putIfAbsent(perfRecord.getPhase(), new SumPerfRecord4Print());
+             perfRecordMaps4print.get(perfRecord.getPhase()).add(perfRecord);
+         }
+         totalEndReport.clear();
+     }
+ }
+ // Overrides the channel count that getReports() puts into the snapshot.
+ public void setChannelNumber(int needChannelNumber) {
+ this.channelNumber = needChannelNumber;
+ }
+ /**
+ * Running totals, in milliseconds, of the time spent per reported phase.
+ */
+ public static class SumPerf4Report {
+ long totalTaskRunTimeInMs = 0L;
+ long odpsCloseTimeInMs = 0L;
+ long sqlQueryTimeInMs = 0L;
+ long resultNextTimeInMs = 0L;
+ /**
+ * Adds one record's run time to the total of its phase. A record whose elapsed time is
+ * still -1 (not ended) contributes the time from its nano start up to curNanoTime;
+ * otherwise its stored elapsed time is used. Exceptions are swallowed.
+ */
+ public void add(long curNanoTime,PerfRecord perfRecord) {
+ try {
+ long runTimeEndInMs;
+ if (perfRecord.getElapsedTimeInNs() == -1) {
+ runTimeEndInMs = (curNanoTime - perfRecord.getStartTimeInNs()) / 1000000;
+ } else {
+ runTimeEndInMs = perfRecord.getElapsedTimeInNs() / 1000000;
+ }
+ switch (perfRecord.getPhase()) {
+ case TASK_TOTAL:
+ totalTaskRunTimeInMs += runTimeEndInMs;
+ break;
+ case SQL_QUERY:
+ sqlQueryTimeInMs += runTimeEndInMs;
+ break;
+ // NOTE(review): the two accumulations below have no visible `case` label in this
+ // excerpt; their labels appear to be missing - verify against the real file.
+ resultNextTimeInMs += runTimeEndInMs;
+ break;
+ odpsCloseTimeInMs += runTimeEndInMs;
+ break;
+ }
+ }catch (Exception e){
+ //do nothing
+ }
+ }
+ public long getTotalTaskRunTimeInMs() {
+ return totalTaskRunTimeInMs;
+ }
+ public long getOdpsCloseTimeInMs() {
+ return odpsCloseTimeInMs;
+ }
+ public long getSqlQueryTimeInMs() {
+ return sqlQueryTimeInMs;
+ }
+ public long getResultNextTimeInMs() {
+ return resultNextTimeInMs;
+ }
+ }
+ /**
+ * Aggregate of the finished records of one phase: total and maximum elapsed time, total
+ * and maximum record count and size, and the ids of the tasks holding the maxima. The id
+ * fields stay at -1 until a record has been added.
+ */
+ public static class SumPerfRecord4Print {
+ private long perfTimeTotal = 0;
+ private long averageTime = 0;
+ private long maxTime = 0;
+ private int maxTaskId = -1;
+ private int maxTaskGroupId = -1;
+ private int totalCount = 0;
+ private long recordsTotal = 0;
+ private long sizesTotal = 0;
+ private long averageRecords = 0;
+ private long averageBytes = 0;
+ private long maxRecord = 0;
+ private long maxByte = 0;
+ private int maxTaskId4Records = -1;
+ private int maxTGID4Records = -1;
+ /**
+ * Folds one record into the aggregate; null is ignored. Because the comparisons use
+ * `>=`, on a tie the most recently added record is remembered as the maximum.
+ * maxByte is the size of the record with the largest count, not the largest size seen.
+ */
+ public void add(PerfRecord perfRecord) {
+ if (perfRecord == null) {
+ return;
+ }
+ perfTimeTotal += perfRecord.getElapsedTimeInNs();
+ if (perfRecord.getElapsedTimeInNs() >= maxTime) {
+ maxTime = perfRecord.getElapsedTimeInNs();
+ maxTaskId = perfRecord.getTaskId();
+ maxTaskGroupId = perfRecord.getTaskGroupId();
+ }
+ recordsTotal += perfRecord.getCount();
+ sizesTotal += perfRecord.getSize();
+ if (perfRecord.getCount() >= maxRecord) {
+ maxRecord = perfRecord.getCount();
+ maxByte = perfRecord.getSize();
+ maxTaskId4Records = perfRecord.getTaskId();
+ maxTGID4Records = perfRecord.getTaskGroupId();
+ }
+ totalCount++;
+ }
+ public long getPerfTimeTotal() {
+ return perfTimeTotal;
+ }
+ // Averages are recomputed (integer division) on each call and cached in the field;
+ // with no records the initial 0 is returned.
+ public long getAverageTime() {
+ if (totalCount > 0) {
+ averageTime = perfTimeTotal / totalCount;
+ }
+ return averageTime;
+ }
+ public long getMaxTime() {
+ return maxTime;
+ }
+ public int getMaxTaskId() {
+ return maxTaskId;
+ }
+ public int getMaxTaskGroupId() {
+ return maxTaskGroupId;
+ }
+ public long getRecordsTotal() {
+ return recordsTotal;
+ }
+ public long getSizesTotal() {
+ return sizesTotal;
+ }
+ public long getAverageRecords() {
+ if (totalCount > 0) {
+ averageRecords = recordsTotal / totalCount;
+ }
+ return averageRecords;
+ }
+ public long getAverageBytes() {
+ if (totalCount > 0) {
+ averageBytes = sizesTotal / totalCount;
+ }
+ return averageBytes;
+ }
+ public long getMaxRecord() {
+ return maxRecord;
+ }
+ public long getMaxByte() {
+ return maxByte;
+ }
+ public int getMaxTaskId4Records() {
+ return maxTaskId4Records;
+ }
+ public int getMaxTGID4Records() {
+ return maxTGID4Records;
+ }
+ public int getTotalCount() {
+ return totalCount;
+ }
+ }
+ /**
+ * Plain data holder for one statistics snapshot; it only has fields with getters and
+ * setters. getReports() fills a subset of the fields; the others keep their null default
+ * unless a caller sets them.
+ * NOTE(review): declared without `static`, so it is an inner class bound to a PerfTrace
+ * instance although none of its methods use the enclosing instance.
+ */
+ class JobStatisticsDto2 {
+ private Long id;
+ private Date gmtCreate;
+ private Date gmtModified;
+ private Long instId;
+ private Long jobId;
+ private Long jobVersion;
+ private Integer taskGroupId;
+ private Date windowStart;
+ private Date windowEnd;
+ private Date jobStartTime;
+ private Date jobEndTime;
+ private Long jobRunTimeMs;
+ private Integer jobPriority;
+ private Integer channelNum;
+ private String cluster;
+ private String jobDomain;
+ private String srcType;
+ private String dstType;
+ private String srcGuid;
+ private String dstGuid;
+ private Long records;
+ private Long bytes;
+ private Long speedRecord;
+ private Long speedByte;
+ private String stagePercent;
+ private Long errorRecord;
+ private Long errorBytes;
+ private Long waitReadTimeMs;
+ private Long waitWriteTimeMs;
+ private Long odpsBlockCloseTimeMs;
+ private Long sqlQueryTimeMs;
+ private Long resultNextTimeMs;
+ private Long taskTotalTimeMs;
+ private String hostAddress;
+ // ---- getters ----
+ public Long getId() {
+ return id;
+ }
+ public Date getGmtCreate() {
+ return gmtCreate;
+ }
+ public Date getGmtModified() {
+ return gmtModified;
+ }
+ public Long getInstId() {
+ return instId;
+ }
+ public Long getJobId() {
+ return jobId;
+ }
+ public Long getJobVersion() {
+ return jobVersion;
+ }
+ public Integer getTaskGroupId() {
+ return taskGroupId;
+ }
+ public Date getWindowStart() {
+ return windowStart;
+ }
+ public Date getWindowEnd() {
+ return windowEnd;
+ }
+ public Date getJobStartTime() {
+ return jobStartTime;
+ }
+ public Date getJobEndTime() {
+ return jobEndTime;
+ }
+ public Long getJobRunTimeMs() {
+ return jobRunTimeMs;
+ }
+ public Integer getJobPriority() {
+ return jobPriority;
+ }
+ public Integer getChannelNum() {
+ return channelNum;
+ }
+ public String getCluster() {
+ return cluster;
+ }
+ public String getJobDomain() {
+ return jobDomain;
+ }
+ public String getSrcType() {
+ return srcType;
+ }
+ public String getDstType() {
+ return dstType;
+ }
+ public String getSrcGuid() {
+ return srcGuid;
+ }
+ public String getDstGuid() {
+ return dstGuid;
+ }
+ public Long getRecords() {
+ return records;
+ }
+ public Long getBytes() {
+ return bytes;
+ }
+ public Long getSpeedRecord() {
+ return speedRecord;
+ }
+ public Long getSpeedByte() {
+ return speedByte;
+ }
+ public String getStagePercent() {
+ return stagePercent;
+ }
+ public Long getErrorRecord() {
+ return errorRecord;
+ }
+ public Long getErrorBytes() {
+ return errorBytes;
+ }
+ public Long getWaitReadTimeMs() {
+ return waitReadTimeMs;
+ }
+ public Long getWaitWriteTimeMs() {
+ return waitWriteTimeMs;
+ }
+ public Long getOdpsBlockCloseTimeMs() {
+ return odpsBlockCloseTimeMs;
+ }
+ public Long getSqlQueryTimeMs() {
+ return sqlQueryTimeMs;
+ }
+ public Long getResultNextTimeMs() {
+ return resultNextTimeMs;
+ }
+ public Long getTaskTotalTimeMs() {
+ return taskTotalTimeMs;
+ }
+ public String getHostAddress() {
+ return hostAddress;
+ }
+ // ---- setters ----
+ public void setId(Long id) {
+ this.id = id;
+ }
+ public void setGmtCreate(Date gmtCreate) {
+ this.gmtCreate = gmtCreate;
+ }
+ public void setGmtModified(Date gmtModified) {
+ this.gmtModified = gmtModified;
+ }
+ public void setInstId(Long instId) {
+ this.instId = instId;
+ }
+ public void setJobId(Long jobId) {
+ this.jobId = jobId;
+ }
+ public void setJobVersion(Long jobVersion) {
+ this.jobVersion = jobVersion;
+ }
+ public void setTaskGroupId(Integer taskGroupId) {
+ this.taskGroupId = taskGroupId;
+ }
+ public void setWindowStart(Date windowStart) {
+ this.windowStart = windowStart;
+ }
+ public void setWindowEnd(Date windowEnd) {
+ this.windowEnd = windowEnd;
+ }
+ public void setJobStartTime(Date jobStartTime) {
+ this.jobStartTime = jobStartTime;
+ }
+ public void setJobEndTime(Date jobEndTime) {
+ this.jobEndTime = jobEndTime;
+ }
+ public void setJobRunTimeMs(Long jobRunTimeMs) {
+ this.jobRunTimeMs = jobRunTimeMs;
+ }
+ public void setJobPriority(Integer jobPriority) {
+ this.jobPriority = jobPriority;
+ }
+ public void setChannelNum(Integer channelNum) {
+ this.channelNum = channelNum;
+ }
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+ public void setJobDomain(String jobDomain) {
+ this.jobDomain = jobDomain;
+ }
+ public void setSrcType(String srcType) {
+ this.srcType = srcType;
+ }
+ public void setDstType(String dstType) {
+ this.dstType = dstType;
+ }
+ public void setSrcGuid(String srcGuid) {
+ this.srcGuid = srcGuid;
+ }
+ public void setDstGuid(String dstGuid) {
+ this.dstGuid = dstGuid;
+ }
+ public void setRecords(Long records) {
+ this.records = records;
+ }
+ public void setBytes(Long bytes) {
+ this.bytes = bytes;
+ }
+ public void setSpeedRecord(Long speedRecord) {
+ this.speedRecord = speedRecord;
+ }
+ public void setSpeedByte(Long speedByte) {
+ this.speedByte = speedByte;
+ }
+ public void setStagePercent(String stagePercent) {
+ this.stagePercent = stagePercent;
+ }
+ public void setErrorRecord(Long errorRecord) {
+ this.errorRecord = errorRecord;
+ }
+ public void setErrorBytes(Long errorBytes) {
+ this.errorBytes = errorBytes;
+ }
+ public void setWaitReadTimeMs(Long waitReadTimeMs) {
+ this.waitReadTimeMs = waitReadTimeMs;
+ }
+ public void setWaitWriteTimeMs(Long waitWriteTimeMs) {
+ this.waitWriteTimeMs = waitWriteTimeMs;
+ }
+ public void setOdpsBlockCloseTimeMs(Long odpsBlockCloseTimeMs) {
+ this.odpsBlockCloseTimeMs = odpsBlockCloseTimeMs;
+ }
+ public void setSqlQueryTimeMs(Long sqlQueryTimeMs) {
+ this.sqlQueryTimeMs = sqlQueryTimeMs;
+ }
+ public void setResultNextTimeMs(Long resultNextTimeMs) {
+ this.resultNextTimeMs = resultNextTimeMs;
+ }
+ public void setTaskTotalTimeMs(Long taskTotalTimeMs) {
+ this.taskTotalTimeMs = taskTotalTimeMs;
+ }
+ public void setHostAddress(String hostAddress) {
+ this.hostAddress = hostAddress;
+ }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java b/common-scala/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java
new file mode 100644
index 0000000000..cab42a4b94
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/statistics/VMInfo.java
@@ -0,0 +1,412 @@
+package com.alibaba.datax.common.statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+ /**
+ * Created by liqiang on 15/11/12.
+ */
+public class VMInfo {
+ private static final Logger LOG = LoggerFactory.getLogger(VMInfo.class);
+ static final long MB = 1024 * 1024;
+ static final long GB = 1024 * 1024 * 1024;
+ /**
+  * Monitor guarding the lazy creation of the singleton. Left public and
+  * non-final so that existing code referencing it keeps compiling.
+  */
+ public static Object lock = new Object();
+ /**
+  * Lazily created singleton. It must be volatile: getVmInfo() reads it outside
+  * the synchronized block, and without volatile another thread could observe a
+  * reference to a VMInfo whose fields are not yet fully written.
+  */
+ private static volatile VMInfo vmInfo;
+ /**
+  * Returns the process-wide VMInfo, creating it on first use.
+  *
+  * @return the shared instance, or null when construction failed. The failure is
+  *         logged and deliberately not propagated (the job does not depend on
+  *         it); the field stays null, so a later call tries again.
+  */
+ public static VMInfo getVmInfo() {
+     // Single volatile read on the fast path.
+     VMInfo current = vmInfo;
+     if (current == null) {
+         synchronized (lock) {
+             current = vmInfo;
+             if (current == null) {
+                 try {
+                     current = new VMInfo();
+                     vmInfo = current;
+                 } catch (Exception e) {
+                     LOG.warn("no need care, the fail is ignored : vmInfo init failed " + e.getMessage(), e);
+                 }
+             }
+         }
+     }
+     return current;
+ }
+ // MXBeans the statistics are read from
+ private final OperatingSystemMXBean osMXBean;
+ private final RuntimeMXBean runtimeMXBean;
+ private final List<GarbageCollectorMXBean> garbageCollectorMXBeanList;
+ private final List<MemoryPoolMXBean> memoryPoolMXBeanList;
+ /**
+  * Static information
+  */
+ private final String osInfo;
+ private final String jvmInfo;
+ /**
+  * Number of CPUs
+  */
+ private final int totalProcessorCount;
+ /**
+  * Machine status holders, used for periodic printing and statistics reporting
+  */
+ private final PhyOSStatus startPhyOSStatus;
+ private final ProcessCpuStatus processCpuStatus = new ProcessCpuStatus();
+ private final ProcessGCStatus processGCStatus = new ProcessGCStatus();
+ private final ProcessMemoryStatus processMomoryStatus = new ProcessMemoryStatus();
+ // ms
+ private long lastUpTime = 0;
+ // nano
+ private long lastProcessCpuTime = 0;
+ /**
+  * Captures the static machine description and seeds the per-collector and
+  * per-memory-pool status maps. Any exception thrown here propagates to the
+  * caller.
+  */
+ private VMInfo() {
+     // Initialise the static information
+     osMXBean = java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+     runtimeMXBean = java.lang.management.ManagementFactory.getRuntimeMXBean();
+     garbageCollectorMXBeanList = java.lang.management.ManagementFactory.getGarbageCollectorMXBeans();
+     memoryPoolMXBeanList = java.lang.management.ManagementFactory.getMemoryPoolMXBeans();
+     // The OS description comes from the operating-system bean and the JVM
+     // description from the runtime bean (the two used to be swapped).
+     osInfo = osMXBean.getName() + " " + osMXBean.getArch() + " " + osMXBean.getVersion();
+     jvmInfo = runtimeMXBean.getVmVendor() + " " + runtimeMXBean.getSpecVersion() + " " + runtimeMXBean.getVmVersion();
+     totalProcessorCount = osMXBean.getAvailableProcessors();
+     // Build startPhyOSStatus; its fields keep their -1 defaults unless the
+     // bean passes the isSunOsMBean check.
+     startPhyOSStatus = new PhyOSStatus();
+     LOG.info("VMInfo# operatingSystem class => " + osMXBean.getClass().getName());
+     if (VMInfo.isSunOsMBean(osMXBean)) {
+         startPhyOSStatus.totalPhysicalMemory = VMInfo.getLongFromOperatingSystem(osMXBean, "getTotalPhysicalMemorySize");
+         startPhyOSStatus.freePhysicalMemory = VMInfo.getLongFromOperatingSystem(osMXBean, "getFreePhysicalMemorySize");
+         startPhyOSStatus.maxFileDescriptorCount = VMInfo.getLongFromOperatingSystem(osMXBean, "getMaxFileDescriptorCount");
+         startPhyOSStatus.currentOpenFileDescriptorCount = VMInfo.getLongFromOperatingSystem(osMXBean, "getOpenFileDescriptorCount");
+     }
+     // Initialise processGCStatus: one entry per garbage collector, keyed by name
+     for (GarbageCollectorMXBean garbage : garbageCollectorMXBeanList) {
+         GCStatus gcStatus = new GCStatus();
+         gcStatus.name = garbage.getName();
+         processGCStatus.gcStatusMap.put(garbage.getName(), gcStatus);
+     }
+     // Initialise processMemoryStatus: one entry per memory pool, keyed by name
+     if (memoryPoolMXBeanList != null && !memoryPoolMXBeanList.isEmpty()) {
+         for (MemoryPoolMXBean pool : memoryPoolMXBeanList) {
+             MemoryStatus memoryStatus = new MemoryStatus();
+             memoryStatus.name = pool.getName();
+             memoryStatus.initSize = pool.getUsage().getInit();
+             memoryStatus.maxSize = pool.getUsage().getMax();
+             processMomoryStatus.memoryStatusMap.put(pool.getName(), memoryStatus);
+         }
+     }
+ }
+ /**
+  * Renders the static machine description: osInfo, jvmInfo, the CPU count, the
+  * physical-OS snapshot taken at construction, the GC names and the memory
+  * pool sizes.
+  */
+ public String toString() {
+ return "the machine info => \n\n"
+ + "\tosInfo:\t" + osInfo + "\n"
+ + "\tjvmInfo:\t" + jvmInfo + "\n"
+ + "\tcpu num:\t" + totalProcessorCount + "\n\n"
+ + startPhyOSStatus.toString() + "\n"
+ + processGCStatus.toString() + "\n"
+ + processMomoryStatus.toString() + "\n";
+ }
+ /**
+  * Returns the cumulative CPU summary followed by the cumulative GC summary.
+  */
+ public String totalString() {
+ return (processCpuStatus.getTotalString() + processGCStatus.getTotalString());
+ }
+ /**
+  * Convenience overload of getDelta(boolean) that always logs the delta report.
+  */
+ public void getDelta() {
+ getDelta(true);
+ }
+ /**
+  * Refreshes the CPU, GC and memory-pool statistics from the MXBeans and,
+  * when requested, logs the resulting delta report. Any exception raised
+  * while sampling is logged as a warning and swallowed.
+  *
+  * @param print true to log the CPU / memory / GC delta strings at INFO level
+  */
+ public synchronized void getDelta(boolean print) {
+ try {
+ // CPU figures are only sampled when the OS bean passes the isSunOsMBean check.
+ if (VMInfo.isSunOsMBean(osMXBean)) {
+ long curUptime = runtimeMXBean.getUptime();
+ long curProcessTime = getLongFromOperatingSystem(osMXBean, "getProcessCpuTime");
+ // Percentage; uptime is in ms and processTime is in nanoseconds, so
+ // ns / (ms * 10000) = ns / (ms * 1,000,000) * 100.
+ if ((curUptime > lastUpTime) && (curProcessTime >= lastProcessCpuTime)) {
+ float curDeltaCpu = (float) (curProcessTime - lastProcessCpuTime) / ((curUptime - lastUpTime) * totalProcessorCount * 10000);
+ processCpuStatus.setMaxMinCpu(curDeltaCpu);
+ // Average since JVM start, normalised by the processor count.
+ processCpuStatus.averageCpu = (float) curProcessTime / (curUptime * totalProcessorCount * 10000);
+ lastUpTime = curUptime;
+ lastProcessCpuTime = curProcessTime;
+ }
+ }
+ for (GarbageCollectorMXBean garbage : garbageCollectorMXBeanList) {
+ GCStatus gcStatus = processGCStatus.gcStatusMap.get(garbage.getName());
+ // A collector not registered by the constructor gets an entry on first sight.
+ if (gcStatus == null) {
+ gcStatus = new GCStatus();
+ gcStatus.name = garbage.getName();
+ processGCStatus.gcStatusMap.put(garbage.getName(), gcStatus);
+ }
+ long curTotalGcCount = garbage.getCollectionCount();
+ gcStatus.setCurTotalGcCount(curTotalGcCount);
+ long curtotalGcTime = garbage.getCollectionTime();
+ gcStatus.setCurTotalGcTime(curtotalGcTime);
+ }
+ if (memoryPoolMXBeanList != null && !memoryPoolMXBeanList.isEmpty()) {
+ for (MemoryPoolMXBean pool : memoryPoolMXBeanList) {
+ MemoryStatus memoryStatus = processMomoryStatus.memoryStatusMap.get(pool.getName());
+ if (memoryStatus == null) {
+ memoryStatus = new MemoryStatus();
+ memoryStatus.name = pool.getName();
+ processMomoryStatus.memoryStatusMap.put(pool.getName(), memoryStatus);
+ }
+ memoryStatus.commitedSize = pool.getUsage().getCommitted();
+ memoryStatus.setMaxMinUsedSize(pool.getUsage().getUsed());
+ // The percentage is relative to the committed size when it is positive,
+ // otherwise to the pool maximum; -1 is recorded when neither is positive.
+ long maxMemory = memoryStatus.commitedSize > 0 ? memoryStatus.commitedSize : memoryStatus.maxSize;
+ memoryStatus.setMaxMinPercent(maxMemory > 0 ? (float) 100 * memoryStatus.usedSize / maxMemory : -1);
+ }
+ }
+ if (print) {
+ LOG.info(processCpuStatus.getDeltaString() + processMomoryStatus.getDeltaString() + processGCStatus.getDeltaString());
+ }
+ } catch (Exception e) {
+ LOG.warn("no need care, the fail is ignored : vmInfo getDelta failed " + e.getMessage(), e);
+ }
+ }
+ /**
+  * Tells whether the given bean's runtime class is exactly
+  * {@code com.sun.management.UnixOperatingSystem}; the decision is made purely
+  * by class-name comparison.
+  *
+  * @param operatingSystem the bean to inspect, must not be null
+  * @return true only for that exact implementation class
+  */
+ public static boolean isSunOsMBean(OperatingSystemMXBean operatingSystem) {
+     return "com.sun.management.UnixOperatingSystem".equals(operatingSystem.getClass().getName());
+ }
+ /**
+  * Reflectively invokes a public no-argument method on the OS bean and returns
+  * its result as a long.
+  *
+  * @param operatingSystem the bean to call
+  * @param methodName      name of a no-argument method whose result is a Long
+  * @return the method's value, or -1 when the lookup, the call or the cast
+  *         fails (the failure is logged at INFO level and not propagated)
+  */
+ public static long getLongFromOperatingSystem(OperatingSystemMXBean operatingSystem, String methodName) {
+     try {
+         // The null array selects the zero-parameter overload.
+         final Method method = operatingSystem.getClass().getMethod(methodName, (Class<?>[]) null);
+         method.setAccessible(true);
+         return (Long) method.invoke(operatingSystem, (Object[]) null);
+     } catch (final Exception e) {
+         LOG.info(String.format("OperatingSystemMXBean %s failed, Exception = %s ", methodName, e.getMessage()));
+     }
+     return -1;
+ }
+ /**
+  * Snapshot of physical memory and file-descriptor figures; every field stays
+  * at -1 until it is assigned.
+  */
+ private class PhyOSStatus {
+     long totalPhysicalMemory = -1;
+     long freePhysicalMemory = -1;
+     long maxFileDescriptorCount = -1;
+     long currentOpenFileDescriptorCount = -1;
+     /** Formats the memory sizes in gigabytes and the descriptor counts as-is. */
+     public String toString() {
+         final float totalInGb = (float) totalPhysicalMemory / GB;
+         final float freeInGb = (float) freePhysicalMemory / GB;
+         final String layout = "\ttotalPhysicalMemory:\t%,.2fG\n"
+                 + "\tfreePhysicalMemory:\t%,.2fG\n"
+                 + "\tmaxFileDescriptorCount:\t%s\n"
+                 + "\tcurrentOpenFileDescriptorCount:\t%s\n";
+         return String.format(layout, totalInGb, freeInGb, maxFileDescriptorCount, currentOpenFileDescriptorCount);
+     }
+ }
+ /**
+  * CPU usage figures of this process, expressed as percentages
+  * (for example 30.0 means 30.0%). -1 marks a value that was never set.
+  */
+ private class ProcessCpuStatus {
+     float maxDeltaCpu = -1;
+     float minDeltaCpu = -1;
+     float curDeltaCpu = -1;
+     float averageCpu = -1;
+     /** Records the latest sample and widens the observed max/min range with it. */
+     public void setMaxMinCpu(float curCpu) {
+         curDeltaCpu = curCpu;
+         if (curCpu > maxDeltaCpu) {
+             maxDeltaCpu = curCpu;
+         }
+         final boolean minNeverSet = minDeltaCpu == -1;
+         if (minNeverSet || curCpu < minDeltaCpu) {
+             minDeltaCpu = curCpu;
+         }
+     }
+     /** Two-row table (header plus values) with the current, average, max and min CPU. */
+     public String getDeltaString() {
+         final String row = "%-30s | %-30s | %-30s | %-30s \n";
+         final String header = String.format(row, "curDeltaCpu", "averageCpu", "maxDeltaCpu", "minDeltaCpu");
+         final String values = String.format(row,
+                 String.format("%,.2f%%", processCpuStatus.curDeltaCpu),
+                 String.format("%,.2f%%", processCpuStatus.averageCpu),
+                 String.format("%,.2f%%", processCpuStatus.maxDeltaCpu),
+                 String.format("%,.2f%%\n", processCpuStatus.minDeltaCpu));
+         return new StringBuilder()
+                 .append("\n\t [delta cpu info] => \n")
+                 .append("\t\t").append(header)
+                 .append("\t\t").append(values)
+                 .toString();
+     }
+     /** Two-row table (header plus values) with the average, max and min CPU. */
+     public String getTotalString() {
+         final String row = "%-30s | %-30s | %-30s \n";
+         final String header = String.format(row, "averageCpu", "maxDeltaCpu", "minDeltaCpu");
+         final String values = String.format(row,
+                 String.format("%,.2f%%", processCpuStatus.averageCpu),
+                 String.format("%,.2f%%", processCpuStatus.maxDeltaCpu),
+                 String.format("%,.2f%%\n", processCpuStatus.minDeltaCpu));
+         return new StringBuilder()
+                 .append("\n\t [total cpu info] => \n")
+                 .append("\t\t").append(header)
+                 .append("\t\t").append(values)
+                 .toString();
+     }
+ }
+ /**
+  * Per-collector GC statistics, keyed by collector name.
+  */
+ private class ProcessGCStatus {
+     // Typed map: the loops below iterate the values as GCStatus.
+     final Map<String, GCStatus> gcStatusMap = new HashMap<String, GCStatus>();
+     /** Lists the names of the known collectors. */
+     public String toString() {
+         return "\tGC Names\t" + gcStatusMap.keySet() + "\n";
+     }
+     /** Table with one row per collector: current delta, total, max and min for both count and time. */
+     public String getDeltaString() {
+         final String row = "%-20s | %-18s | %-18s | %-18s | %-18s | %-18s | %-18s | %-18s | %-18s \n";
+         StringBuilder sb = new StringBuilder();
+         sb.append("\n\t [delta gc info] => \n");
+         sb.append("\t\t ");
+         sb.append(String.format(row, "NAME", "curDeltaGCCount", "totalGCCount", "maxDeltaGCCount", "minDeltaGCCount", "curDeltaGCTime", "totalGCTime", "maxDeltaGCTime", "minDeltaGCTime"));
+         for (GCStatus gc : gcStatusMap.values()) {
+             sb.append("\t\t ");
+             // Times are divided by 1000 and printed with an "s" suffix.
+             sb.append(String.format(row,
+                     gc.name, gc.curDeltaGCCount, gc.totalGCCount, gc.maxDeltaGCCount, gc.minDeltaGCCount,
+                     String.format("%,.3fs", (float) gc.curDeltaGCTime / 1000),
+                     String.format("%,.3fs", (float) gc.totalGCTime / 1000),
+                     String.format("%,.3fs", (float) gc.maxDeltaGCTime / 1000),
+                     String.format("%,.3fs", (float) gc.minDeltaGCTime / 1000)));
+         }
+         return sb.toString();
+     }
+     /** Table with one row per collector: total, max and min for both count and time. */
+     public String getTotalString() {
+         final String row = "%-20s | %-18s | %-18s | %-18s | %-18s | %-18s | %-18s \n";
+         StringBuilder sb = new StringBuilder();
+         sb.append("\n\t [total gc info] => \n");
+         sb.append("\t\t ");
+         sb.append(String.format(row, "NAME", "totalGCCount", "maxDeltaGCCount", "minDeltaGCCount", "totalGCTime", "maxDeltaGCTime", "minDeltaGCTime"));
+         for (GCStatus gc : gcStatusMap.values()) {
+             sb.append("\t\t ");
+             sb.append(String.format(row,
+                     gc.name, gc.totalGCCount, gc.maxDeltaGCCount, gc.minDeltaGCCount,
+                     String.format("%,.3fs", (float) gc.totalGCTime / 1000),
+                     String.format("%,.3fs", (float) gc.maxDeltaGCTime / 1000),
+                     String.format("%,.3fs", (float) gc.minDeltaGCTime / 1000)));
+         }
+         return sb.toString();
+     }
+ }
+ /**
+  * Per-pool memory statistics, keyed by memory pool name.
+  */
+ private class ProcessMemoryStatus {
+     // Typed map: the loops below iterate the values as MemoryStatus.
+     final Map<String, MemoryStatus> memoryStatusMap = new HashMap<String, MemoryStatus>();
+     /** Table with one row per pool showing its max size and initial size in MB. */
+     public String toString() {
+         final String row = "%-30s | %-30s | %-30s \n";
+         StringBuilder sb = new StringBuilder();
+         sb.append("\t");
+         sb.append(String.format(row, "MEMORY_NAME", "allocation_size", "init_size"));
+         for (MemoryStatus ms : memoryStatusMap.values()) {
+             sb.append("\t");
+             sb.append(String.format(row,
+                     ms.name, String.format("%,.2fMB", (float) ms.maxSize / MB), String.format("%,.2fMB", (float) ms.initSize / MB)));
+         }
+         return sb.toString();
+     }
+     /** Table with one row per pool: current and peak usage, as size in MB and as percentage. */
+     public String getDeltaString() {
+         final String row = "%-30s | %-30s | %-30s | %-30s | %-30s \n";
+         StringBuilder sb = new StringBuilder();
+         sb.append("\n\t [delta memory info] => \n");
+         sb.append("\t\t ");
+         sb.append(String.format(row, "NAME", "used_size", "used_percent", "max_used_size", "max_percent"));
+         for (MemoryStatus ms : memoryStatusMap.values()) {
+             sb.append("\t\t ");
+             sb.append(String.format(row,
+                     ms.name, String.format("%,.2f", (float) ms.usedSize / MB) + "MB",
+                     String.format("%,.2f", ms.percent) + "%",
+                     String.format("%,.2f", (float) ms.maxUsedSize / MB) + "MB",
+                     String.format("%,.2f", ms.maxpercent) + "%"));
+         }
+         return sb.toString();
+     }
+ }
+ /**
+  * Running statistics of one garbage collector. Each setter receives the
+  * collector's cumulative figure and derives the delta since the previous call;
+  * -1 marks a max/min that was never set.
+  */
+ private class GCStatus {
+     String name;
+     long maxDeltaGCCount = -1;
+     long minDeltaGCCount = -1;
+     long curDeltaGCCount;
+     long totalGCCount = 0;
+     long maxDeltaGCTime = -1;
+     long minDeltaGCTime = -1;
+     long curDeltaGCTime;
+     long totalGCTime = 0;
+     /** Feeds the cumulative collection count and updates delta, total, max and min. */
+     public void setCurTotalGcCount(long curTotalGcCount) {
+         final long delta = curTotalGcCount - totalGCCount;
+         curDeltaGCCount = delta;
+         totalGCCount = curTotalGcCount;
+         maxDeltaGCCount = Math.max(maxDeltaGCCount, delta);
+         if (minDeltaGCCount == -1 || delta < minDeltaGCCount) {
+             minDeltaGCCount = delta;
+         }
+     }
+     /** Feeds the cumulative collection time and updates delta, total, max and min. */
+     public void setCurTotalGcTime(long curTotalGcTime) {
+         final long delta = curTotalGcTime - totalGCTime;
+         curDeltaGCTime = delta;
+         totalGCTime = curTotalGcTime;
+         maxDeltaGCTime = Math.max(maxDeltaGCTime, delta);
+         if (minDeltaGCTime == -1 || delta < minDeltaGCTime) {
+             minDeltaGCTime = delta;
+         }
+     }
+ }
+ /**
+  * Running statistics of one memory pool: the latest used size and percentage
+  * plus the peaks seen so far.
+  */
+ private class MemoryStatus {
+     String name;
+     long initSize;
+     long maxSize;
+     long commitedSize;
+     long usedSize;
+     float percent;
+     long maxUsedSize = -1;
+     float maxpercent = 0;
+     /** Stores the current used size and raises the recorded peak when exceeded. */
+     void setMaxMinUsedSize(long curUsedSize) {
+         usedSize = curUsedSize;
+         maxUsedSize = Math.max(maxUsedSize, curUsedSize);
+     }
+     /** Stores the current percentage and raises the recorded peak when exceeded. */
+     void setMaxMinPercent(float curPercent) {
+         percent = curPercent;
+         if (curPercent > maxpercent) {
+             maxpercent = curPercent;
+         }
+     }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/util/DESCipher.java b/common-scala/src/main/java/com/alibaba/datax/common/util/DESCipher.java
new file mode 100755
index 0000000000..0692a7b3e3
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/util/DESCipher.java
@@ -0,0 +1,229 @@
+ /**
+ * (C) 2010-2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.datax.common.util;
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.DESKeySpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.security.SecureRandom;
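+ /**
+ * DES encryption/decryption that interoperates with Delphi (the string encoding must be unified as UTF-8).
+ * This utility class was extracted into common so that later code can reuse it.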
+ */
+public class DESCipher {
+ private static Logger LOGGER = LoggerFactory.getLogger(DESCipher.class);
+ /**
+ * 密钥
+ */
+ public static final String KEY = "";
+ private final static String DES = "DES";
+ /**
+  * Encrypts raw bytes with DES.
+  *
+  * @param src plaintext bytes
+  * @param key key bytes; the original note requires a length that is a multiple of 8
+  * @return ciphertext bytes
+  * @throws Exception if the key is rejected or the cipher operation fails
+  */
+ public static byte[] encrypt(byte[] src, byte[] key) throws Exception {
+     return applyDes(Cipher.ENCRYPT_MODE, src, key);
+ }
+ /**
+  * Decrypts raw bytes with DES.
+  *
+  * @param src ciphertext bytes
+  * @param key key bytes; the original note requires a length that is a multiple of 8
+  * @return plaintext bytes
+  * @throws Exception if the key is rejected or the cipher operation fails
+  */
+ public static byte[] decrypt(byte[] src, byte[] key) throws Exception {
+     return applyDes(Cipher.DECRYPT_MODE, src, key);
+ }
+ /** Shared body of encrypt/decrypt: builds the DES key, initialises the cipher in the given mode and runs it. */
+ private static byte[] applyDes(int cipherMode, byte[] src, byte[] key) throws Exception {
+     // The cipher is initialised with a SecureRandom as its source of randomness.
+     final SecureRandom random = new SecureRandom();
+     // Wrap the raw key material in a DESKeySpec and turn it into a SecretKey.
+     final SecretKey secretKey = SecretKeyFactory.getInstance(DES).generateSecret(new DESKeySpec(key));
+     // The Cipher object performs the actual operation.
+     final Cipher cipher = Cipher.getInstance(DES);
+     cipher.init(cipherMode, secretKey, random);
+     return cipher.doFinal(src);
+ }
+ /**
+  * Encrypts raw bytes with the built-in KEY.
+  *
+  * @param src plaintext bytes
+  * @return ciphertext bytes
+  * @throws Exception if the key is rejected or the cipher operation fails
+  */
+ public static byte[] encrypt(byte[] src) throws Exception {
+     // Fixed charset: the key bytes must not depend on the platform default encoding.
+     return encrypt(src, KEY.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+ /**
+  * Decrypts raw bytes with the built-in KEY.
+  *
+  * @param src ciphertext bytes
+  * @return plaintext bytes
+  * @throws Exception if the key is rejected or the cipher operation fails
+  */
+ public static byte[] decrypt(byte[] src) throws Exception {
+     return decrypt(src, KEY.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+ /**
+  * Encrypts a string with the built-in KEY.
+  *
+  * @param src plaintext string, encoded as UTF-8 before encryption
+  * @return ciphertext as an upper-case hex string, or null when encryption
+  *         fails (the failure is logged as a warning)
+  */
+ public final static String encrypt(String src) {
+     try {
+         final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+         return byte2hex(encrypt(src.getBytes(utf8), KEY.getBytes(utf8)));
+     } catch (Exception e) {
+         LOGGER.warn(e.getMessage(), e);
+     }
+     return null;
+ }
+ /**
+  * Encrypts a string with a caller-supplied key.
+  *
+  * @param src        plaintext string, encoded as UTF-8 before encryption
+  * @param encryptKey key used for encryption, encoded as UTF-8
+  * @return ciphertext as an upper-case hex string, or null when encryption
+  *         fails (the failure is logged as a warning)
+  */
+ public final static String encrypt(String src, String encryptKey) {
+     try {
+         final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+         return byte2hex(encrypt(src.getBytes(utf8), encryptKey.getBytes(utf8)));
+     } catch (Exception e) {
+         LOGGER.warn(e.getMessage(), e);
+     }
+     return null;
+ }
+ /**
+  * Decrypts a hex string with the built-in KEY.
+  *
+  * @param src ciphertext as a hex string
+  * @return plaintext decoded as UTF-8, or null when decryption fails
+  *         (the failure is logged as a warning)
+  */
+ public final static String decrypt(String src) {
+     try {
+         final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+         return new String(decrypt(hex2byte(src.getBytes(utf8)), KEY.getBytes(utf8)), utf8);
+     } catch (Exception e) {
+         LOGGER.warn(e.getMessage(), e);
+     }
+     return null;
+ }
+ /**
+  * Decrypts a hex string with a caller-supplied key.
+  *
+  * @param src        ciphertext as a hex string
+  * @param decryptKey key used for decryption, encoded as UTF-8
+  * @return plaintext decoded as UTF-8, or null when decryption fails
+  *         (the failure is logged as a warning)
+  */
+ public final static String decrypt(String src, String decryptKey) {
+     try {
+         final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+         return new String(decrypt(hex2byte(src.getBytes(utf8)), decryptKey.getBytes(utf8)), utf8);
+     } catch (Exception e) {
+         LOGGER.warn(e.getMessage(), e);
+     }
+     return null;
+ }
+ /**
+  * Encrypts raw bytes with the built-in KEY and returns the result as hex.
+  * The bytes are encrypted directly instead of being round-tripped through a
+  * String, which corrupted input that is not valid text in the default charset.
+  *
+  * @param src plaintext bytes
+  * @return ciphertext as an upper-case hex string, or null when encryption
+  *         fails (the failure is logged as a warning, as before)
+  * @throws Exception kept in the signature for compatibility with existing callers
+  */
+ public static String encryptToString(byte[] src) throws Exception {
+     try {
+         return byte2hex(encrypt(src));
+     } catch (Exception e) {
+         LOGGER.warn(e.getMessage(), e);
+     }
+     return null;
+ }
+ /**
+  * Decrypts hex-encoded ciphertext bytes with the built-in KEY.
+  *
+  * @param src ciphertext as the bytes of a hex string
+  * @return plaintext decoded as UTF-8, or null when decryption fails
+  *         (the failure is logged as a warning, as before)
+  * @throws Exception kept in the signature for compatibility with existing callers
+  */
+ public static String decryptToString(byte[] src) throws Exception {
+     try {
+         return new String(decrypt(hex2byte(src)), java.nio.charset.StandardCharsets.UTF_8);
+     } catch (Exception e) {
+         LOGGER.warn(e.getMessage(), e);
+     }
+     return null;
+ }
+ /**
+  * Encodes bytes as an upper-case hexadecimal string, two digits per byte.
+  *
+  * @param b bytes to encode, must not be null
+  * @return the hex string; empty for an empty array
+  */
+ public static String byte2hex(byte[] b) {
+     // A presized builder replaces repeated String concatenation in the loop.
+     final StringBuilder hex = new StringBuilder(b.length * 2);
+     for (byte value : b) {
+         final int unsigned = value & 0xFF;
+         hex.append(Character.toUpperCase(Character.forDigit(unsigned >>> 4, 16)));
+         hex.append(Character.toUpperCase(Character.forDigit(unsigned & 0x0F, 16)));
+     }
+     return hex.toString();
+ }
+ /**
+  * Decodes the bytes of a hexadecimal string (upper or lower case) into raw bytes.
+  *
+  * @param b ASCII hex digits, two per output byte
+  * @return the decoded bytes; empty for an empty input
+  * @throws IllegalArgumentException if the length is odd
+  * @throws NumberFormatException    if a character is not a hex digit
+  */
+ public static byte[] hex2byte(byte[] b) {
+     if ((b.length % 2) != 0) {
+         throw new IllegalArgumentException("The length is not an even number");
+     }
+     final byte[] decoded = new byte[b.length / 2];
+     for (int n = 0; n < b.length; n += 2) {
+         // Decode each digit directly: no platform-charset String is built, and
+         // signed pairs such as "+F" or "-1" are no longer accepted as hex.
+         final int high = Character.digit(b[n], 16);
+         final int low = Character.digit(b[n + 1], 16);
+         if (high < 0 || low < 0) {
+             throw new NumberFormatException("Invalid hex digit at offset " + n);
+         }
+         decoded[n / 2] = (byte) ((high << 4) | low);
+     }
+     return decoded;
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/util/FilterUtil.java b/common-scala/src/main/java/com/alibaba/datax/common/util/FilterUtil.java
new file mode 100755
index 0000000000..37b319a194
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/util/FilterUtil.java
@@ -0,0 +1,52 @@
+package com.alibaba.datax.common.util;
+import java.util.*;
+import java.util.regex.Pattern;
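+ /**
+ * General-purpose utility for filtering a List of strings by a regular expression (the returned values are
+ * de-duplicated). Typical uses: partition filtering in odpsreader, path filtering in hdfsreader/txtfilereader, etc.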
+ */
+ public final class FilterUtil {
+     /**
+      * Returns the values that fully match the given pattern, de-duplicated and
+      * in order of first appearance.
+      *
+      * @param allStrs candidate values, must not be null
+      * @param regular pattern; a bare {@code *} is treated as {@code .*}
+      * @return a new list with the distinct matching values, possibly empty
+      */
+     public static List<String> filterByRegular(List<String> allStrs,
+                                                String regular) {
+         // Compatibility with the habitual syntax: "pt=*" really means the regex "pt=.*".
+         // Collapsing ".*" to "*" first keeps an already-correct ".*" from becoming "..*".
+         String newReqular = regular.replace(".*", "*").replace("*", ".*");
+         Pattern p = Pattern.compile(newReqular);
+         // The linked set de-duplicates in O(1) per element and keeps encounter order.
+         Set<String> matchedValues = new LinkedHashSet<String>();
+         for (String partition : allStrs) {
+             if (p.matcher(partition).matches()) {
+                 matchedValues.add(partition);
+             }
+         }
+         return new ArrayList<String>(matchedValues);
+     }
+     /**
+      * Returns the values matching at least one of the given patterns,
+      * de-duplicated. Results are ordered by pattern first, then by position in
+      * {@code allStrs}.
+      *
+      * @param allStrs  candidate values, must not be null
+      * @param regulars patterns, each interpreted as in {@code filterByRegular}
+      * @return a new list with the distinct matching values, possibly empty
+      */
+     public static List<String> filterByRegulars(List<String> allStrs,
+                                                 List<String> regulars) {
+         Set<String> matchedValues = new LinkedHashSet<String>();
+         for (String regular : regulars) {
+             matchedValues.addAll(filterByRegular(allStrs, regular));
+         }
+         return new ArrayList<String>(matchedValues);
+     }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/util/HostUtils.java b/common-scala/src/main/java/com/alibaba/datax/common/util/HostUtils.java
new file mode 100644
index 0000000000..2ed8f1019c
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/util/HostUtils.java
@@ -0,0 +1,49 @@
+package com.alibaba.datax.common.util;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+ /**
+ * Created by liqiang on 15/8/25.
+ */
+ public class HostUtils {
+     /** IP address of the local host, or "UNKNOWN" when it could not be determined. */
+     public static final String IP;
+     /** Name of the local host, or "UNKNOWN" when it could not be determined. */
+     public static final String HOSTNAME;
+     private static final Logger log = LoggerFactory.getLogger(HostUtils.class);
+     static {
+         String ip;
+         String hostname;
+         try {
+             InetAddress addr = InetAddress.getLocalHost();
+             ip = addr.getHostAddress();
+             hostname = addr.getHostName();
+         } catch (UnknownHostException e) {
+             log.error("Can't find out address: " + e.getMessage());
+             ip = "UNKNOWN";
+             hostname = "UNKNOWN";
+         }
+         // Fall back to the "hostname" command when the JDK lookup gave nothing usable.
+         if (ip.equals("") || ip.equals("::1") || ip.equals("UNKNOWN")) {
+             try {
+                 String commandIp = readCommandOutput("hostname -i");
+                 if (commandIp != null) {
+                     ip = commandIp;
+                 }
+                 String commandHostname = readCommandOutput("hostname");
+                 if (commandHostname != null) {
+                     hostname = commandHostname;
+                 }
+             } catch (InterruptedException e) {
+                 // Keep the interrupt visible to the thread that triggered class loading.
+                 Thread.currentThread().interrupt();
+                 log.warn("get hostname failed {}", e.getMessage());
+             } catch (Exception e) {
+                 log.warn("get hostname failed {}", e.getMessage());
+             }
+         }
+         IP = ip;
+         HOSTNAME = hostname;
+         log.info("IP {} HOSTNAME {}", IP, HOSTNAME);
+     }
+     /**
+      * Runs a command and returns its standard output, decoded as UTF-8 and
+      * trimmed (so the trailing newline no longer ends up in IP).
+      *
+      * @return the trimmed output, or null when the command exits with a non-zero status
+      */
+     private static String readCommandOutput(String command) throws java.io.IOException, InterruptedException {
+         Process process = Runtime.getRuntime().exec(command);
+         byte[] output;
+         java.io.InputStream stdout = process.getInputStream();
+         try {
+             // Drain stdout before waiting so the child can never block on a full pipe.
+             output = IOUtils.toByteArray(stdout);
+         } finally {
+             stdout.close();
+         }
+         if (process.waitFor() != 0) {
+             return null;
+         }
+         return new String(output, java.nio.charset.StandardCharsets.UTF_8).trim();
+     }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java b/common-scala/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java
new file mode 100644
index 0000000000..8bab301e6f
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java
@@ -0,0 +1,62 @@
+package com.alibaba.datax.common.util;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.alibaba.datax.common.exception.DataXException;
+ public class IdAndKeyRollingUtil {
+     private static final Logger LOGGER = LoggerFactory.getLogger(IdAndKeyRollingUtil.class);
+     public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
+     public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
+     public final static String ACCESS_ID = "accessId";
+     public final static String ACCESS_KEY = "accessKey";
+     /**
+      * Reads the encrypted access key from the SKYNET_ACCESSKEY environment
+      * variable and decrypts it with DESCipher.
+      *
+      * @return the decrypted access key, or null when the variable is blank or unset
+      * @throws DataXException when the variable is set but decrypts to a blank value
+      */
+     public static String parseAkFromSkynetAccessKey() {
+         Map<String, String> envProp = System.getenv();
+         String skynetAccessID = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
+         String skynetAccessKey = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSKEY);
+         // The older rule treated either variable being present as both being
+         // present. The stricter rule here only proceeds when the encrypted key
+         // itself is non-blank; jobs that worked before never had a blank key.
+         if (StringUtils.isBlank(skynetAccessKey)) {
+             return null;
+         }
+         LOGGER.info("Try to get accessId/accessKey from environment SKYNET_ACCESSKEY.");
+         String accessKey = DESCipher.decrypt(skynetAccessKey);
+         if (StringUtils.isBlank(accessKey)) {
+             // The environment variable exists but could not be decrypted.
+             throw DataXException.asDataXException(String.format(
+                     "Failed to get the [accessId]/[accessKey] from the environment variable. The [accessId]=[%s]",
+                     skynetAccessID));
+         }
+         LOGGER.info("Get accessId/accessKey from environment variables SKYNET_ACCESSKEY successfully.");
+         return accessKey;
+     }
+     /**
+      * Resolves the access id and key from the environment and, when a key is
+      * available, writes both into the given configuration under "accessId" and
+      * "accessKey".
+      *
+      * @param originalConfig configuration to update; left untouched when no key is found
+      * @return the decrypted access key, or null when none is available
+      */
+     public static String getAccessIdAndKeyFromEnv(Configuration originalConfig) {
+         Map<String, String> envProp = System.getenv();
+         String accessId = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
+         // The old code path did not fail here, it simply found no key.
+         String accessKey = IdAndKeyRollingUtil.parseAkFromSkynetAccessKey();
+         if (StringUtils.isNotBlank(accessKey)) {
+             // Every user of this helper follows the accessId / accessKey naming convention.
+             originalConfig.set(IdAndKeyRollingUtil.ACCESS_ID, accessId);
+             originalConfig.set(IdAndKeyRollingUtil.ACCESS_KEY, accessKey);
+         }
+         return accessKey;
+     }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/util/ListUtil.java b/common-scala/src/main/java/com/alibaba/datax/common/util/ListUtil.java
new file mode 100755
index 0000000000..c4482ecdf3
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/util/ListUtil.java
@@ -0,0 +1,161 @@
+package com.alibaba.datax.common.util;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
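+ /**
+ * Common wrappers around the Lists used in DataX. For example, checkIfValueDuplicate can be used to check that the
+ * writer columns configured by the user contain no duplicates; makeSureNoValueDuplicate does the same but strictly raises an error.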
+ */
+ public final class ListUtil {
+     /**
+      * Reports whether the list contains a duplicated value.
+      *
+      * @param aList         values to check, must be non-empty
+      * @param caseSensitive false to compare values after lower-casing them
+      * @return true when makeSureNoValueDuplicate rejects the list (any exception
+      *         it throws is treated as "duplicate found"), false otherwise
+      * @throws DataXException when the list is null or empty
+      */
+     public static boolean checkIfValueDuplicate(List<String> aList,
+                                                 boolean caseSensitive) {
+         if (null == aList || aList.isEmpty()) {
+             throw DataXException.asDataXException(CommonErrorCode.CONFIG_ERROR,
+                     "您提供的作业配置有误,List不能为空.");
+         }
+         try {
+             makeSureNoValueDuplicate(aList, caseSensitive);
+         } catch (Exception e) {
+             return true;
+         }
+         return false;
+     }
+     /**
+      * Fails when the list contains a duplicated value.
+      *
+      * @param aList         values to check, must be non-empty
+      * @param caseSensitive false to compare values after lower-casing them
+      * @throws IllegalArgumentException when the list is null or empty
+      * @throws DataXException           when a value occurs more than once
+      */
+     public static void makeSureNoValueDuplicate(List<String> aList,
+                                                 boolean caseSensitive) {
+         if (null == aList || aList.isEmpty()) {
+             throw new IllegalArgumentException("您提供的作业配置有误, List不能为空.");
+         }
+         if (1 == aList.size()) {
+             return;
+         }
+         // Work on a copy so the caller's list is neither reordered nor lower-cased.
+         List<String> list = caseSensitive ? new ArrayList<String>(aList) : valueToLowerCase(aList);
+         // After sorting, equal values are adjacent.
+         Collections.sort(list);
+         for (int i = 0, len = list.size() - 1; i < len; i++) {
+             if (list.get(i).equals(list.get(i + 1))) {
+                 throw DataXException
+                         .asDataXException(
+                                 CommonErrorCode.CONFIG_ERROR,
+                                 String.format(
+                                         "您提供的作业配置信息有误, String:[%s] 不允许重复出现在列表中: [%s].",
+                                         list.get(i),
+                                         StringUtils.join(aList, ",")));
+             }
+         }
+     }
+     /**
+      * Reports whether every value of bList also occurs in aList.
+      *
+      * @param caseSensitive false to compare values after lower-casing them
+      * @return false when makeSureBInA rejects the lists, true otherwise
+      * @throws IllegalArgumentException when either list is null or empty
+      */
+     public static boolean checkIfBInA(List<String> aList, List<String> bList,
+                                       boolean caseSensitive) {
+         if (null == aList || aList.isEmpty() || null == bList
+                 || bList.isEmpty()) {
+             throw new IllegalArgumentException("您提供的作业配置有误, List不能为空.");
+         }
+         try {
+             makeSureBInA(aList, bList, caseSensitive);
+         } catch (Exception e) {
+             return false;
+         }
+         return true;
+     }
+     /**
+      * Fails when some value of bList does not occur in aList.
+      *
+      * @param caseSensitive false to compare values after lower-casing them
+      * @throws IllegalArgumentException when either list is null or empty
+      * @throws DataXException           when a value of bList is missing from aList
+      */
+     public static void makeSureBInA(List<String> aList, List<String> bList,
+                                     boolean caseSensitive) {
+         if (null == aList || aList.isEmpty() || null == bList
+                 || bList.isEmpty()) {
+             throw new IllegalArgumentException("您提供的作业配置有误, List不能为空.");
+         }
+         List<String> all = caseSensitive ? new ArrayList<String>(aList) : valueToLowerCase(aList);
+         List<String> part = caseSensitive ? new ArrayList<String>(bList) : valueToLowerCase(bList);
+         for (String oneValue : part) {
+             if (!all.contains(oneValue)) {
+                 throw DataXException
+                         .asDataXException(
+                                 CommonErrorCode.CONFIG_ERROR,
+                                 String.format(
+                                         "您提供的作业配置信息有误, String:[%s] 不存在于列表中:[%s].",
+                                         oneValue, StringUtils.join(aList, ",")));
+             }
+         }
+     }
+     /**
+      * Reports whether all booleans in the list have the same value.
+      *
+      * @param aList values to compare, must be non-empty and free of null elements
+      * @throws IllegalArgumentException when the list is null or empty
+      */
+     public static boolean checkIfValueSame(List<Boolean> aList) {
+         if (null == aList || aList.isEmpty()) {
+             throw new IllegalArgumentException("您提供的作业配置有误, List不能为空.");
+         }
+         boolean firstValue = aList.get(0).booleanValue();
+         for (int i = 1, len = aList.size(); i < len; i++) {
+             if (firstValue != aList.get(i).booleanValue()) {
+                 return false;
+             }
+         }
+         return true;
+     }
+     /**
+      * Returns a new list holding the lower-cased values; null elements are kept as null.
+      *
+      * @throws IllegalArgumentException when the list is null or empty
+      */
+     public static List<String> valueToLowerCase(List<String> aList) {
+         if (null == aList || aList.isEmpty()) {
+             throw new IllegalArgumentException("您提供的作业配置有误, List不能为空.");
+         }
+         List<String> result = new ArrayList<String>(aList.size());
+         for (String oneValue : aList) {
+             result.add(null != oneValue ? oneValue.toLowerCase() : null);
+         }
+         return result;
+     }
+     /**
+      * Reports whether the two lists share at least one value.
+      *
+      * @return false when either list is null or empty
+      */
+     public static Boolean checkIfHasSameValue(List<String> listA, List<String> listB) {
+         if (null == listA || listA.isEmpty() || null == listB || listB.isEmpty()) {
+             return false;
+         }
+         for (String oneValue : listA) {
+             if (listB.contains(oneValue)) {
+                 return true;
+             }
+         }
+         return false;
+     }
+     /**
+      * Reports whether the two lists have the same size and hold the same set of values.
+      *
+      * @return false when either list is null or empty, or the sizes differ
+      */
+     public static boolean checkIfAllSameValue(List<String> listA, List<String> listB) {
+         if (null == listA || listA.isEmpty() || null == listB || listB.isEmpty() || listA.size() != listB.size()) {
+             return false;
+         }
+         // Compare in both directions: a one-way containsAll accepted
+         // [a, b, c] versus [a, a, b], where listA holds a value listB lacks.
+         return new HashSet<>(listA).equals(new HashSet<>(listB));
+     }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/util/RangeSplitUtil.java b/common-scala/src/main/java/com/alibaba/datax/common/util/RangeSplitUtil.java
new file mode 100755
index 0000000000..791f9ea12c
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/util/RangeSplitUtil.java
@@ -0,0 +1,209 @@
+package com.alibaba.datax.common.util;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import java.math.BigInteger;
+import java.util.*;
+/**
+ * 提供通用的根据数字范围、字符串范围等进行切分的通用功能.
+ */
+public final class RangeSplitUtil {
+ public static String[] doAsciiStringSplit(String left, String right, int expectSliceNumber) {
+ int radix = 128;
+ BigInteger[] tempResult = doBigIntegerSplit(stringToBigInteger(left, radix),
+ stringToBigInteger(right, radix), expectSliceNumber);
+ String[] result = new String[tempResult.length];
+ //处理第一个字符串(因为:在转换为数字,再还原的时候,如果首字符刚好是 basic,则不知道应该添加多少个 basic)
+ result[0] = left;
+ result[tempResult.length - 1] = right;
+ for (int i = 1, len = tempResult.length - 1; i < len; i++) {
+ result[i] = bigIntegerToString(tempResult[i], radix);
+ }
+ return result;
+ }
+ /**
+ * Splits the range between two long values by delegating to doBigIntegerSplit and narrowing each
+ * boundary back to long.
+ *
+ * @param left one end of the range
+ * @param right the other end of the range
+ * @param expectSliceNumber requested number of slices
+ * @return the boundaries produced by doBigIntegerSplit, as long values in the same order
+ */
+ public static long[] doLongSplit(long left, long right, int expectSliceNumber) {
+ BigInteger[] bounds = doBigIntegerSplit(
+ BigInteger.valueOf(left), BigInteger.valueOf(right), expectSliceNumber);
+ return Arrays.stream(bounds).mapToLong(BigInteger::longValue).toArray();
+ }
+ /**
+ * Splits the closed range between two BigInteger values into consecutive slices and returns the
+ * slice boundaries in ascending order.
+ * <p>
+ * The two ends are swapped when left is greater than right. The gap is divided evenly; the
+ * remainder is handed out one unit at a time to the leading slices. When the gap is smaller than
+ * the requested slice count, the slice count is reduced to the gap. Equal ends yield {left, right}.
+ *
+ * @param left one end of the range, not null
+ * @param right the other end of the range, not null
+ * @param expectSliceNumber requested number of slices, at least 1
+ * @return the boundaries; one more entry than the number of slices actually produced
+ * @throws IllegalArgumentException if expectSliceNumber is below 1 or either end is null
+ */
+ public static BigInteger[] doBigIntegerSplit(BigInteger left, BigInteger right, int expectSliceNumber) {
+ if (expectSliceNumber < 1) {
+ throw new IllegalArgumentException(String.format(
+ "切分份数不能小于1. 此处:expectSliceNumber=[%s].", expectSliceNumber));
+ }
+ if (left == null || right == null) {
+ throw new IllegalArgumentException(String.format(
+ "对 BigInteger 进行切分时,其左右区间不能为 null. 此处:left=[%s],right=[%s].", left, right));
+ }
+ int order = left.compareTo(right);
+ if (order == 0) {
+ return new BigInteger[]{left, right};
+ }
+ // Normalise so that low < high.
+ BigInteger low = order < 0 ? left : right;
+ BigInteger high = order < 0 ? right : left;
+ BigInteger[] stepAndExtra = high.subtract(low).divideAndRemainder(BigInteger.valueOf(expectSliceNumber));
+ BigInteger step = stepAndExtra[0];
+ BigInteger extra = stepAndExtra[1];
+ int slices = expectSliceNumber;
+ // Compare as BigInteger rather than via intValue(), which could overflow. A zero step means the
+ // gap is smaller than the slice count, so the remainder equals the gap and fits in an int.
+ if (step.signum() == 0) {
+ slices = extra.intValue();
+ }
+ BigInteger[] bounds = new BigInteger[slices + 1];
+ bounds[0] = low;
+ bounds[slices] = high;
+ BigInteger cursor = low;
+ for (int i = 1; i < slices; i++) {
+ // The first "extra" slices are one unit wider than the rest.
+ BigInteger bump = extra.compareTo(BigInteger.valueOf(i)) >= 0 ? BigInteger.ONE : BigInteger.ZERO;
+ cursor = cursor.add(step).add(bump);
+ bounds[i] = cursor;
+ }
+ return bounds;
+ }
+ /**
+ * Verifies that value lies within the closed interval [left, right].
+ *
+ * @param value the number to check
+ * @param left inclusive lower bound
+ * @param right inclusive upper bound
+ * @throws IllegalArgumentException if value is below left or above right
+ */
+ private static void checkIfBetweenRange(int value, int left, int right) {
+ boolean inside = left <= value && value <= right;
+ if (inside) {
+ return;
+ }
+ throw new IllegalArgumentException(String.format("parameter can not <[%s] or >[%s].",
+ left, right));
+ }
+ /**
+ * Interprets an ASCII string as a non-negative number written in the given radix: each character's
+ * code is one digit and the first character is the most significant digit. An empty string yields
+ * zero. Only ASCII characters are supported, so radix must lie in [1,128].
+ *
+ * @param aString the string to convert, not null
+ * @param radix the base of the positional notation, in [1,128]
+ * @return the numeric value of the string
+ * @throws IllegalArgumentException if aString is null, radix is outside [1,128], or the string
+ * contains a character whose code is 128 or above
+ */
+ public static BigInteger stringToBigInteger(String aString, int radix) {
+ if (null == aString) {
+ // The message names the parameter that is actually null.
+ throw new IllegalArgumentException("参数 aString 不能为空.");
+ }
+ checkIfBetweenRange(radix, 1, 128);
+ BigInteger radixBigInteger = BigInteger.valueOf(radix);
+ BigInteger result = BigInteger.ZERO;
+ // Accumulate left to right (result = result * radix + digit) instead of recomputing
+ // radix^k for every position; the resulting value is the same.
+ for (int i = 0, len = aString.length(); i < len; i++) {
+ int tempChar = aString.charAt(i);
+ if (tempChar >= 128) {
+ throw new IllegalArgumentException(String.format("根据字符串进行切分时仅支持 ASCII 字符串,而字符串:[%s]非 ASCII 字符串.", aString));
+ }
+ result = result.multiply(radixBigInteger).add(BigInteger.valueOf(tempChar));
+ }
+ return result;
+ }
+ /**
+ * Converts a non-negative BigInteger into a string whose characters are its digits in the given
+ * radix, most significant first; a digit d is emitted as the character with code d. Zero yields a
+ * single character with code 0.
+ * <p>
+ * Radix 1 is rejected because repeated division by 1 never terminates, and negative values are
+ * rejected because they have no digit-to-character mapping.
+ *
+ * @param bigInteger the value to convert, not null and not negative
+ * @param radix the base of the positional notation, in [2,128]
+ * @return the digit string
+ * @throws IllegalArgumentException if bigInteger is null or negative, or radix is outside [2,128]
+ */
+ private static String bigIntegerToString(BigInteger bigInteger, int radix) {
+ if (null == bigInteger) {
+ throw new IllegalArgumentException("参数 bigInteger 不能为空.");
+ }
+ checkIfBetweenRange(radix, 2, 128);
+ if (bigInteger.signum() < 0) {
+ throw new IllegalArgumentException(String.format("参数 bigInteger 不能为负数. 此处:bigInteger=[%s].", bigInteger));
+ }
+ BigInteger radixBigInteger = BigInteger.valueOf(radix);
+ StringBuilder digits = new StringBuilder();
+ BigInteger currentValue = bigInteger;
+ // do/while so that zero still produces exactly one digit.
+ do {
+ BigInteger[] quotientAndRemainder = currentValue.divideAndRemainder(radixBigInteger);
+ // Every digit is below 128, so the cast yields a plain ASCII character.
+ digits.append((char) quotientAndRemainder[1].intValue());
+ currentValue = quotientAndRemainder[0];
+ } while (currentValue.signum() > 0);
+ // Digits were collected least significant first.
+ return digits.reverse().toString();
+ }
+ /**
+ * Finds the smallest and the largest character of a string, compared by character code.
+ * The string must be non-empty and must pass isPureAscii.
+ *
+ * @param aString the string to scan
+ * @return a pair whose left is the smallest character and whose right is the largest
+ * @throws IllegalArgumentException if aString is null, empty, or rejected by isPureAscii
+ */
+ public static Pair<Character, Character> getMinAndMaxCharacter(String aString) {
+ if (!isPureAscii(aString)) {
+ throw new IllegalArgumentException(String.format("根据字符串进行切分时仅支持 ASCII 字符串,而字符串:[%s]非 ASCII 字符串.", aString));
+ }
+ // An empty string passes the ASCII check; reject it here instead of failing on charAt(0).
+ if (aString.isEmpty()) {
+ throw new IllegalArgumentException("根据字符串进行切分时字符串不能为空.");
+ }
+ char min = aString.charAt(0);
+ char max = min;
+ for (int i = 1, len = aString.length(); i < len; i++) {
+ char temp = aString.charAt(i);
+ min = min < temp ? min : temp;
+ max = max > temp ? max : temp;
+ }
+ return new ImmutablePair<Character, Character>(min, max);
+ }
+ /**
+ * Reports whether every character of the string has a code below 127.
+ *
+ * @param aString the string to check, may be null
+ * @return false for null or when any character has a code of 127 or above; true otherwise,
+ * including for the empty string
+ */
+ private static boolean isPureAscii(String aString) {
+ // A char is never negative, so only the upper bound needs checking.
+ return aString != null && aString.chars().allMatch(code -> code < 127);
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/common/util/RetryUtil.java b/common-scala/src/main/java/com/alibaba/datax/common/util/RetryUtil.java
new file mode 100755
index 0000000000..33c712874b
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/common/util/RetryUtil.java
@@ -0,0 +1,208 @@
+package com.alibaba.datax.common.util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.*;
+public final class RetryUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(RetryUtil.class);
+ private static final long MAX_SLEEP_MILLISECOND = 256 * 1000;
+ /**
+ * Runs the callable on the calling thread through a new Retry, retrying after a failure.
+ * No exception filter is passed, so every exception triggers a retry.
+ *
+ * NOTE(review): T is used as the return type but no type-parameter declaration is visible on the
+ * signature in this excerpt; presumably the method is generic in T - confirm.
+ *
+ * @param callable the work to run
+ * @param retryTimes maximum number of attempts; Retry.doRetry rejects values below 1
+ * @param sleepTimeInMilliSecond pause after a failed attempt before the next one
+ * @param exponential whether the pause grows exponentially from attempt to attempt
+ * @return the result of the first attempt that succeeds
+ */
+ public static T executeWithRetry(Callable callable,
+ int retryTimes,
+ long sleepTimeInMilliSecond,
+ boolean exponential) throws Exception {
+ Retry retry = new Retry();
+ return retry.doRetry(callable, retryTimes, sleepTimeInMilliSecond, exponential, null);
+ }
+ /**
+ * Runs the callable on the calling thread through a new Retry, retrying only after failures of the
+ * listed exception classes.
+ *
+ * NOTE(review): the generic parts of this signature look truncated in this excerpt (no declaration
+ * of T, and the list type reads "List>"); confirm against the original source.
+ *
+ * @param callable the work to run
+ * @param retryTimes maximum number of attempts; Retry.doRetry rejects values below 1
+ * @param sleepTimeInMilliSecond pause after a failed attempt before the next one
+ * @param exponential whether the pause grows exponentially from attempt to attempt
+ * @param retryExceptionClasss exception classes that allow a retry; Retry.doRetry compares them
+ * with the thrown exception's class by identity, and treats a null or empty list as
+ * "retry on any exception"
+ * @return the result of the first attempt that succeeds
+ */
+ public static T executeWithRetry(Callable callable,
+ int retryTimes,
+ long sleepTimeInMilliSecond,
+ boolean exponential,
+ List> retryExceptionClasss) throws Exception {
+ Retry retry = new Retry();
+ return retry.doRetry(callable, retryTimes, sleepTimeInMilliSecond, exponential, retryExceptionClasss);
+ }
+ /**
+ * Runs the callable on a worker thread through an AsyncRetry and retries on failure. Each attempt
+ * must finish within timeoutMs, otherwise it counts as failed.
+ * The thread pool is supplied by the caller, so the caller decides how widely it is shared
+ * (for example, HttpClientUtil shares a single pool).
+ * <p>
+ * Limitation: a worker thread can only be interrupted while it is blocked.
+ *
+ * NOTE(review): T is used as the return type but no type-parameter declaration is visible on the
+ * signature in this excerpt; presumably the method is generic in T - confirm.
+ *
+ * @param callable the work to run
+ * @param retryTimes maximum number of attempts; Retry.doRetry rejects values below 1
+ * @param sleepTimeInMilliSecond pause after a failed attempt before the next one
+ * @param exponential whether the pause grows exponentially from attempt to attempt
+ * @param timeoutMs time limit for a single attempt, in milliseconds
+ * @param executor thread pool that runs the attempts
+ * @return the result of the first attempt that succeeds
+ */
+ public static T asyncExecuteWithRetry(Callable callable,
+ int retryTimes,
+ long sleepTimeInMilliSecond,
+ boolean exponential,
+ long timeoutMs,
+ ThreadPoolExecutor executor) throws Exception {
+ Retry retry = new AsyncRetry(timeoutMs, executor);
+ return retry.doRetry(callable, retryTimes, sleepTimeInMilliSecond, exponential, null);
+ }
+ /**
+ * Creates the thread pool used for asynchronous execution:
+ * <ul>
+ * <li>core size 0: no threads and no cost until a task arrives;</li>
+ * <li>maximum size 5: at most five threads;</li>
+ * <li>60-second keep-alive: threads idle for longer than that are reclaimed;</li>
+ * <li>SynchronousQueue: tasks are never queued, so a submission succeeds only when a thread is
+ * available and is otherwise rejected with RejectedExecutionException.</li>
+ * </ul>
+ *
+ * @return the new thread pool
+ */
+ public static ThreadPoolExecutor createThreadPoolExecutor() {
+ final int corePoolSize = 0;
+ final int maximumPoolSize = 5;
+ final long keepAliveSeconds = 60L;
+ return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+ keepAliveSeconds, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>());
+ }
+ /**
+ * Synchronous retry engine: invokes the callable on the calling thread until one attempt succeeds
+ * or the attempts are used up. Subclasses change how a single attempt runs by overriding call().
+ */
+ private static class Retry {
+ /**
+ * Runs the callable up to retryTimes times and returns the first successful result.
+ * <p>
+ * The first failure is logged with its stack trace. When retryExceptionClasss is non-empty, a
+ * failure whose class is not exactly one of the listed classes is rethrown at once. Between
+ * attempts the thread sleeps for sleepTimeInMilliSecond, doubled per attempt when exponential is
+ * set, and never longer than MAX_SLEEP_MILLISECOND. If the thread is interrupted while sleeping,
+ * the interrupt flag is restored and the last failure is rethrown without further attempts.
+ *
+ * @param callable the work to run, not null
+ * @param retryTimes maximum number of attempts, at least 1
+ * @param sleepTimeInMilliSecond pause between attempts; no pause when zero or negative
+ * @param exponential whether the pause doubles with every attempt
+ * @param retryExceptionClasss exception classes that allow a retry; null or empty allows all
+ * @return the result of the first attempt that succeeds
+ * @throws IllegalArgumentException if callable is null or retryTimes is below 1
+ * @throws Exception the last failure when no attempt succeeds or a retry is not allowed
+ */
+ public <T> T doRetry(Callable<T> callable, int retryTimes, long sleepTimeInMilliSecond, boolean exponential, List<Class<?>> retryExceptionClasss)
+ throws Exception {
+ if (null == callable) {
+ throw new IllegalArgumentException("系统编程错误, 入参callable不能为空 ! ");
+ }
+ if (retryTimes < 1) {
+ throw new IllegalArgumentException(String.format(
+ "系统编程错误, 入参retrytime[%d]不能小于1 !", retryTimes));
+ }
+ Exception saveException = null;
+ for (int i = 0; i < retryTimes; i++) {
+ try {
+ return call(callable);
+ } catch (Exception e) {
+ saveException = e;
+ if (i == 0) {
+ LOG.error(String.format("Exception when calling callable, 异常Msg:%s", saveException.getMessage()), saveException);
+ }
+ if (!isRetryable(e, retryExceptionClasss)) {
+ throw saveException;
+ }
+ if (i + 1 < retryTimes && sleepTimeInMilliSecond > 0) {
+ long startTime = System.currentTimeMillis();
+ long timeToSleep = computeSleepMillis(sleepTimeInMilliSecond, exponential, i);
+ try {
+ Thread.sleep(timeToSleep);
+ } catch (InterruptedException interrupted) {
+ // An interrupt is a request to stop: keep the flag visible to the caller and
+ // surface the real failure instead of silently carrying on.
+ Thread.currentThread().interrupt();
+ throw saveException;
+ }
+ long realTimeSleep = System.currentTimeMillis()-startTime;
+ LOG.error(String.format("Exception when calling callable, 即将尝试执行第%s次重试.本次重试计划等待[%s]ms,实际等待[%s]ms, 异常Msg:[%s]",
+ i+1, timeToSleep,realTimeSleep, e.getMessage()));
+ }
+ }
+ }
+ throw saveException;
+ }
+ /** Decides whether a failure may be retried: always when no filter is given, otherwise only on an exact class match. */
+ private static boolean isRetryable(Exception failure, List<Class<?>> retryExceptionClasss) {
+ if (null == retryExceptionClasss || retryExceptionClasss.isEmpty()) {
+ return true;
+ }
+ for (Class<?> eachExceptionClass : retryExceptionClasss) {
+ if (eachExceptionClass == failure.getClass()) {
+ return true;
+ }
+ }
+ return false;
+ }
+ /** Computes the pause before the next attempt, capped at MAX_SLEEP_MILLISECOND. */
+ private static long computeSleepMillis(long baseMillis, boolean exponential, int attempt) {
+ long millis = baseMillis;
+ if (exponential) {
+ // Double step by step and stop once the cap is reached, so the value cannot overflow.
+ for (int k = 0; k < attempt && millis < MAX_SLEEP_MILLISECOND; k++) {
+ millis *= 2;
+ }
+ }
+ return Math.min(millis, MAX_SLEEP_MILLISECOND);
+ }
+ /**
+ * Performs one attempt; this implementation calls the callable directly on the current thread.
+ *
+ * @param callable the work to run
+ * @return the callable's result
+ * @throws Exception whatever the callable throws
+ */
+ protected <T> T call(Callable<T> callable) throws Exception {
+ return callable.call();
+ }
+ }
+ /**
+ * Retry variant that runs every attempt on a caller-supplied thread pool and gives each attempt a
+ * time limit.
+ */
+ private static class AsyncRetry extends Retry {
+ // Time limit for a single attempt, in milliseconds.
+ private long timeoutMs;
+ // Pool that runs the attempts; supplied by the caller and not shut down by this class.
+ private ThreadPoolExecutor executor;
+ public AsyncRetry(long timeoutMs, ThreadPoolExecutor executor) {
+ this.timeoutMs = timeoutMs;
+ this.executor = executor;
+ }
+ /**
+ * Submits the task to the supplied thread pool and waits for it.
+ *
+ * future.get() waits for at most timeoutMs milliseconds. If the task finishes within that time
+ * its result is returned. Any exception (timeout, failure inside the task, cancellation or
+ * interruption by another thread) is logged and rethrown.
+ * On both the normal and the failure path the task is checked for completion; if it has not
+ * finished it is cancelled with cancel(true), which interrupts the worker thread even when the
+ * task is already running.
+ *
+ * @param callable the work to run
+ * @return the task's result
+ * @throws Exception the exception raised while submitting or waiting for the task
+ */
+ @Override
+ protected T call(Callable callable) throws Exception {
+ Future future = executor.submit(callable);
+ try {
+ return future.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ LOG.warn("Try once failed", e);
+ throw e;
+ } finally {
+ if (!future.isDone()) {
+ future.cancel(true);
+ LOG.warn("Try once task not done, cancel it, active count: " + executor.getActiveCount());
+ }
+ }
+ }
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/core/job/IJobContainerContext.java b/common-scala/src/main/java/com/alibaba/datax/core/job/IJobContainerContext.java
new file mode 100644
index 0000000000..4ad9dd9cd5
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/core/job/IJobContainerContext.java
@@ -0,0 +1,12 @@
+package com.alibaba.datax.core.job;
+import com.qlangtech.tis.datax.IDataXNameAware;
+import com.qlangtech.tis.datax.IDataXTaskRelevant;
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2023-02-23 10:06
+ **/
+public interface IJobContainerContext extends IDataXTaskRelevant, IDataXNameAware {
diff --git a/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java
new file mode 100755
index 0000000000..0e4692e2c8
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java
@@ -0,0 +1,22 @@
+package com.alibaba.datax.plugin.rdbms.writer;
+/**
+ * Declares the constants that plugins use as markers (MARK) while parsing the user configuration.
+ *
+ * NOTE(review): several String fields below are public, static and not final, so any code can
+ * reassign them at runtime - confirm whether that is intended.
+ */
+public final class Constant {
+ // Batch size of 2048; presumably the fallback for Key.BATCH_SIZE - confirm against the caller.
+ public static final int DEFAULT_BATCH_SIZE = 2048;
+ // 32 * 1024 * 1024 bytes (32 MiB); presumably the fallback for Key.BATCH_BYTE_SIZE - confirm.
+ public static final int DEFAULT_BATCH_BYTE_SIZE = 32 * 1024 * 1024;
+ // Placeholder text "@table"; presumably substituted with a table name in SQL templates - confirm.
+ public static String TABLE_NAME_PLACEHOLDER = "@table";
+ // Name of the connection section in the job configuration (SelectTable.create reads "connection[0].table[0]").
+ public static String CONN_MARK = "connection";
+ public static String TABLE_NUMBER_MARK = "tableNumber";
+ public static String INSERT_OR_REPLACE_TEMPLATE_MARK = "insertOrReplaceTemplate";
+ // Separator literal and, below, the same text with each '|' escaped for use as a regular expression.
+ public static final String OB10_SPLIT_STRING = "||_dsc_ob10_dsc_||";
+ public static final String OB10_SPLIT_STRING_PATTERN = "\\|\\|_dsc_ob10_dsc_\\|\\|";
diff --git a/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java
new file mode 100755
index 0000000000..05f5ce1a2b
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Key.java
@@ -0,0 +1,42 @@
+package com.alibaba.datax.plugin.rdbms.writer;
+/**
+ * Names of the configuration keys read by the rdbms writer plugins.
+ */
+public final class Key {
+ public final static String JDBC_URL = "jdbcUrl";
+ public final static String USERNAME = "username";
+ public final static String PASSWORD = "password";
+ public final static String TABLE = "table";
+ public final static String COLUMN = "column";
+ public final static String ESCAPE_CHAR = "escapeChar";
+ // Allowed values: insert, replace; defaults to insert. MySQL supports both; Oracle has no replace
+ // mechanism and can only insert, so Oracle does not need to expose this parameter.
+ public final static String WRITE_MODE = "writeMode";
+ public final static String PRE_SQL = "preSql";
+ public final static String POST_SQL = "postSql";
+ public final static String TDDL_APP_NAME = "appName";
+ // Default: 256 according to the original note.
+ // NOTE(review): Constant.DEFAULT_BATCH_SIZE in this package is 2048 - confirm which default applies.
+ public final static String BATCH_SIZE = "batchSize";
+ // Default: 32m
+ public final static String BATCH_BYTE_SIZE = "batchByteSize";
+ public final static String EMPTY_AS_NULL = "emptyAsNull";
+ public final static String DB_NAME_PATTERN = "dbNamePattern";
+ public final static String DB_RULE = "dbRule";
+ public final static String TABLE_NAME_PATTERN = "tableNamePattern";
+ public final static String TABLE_RULE = "tableRule";
+ public final static String DRYRUN = "dryRun";
diff --git a/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/EscapeableEntity.java b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/EscapeableEntity.java
new file mode 100644
index 0000000000..7e7d171602
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/EscapeableEntity.java
@@ -0,0 +1,38 @@
+package com.alibaba.datax.plugin.rdbms.writer.util;
+import com.qlangtech.tis.plugin.ds.IDBReservedKeys;
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2022-10-03 10:38
+ **/
+public class EscapeableEntity {
+ protected final IDBReservedKeys escapeChar;
+ protected final boolean containEscapeChar;
+ /**
+ * Stores the escaping rules and records whether they define an escape character.
+ *
+ * @param escapeChar the database's reserved-key / escaping rules, not null
+ * @throws NullPointerException if escapeChar is null
+ */
+ public EscapeableEntity(IDBReservedKeys escapeChar) {
+ // Fail with a named argument instead of an anonymous NullPointerException on the next line.
+ this.escapeChar = java.util.Objects.requireNonNull(escapeChar, "param escapeChar can not be null");
+ this.containEscapeChar = escapeChar.getEscapeChar().isPresent();
+ }
+ /**
+ * Returns the escape character defined by the escaping rules.
+ * NOTE(review): the value is unwrapped with get() without a presence check; callers presumably
+ * should consult isContainEscapeChar() first - confirm.
+ *
+ * @return the escape character
+ */
+ public String getEscapeChar() {
+ final String configured = this.escapeChar.getEscapeChar().get();
+ return configured;
+ }
+ /**
+ * Tells whether the escaping rules defined an escape character when this object was constructed.
+ *
+ * @return the flag computed by the constructor
+ */
+ public boolean isContainEscapeChar() {
+ return this.containEscapeChar;
+ }
+ protected String escapeEntity(String val) {
+ return escapeChar.getEscapedEntity(val);
+ }
+ protected String unescapeEntity(String val) {
+// if (containEscapeChar) {
+// return StringUtils.remove(val, this.escapeChar);
+// } else {
+// return val;
+// }
+ return escapeChar.removeEscapeChar(val);
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectCols.java b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectCols.java
new file mode 100644
index 0000000000..99cb54a617
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectCols.java
@@ -0,0 +1,127 @@
+package com.alibaba.datax.plugin.rdbms.writer.util;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.util.ListUtil;
+import com.alibaba.datax.plugin.rdbms.writer.Key;
+import com.qlangtech.tis.plugin.ds.IDBReservedKeys;
+import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2022-09-30 14:23
+ **/
+public class SelectCols extends EscapeableEntity implements Iterable {
+ protected final List columns;
+ /**
+ * Iterates over the column names held by this object, in list order.
+ *
+ * @return an iterator backed by the internal column list
+ */
+ @Override
+ public Iterator iterator() {
+ final Iterator cursor = this.columns.iterator();
+ return cursor;
+ }
+// public static SelectCols createSelectCols(Configuration conf) {
+// return createSelectCols(conf, null);
+// }
+ /**
+ * Builds a SelectCols from the column list stored under Key.COLUMN in the configuration.
+ *
+ * @param conf the configuration to read the columns from
+ * @param escapeChar the escaping rules applied to the column names
+ * @return the new SelectCols
+ */
+ public static SelectCols createSelectCols(Configuration conf, IDBReservedKeys escapeChar) {
+ final List<String> configuredColumns = conf.getList(Key.COLUMN, String.class);
+ return new SelectCols(configuredColumns, escapeChar);
+ }
+ /**
+ * Builds a SelectCols from an explicit column list, without escaping rules.
+ *
+ * NOTE(review): null is passed as the escaping rules, and the EscapeableEntity constructor shown in
+ * this change dereferences that argument, so this factory looks like it always fails with a
+ * NullPointerException - confirm and supply a no-op IDBReservedKeys if so.
+ *
+ * @param allColumns the column names
+ * @return the new SelectCols
+ */
+ public static SelectCols createSelectCols(List allColumns) {
+ final IDBReservedKeys noEscapeRules = null;
+ return new SelectCols(allColumns, noEscapeRules);
+ }
+ /**
+ * Stores the column names with their escape characters removed.
+ *
+ * @param columns the column names as configured; must be neither null nor empty
+ * @param escapeChar the escaping rules used to unescape and later re-escape the names
+ * @throws IllegalArgumentException if columns is null or empty
+ */
+ private SelectCols(List<String> columns, IDBReservedKeys escapeChar) {
+ super(escapeChar);
+ // Validate first, so that a null list is reported as IllegalArgumentException instead of
+ // failing inside stream() before the check is ever reached.
+ if (columns == null || columns.isEmpty()) {
+ throw new IllegalArgumentException("param colums can not be empty ");
+ }
+ this.columns = columns.stream().map((c) -> unescapeEntity(c)).collect(Collectors.toList());
+ }
+ /**
+ * Tells whether the selection is the single wildcard column "*".
+ *
+ * @return true when there is exactly one column and it equals "*"
+ */
+ public boolean isSelectAllCols() {
+ if (this.size() != 1) {
+ return false;
+ }
+ return "*".equals(this.columns.get(0));
+ }
+ /**
+ * Verifies that no column name occurs twice. The names are copied (lower-cased first when the
+ * check is case-insensitive), sorted, and neighbouring entries are compared.
+ *
+ * @param caseSensitive whether names differing only in case count as different
+ * @throws DataXException with CommonErrorCode.CONFIG_ERROR when a duplicate is found
+ */
+ public void makeSureNoValueDuplicate(
+ boolean caseSensitive) {
+ if (this.columns.size() == 1) {
+ return;
+ }
+ List sorted = caseSensitive ? new ArrayList(this.columns) : ListUtil.valueToLowerCase(this.columns);
+ // After sorting, equal names sit next to each other.
+ Collections.sort(sorted);
+ for (int i = 1, len = sorted.size(); i < len; i++) {
+ Object previous = sorted.get(i - 1);
+ if (!previous.equals(sorted.get(i))) {
+ continue;
+ }
+ throw DataXException
+ .asDataXException(
+ CommonErrorCode.CONFIG_ERROR,
+ String.format(
+ "您提供的作业配置信息有误, String:[%s] 不允许重复出现在列表中: [%s].",
+ previous,
+ StringUtils.join(this.columns, ",")));
+ }
+ }
+ /**
+ * Renders the column list for use in SQL: every name is escaped and the names are joined with ",".
+ *
+ * @return the comma-separated, escaped column names
+ */
+ public String getCols() {
+ return columns.stream().map(this::escapeEntity).collect(Collectors.joining(","));
+ }
+ /**
+ * Builds the " ON DUPLICATE KEY UPDATE col=VALUES(col),..." clause for all columns.
+ * Only applicable to MySQL.
+ *
+ * NOTE(review): the names are emitted as stored (unescaped), whereas getCols() escapes them -
+ * confirm whether reserved-word column names need escaping here as well.
+ *
+ * @return the clause, or an empty string when there are no columns
+ */
+ public String onDuplicateKeyUpdateString() {
+ if (columns == null || columns.isEmpty()) {
+ return "";
+ }
+ return columns.stream()
+ .map(name -> name + "=VALUES(" + name + ")")
+ .collect(Collectors.joining(",", " ON DUPLICATE KEY UPDATE ", ""));
+ }
+ /**
+ * Returns the number of columns held by this object.
+ *
+ * @return the size of the column list
+ */
+ public int size() {
+ return this.columns.size();
+ }
+ /**
+ * Tells whether the given name is one of the stored column names (exact match).
+ *
+ * @param name the column name to look for
+ * @return true when the column list contains the name
+ */
+ public boolean containsCol(String name) {
+ final boolean present = this.columns.contains(name);
+ return present;
+ }
diff --git a/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectTable.java b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectTable.java
new file mode 100644
index 0000000000..1124948c03
--- /dev/null
+++ b/common-scala/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/SelectTable.java
@@ -0,0 +1,57 @@
+package com.alibaba.datax.plugin.rdbms.writer.util;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.datax.plugin.rdbms.writer.Key;
+import com.qlangtech.tis.plugin.ds.IDBReservedKeys;
+import org.apache.commons.lang3.StringUtils;
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2022-10-03 10:34
+ **/
+public class SelectTable extends EscapeableEntity {
+ private final String tabName;
+ /**
+ * Builds a SelectTable from the first table of the first connection entry in the configuration.
+ * The configuration path is formatted from Constant.CONN_MARK and Key.TABLE.
+ *
+ * @param conf the job configuration
+ * @param escapeChar the escaping rules applied to the table name
+ * @return the new SelectTable
+ */
+ public static SelectTable create(Configuration conf, IDBReservedKeys escapeChar) {
+ final String tablePath = String.format(
+ "%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE);
+ return create(conf.getString(tablePath), escapeChar);
+ }
+ /**
+ * Builds a SelectTable for an explicit table name.
+ *
+ * @param table the table name; the constructor rejects an empty name
+ * @param escapeChar the escaping rules applied to the table name
+ * @return the new SelectTable
+ */
+ public static SelectTable create(String table, IDBReservedKeys escapeChar) {
+ final SelectTable selected = new SelectTable(table, escapeChar);
+ return selected;
+ }
+ /**
+ * Builds a SelectTable from the table name stored directly under Key.TABLE in the configuration.
+ *
+ * @param conf the configuration to read the table name from
+ * @param escapeChar the escaping rules applied to the table name
+ * @return the new SelectTable
+ */
+ public static SelectTable createInTask(Configuration conf, IDBReservedKeys escapeChar) {
+ final String configuredTable = conf.getString(Key.TABLE);
+ return new SelectTable(configuredTable, escapeChar);
+ }
+ /**
+ * Returns the table name in escaped form.
+ *
+ * @return the stored table name passed through escapeEntity
+ */
+ public String getTabName() {
+ final String escapedName = this.escapeEntity(this.tabName);
+ return escapedName;
+ }
+ /**
+ * Returns the table name exactly as it was given to the constructor, without escaping.
+ *
+ * @return the stored table name
+ */
+ public String getUnescapeTabName() {
+ return tabName;
+ }
+ /**
+ * Returns the escaped table name, the same value as getTabName().
+ *
+ * @return the escaped table name
+ */
+ @Override
+ public String toString() {
+ return this.getTabName();
+ }
+ private SelectTable(String tabName, IDBReservedKeys escapeChar) {
+ super(escapeChar);
+ if (StringUtils.isEmpty(tabName)) {
+ throw new IllegalArgumentException("param tabName can not be empty");
+ }
+ this.tabName = tabName;
+ }
diff --git a/common-scala/src/main/scala/com/alibaba/datax/common/element/scala/StringColumn.scala b/common-scala/src/main/scala/com/alibaba/datax/common/element/scala/StringColumn.scala
new file mode 100644
index 0000000000..c6201ba672
--- /dev/null
+++ b/common-scala/src/main/scala/com/alibaba/datax/common/element/scala/StringColumn.scala
@@ -0,0 +1,24 @@
+package com.alibaba.datax.common.element.scala
+import java.math.{BigDecimal, BigInteger}
+import java.util.Date
+/**
+ * Value-class wrapper around a single string column value that implements TypedCol.
+ *
+ * NOTE(review): every accessor below returns a fixed placeholder and never reads colVal; this
+ * looks like a stub awaiting a real implementation - confirm before relying on any of them.
+ *
+ * @param colVal the wrapped string value
+ */
+case class StringColumn(colVal: String) extends AnyVal with TypedCol {
+ // Placeholder: always 1.
+ override def asLong: Long = 1;
+ // Placeholder: always 1.
+ override def asDouble: Double = 1
+ // Placeholder: always the empty string, not colVal.
+ override def asString: String = ""
+ // Placeholders: the remaining accessors return null, or false for asBoolean.
+ override def asDate: Date = null
+ override def asDate(dateFormat: String): Date = null
+ override def asBytes: Array[Byte] = null
+ override def asBoolean: Boolean = false
+ override def asBigDecimal: BigDecimal = null
+ override def asBigInteger: BigInteger = null
diff --git a/common-scala/src/main/scala/com/alibaba/datax/common/element/scala/TypedCol.scala b/common-scala/src/main/scala/com/alibaba/datax/common/element/scala/TypedCol.scala
new file mode 100644
index 0000000000..c3e55f3f6b
--- /dev/null
+++ b/common-scala/src/main/scala/com/alibaba/datax/common/element/scala/TypedCol.scala
@@ -0,0 +1,26 @@
+package com.alibaba.datax.common.element.scala
+import java.math.{BigDecimal, BigInteger}
+import java.util.Date
+/**
+ * Typed read access to a column value: one accessor per target type.
+ * Declared as a universal trait (it extends Any) so that value classes can implement it.
+ */
+trait TypedCol extends Any{
+ def asLong: Long
+ def asDouble: Double
+ def asString: String
+ def asDate: Date
+ // Variant of asDate that takes an explicit date format string.
+ def asDate(dateFormat: String): Date
+ def asBytes: Array[Byte]
+ def asBoolean: Boolean
+ def asBigDecimal: BigDecimal
+ def asBigInteger: BigInteger
diff --git a/pom.xml b/pom.xml
index d6f00bf370..4ff9e0adec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,11 +43,17 @@
+ 2.11.12
+ 2.11
+ common-scala