Skip to content

Commit

Permalink
introduce IJobContainerContext into DataX for that enable fetch CoreC…
Browse files Browse the repository at this point in the history
…ontainer resource in Writer or Reader Job/Task
  • Loading branch information
baisui1981 committed Feb 23, 2023
1 parent 58b7bf2 commit bc6c967
Show file tree
Hide file tree
Showing 67 changed files with 3,926 additions and 1,198 deletions.
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
package com.alibaba.datax.plugin.writer.adbpgwriter;

import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil;
import com.alibaba.datax.plugin.writer.adbpgwriter.copy.Adb4pgClientProxy;
import com.alibaba.datax.plugin.writer.adbpgwriter.util.Adb4pgUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode.*;
import static com.alibaba.datax.plugin.rdbms.util.DataBaseType.PostgreSQL;
import java.util.ArrayList;
import java.util.List;

/**
* @author yuncheng
Expand All @@ -36,21 +29,21 @@ public static class Job extends Writer.Job {
public void init() {
this.originalConfig = super.getPluginJobConf();
LOG.info("in Job.init(), config is:[\n{}\n]", originalConfig.toJSON());
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE, this.containerContext);
//convert to DatabaseConfig, use DatabaseConfig to check user configuration
Adb4pgUtil.checkConfig(originalConfig);
}

@Override
public void prepare() {

Adb4pgUtil.prepare(originalConfig);
Adb4pgUtil.prepare(originalConfig, this.containerContext);
}

@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> splitResult = new ArrayList<Configuration>();
for(int i = 0; i < adviceNumber; i++) {
for (int i = 0; i < adviceNumber; i++) {
splitResult.add(this.originalConfig.clone());
}
return splitResult;
Expand All @@ -59,7 +52,7 @@ public List<Configuration> split(int adviceNumber) {
@Override
public void post() {

Adb4pgUtil.post(originalConfig);
Adb4pgUtil.post(originalConfig, this.containerContext);
}

@Override
Expand All @@ -68,24 +61,24 @@ public void destroy() {
}



}

public static class Task extends Writer.Task {
private Configuration writerSliceConfig;
private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
private Adb4pgClientProxy adb4pgClientProxy;

//Adb4pgClient client;
@Override
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.adb4pgClientProxy = new Adb4pgClientProxy(writerSliceConfig, super.getTaskPluginCollector());
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE){
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE, this.containerContext) {
@Override
public String calcValueHolder(String columnType){
if("serial".equalsIgnoreCase(columnType)){
public String calcValueHolder(String columnType) {
if ("serial".equalsIgnoreCase(columnType)) {
return "?::int";
}else if("bit".equalsIgnoreCase(columnType)){
} else if ("bit".equalsIgnoreCase(columnType)) {
return "?::bit varying";
}
return "?::" + columnType;
Expand All @@ -100,7 +93,7 @@ public void prepare() {

@Override
public void startWrite(RecordReceiver recordReceiver) {
this.adb4pgClientProxy.startWriteWithConnection(recordReceiver, Adb4pgUtil.getAdbpgConnect(writerSliceConfig));
this.adb4pgClientProxy.startWriteWithConnection(recordReceiver, Adb4pgUtil.getAdbpgConnect(writerSliceConfig, this.containerContext));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.alibaba.cloud.analyticdb.adb4pgclient.Adb4pgClientException;
import com.alibaba.cloud.analyticdb.adb4pgclient.DatabaseConfig;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.job.IJobContainerContext;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.Key;
Expand Down Expand Up @@ -35,6 +36,7 @@ public static void checkConfig(Configuration originalConfig) {
throw new Adb4pgClientException(Adb4pgClientException.CONFIG_ERROR, "Check config exception: " + e.getMessage(), null);
}
}

public static DatabaseConfig convertConfiguration(Configuration originalConfig) {
originalConfig.getNecessaryValue(Key.USERNAME, COLUMN_SPLIT_ERROR);
originalConfig.getNecessaryValue(Key.PASSWORD, COLUMN_SPLIT_ERROR);
Expand Down Expand Up @@ -80,11 +82,11 @@ private static Map<String, List<String>> splitBySchemaName(List<String> tables)
return res;
}

public static Connection getAdbpgConnect(Configuration conf) {
public static Connection getAdbpgConnect(Configuration conf, IJobContainerContext containerContext) {
String userName = conf.getString(Key.USERNAME);
String passWord = conf.getString(Key.PASSWORD);

com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter dsFactoryGetter = DBUtil.getReaderDataSourceFactoryGetter(conf);
com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter dsFactoryGetter = DBUtil.getReaderDataSourceFactoryGetter(conf, containerContext);

return DBUtil.getConnection(dsFactoryGetter, generateJdbcUrl(conf), userName, passWord);

Expand All @@ -99,7 +101,7 @@ private static String generateJdbcUrl(Configuration configuration) {

}

public static void prepare(Configuration originalConfig) {
public static void prepare(Configuration originalConfig, IJobContainerContext containerContext) {
List<String> preSqls = originalConfig.getList(Key.PRE_SQL,
String.class);

Expand All @@ -113,17 +115,17 @@ public static void prepare(Configuration originalConfig) {

originalConfig.remove(Key.PRE_SQL);

Connection conn = getAdbpgConnect(originalConfig);
Connection conn = getAdbpgConnect(originalConfig, containerContext);
WriterUtil.executeSqls(conn, renderedPreSqls, generateJdbcUrl(originalConfig), DATABASE_TYPE);
DBUtil.closeDBResources(null, null, conn);


}

public static void post(Configuration configuration) {
public static void post(Configuration configuration, IJobContainerContext containerContext) {
List<String> postSqls = configuration.getList(Key.POST_SQL,
String.class);
SelectTable tableName = SelectTable.createInTask( configuration);//.getString(Key.TABLE);
SelectTable tableName = SelectTable.createInTask(configuration);//.getString(Key.TABLE);
List<String> renderedPostSqls = WriterUtil.renderPreOrPostSqls(
postSqls, tableName);

Expand All @@ -133,7 +135,7 @@ public static void post(Configuration configuration) {

configuration.remove(Key.POST_SQL);

Connection conn = getAdbpgConnect(configuration);
Connection conn = getAdbpgConnect(configuration, containerContext);

WriterUtil.executeSqls(conn, renderedPostSqls, generateJdbcUrl(configuration), DATABASE_TYPE);
DBUtil.closeDBResources(null, null, conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void prepare() {
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
// 说明有 preSql 配置,则此处删除掉
this.originalConfig.remove(Key.PRE_SQL);
Connection preConn = AdsUtil.getAdsConnect(this.originalConfig);
Connection preConn = AdsUtil.getAdsConnect(this.originalConfig, this.containerContext);
LOG.info("Begin to execute preSqls:[{}]. context info:{}.",
StringUtils.join(renderedPreSqls, ";"),
this.originalConfig.getString(Key.ADS_URL));
Expand Down Expand Up @@ -238,7 +238,7 @@ public void post() {
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
// 说明有 preSql 配置,则此处删除掉
this.originalConfig.remove(Key.POST_SQL);
Connection postConn = AdsUtil.getAdsConnect(this.originalConfig);
Connection postConn = AdsUtil.getAdsConnect(this.originalConfig, this.containerContext);
LOG.info(
"Begin to execute postSqls:[{}]. context info:{}.",
StringUtils.join(renderedPostSqls, ";"),
Expand Down Expand Up @@ -372,13 +372,13 @@ public void startWrite(RecordReceiver recordReceiver) {
} else {
// insert 模式
List<String> columns = writerSliceConfig.getList(Key.COLUMN, String.class);
Connection connection = AdsUtil.getAdsConnect(this.writerSliceConfig);
Connection connection = AdsUtil.getAdsConnect(this.writerSliceConfig, this.containerContext);
TaskPluginCollector taskPluginCollector = super.getTaskPluginCollector();

if (StringUtils.equalsIgnoreCase(this.writeProxy, "adbClient")) {
this.proxy = new AdsClientProxy(table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
} else {
this.proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo);
this.proxy = new AdsInsertProxy(schema + "." + table, columns, writerSliceConfig, taskPluginCollector, this.tableInfo, this.containerContext);
}
proxy.startWriteWithConnection(recordReceiver, connection, columnNumber);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,22 @@
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.core.job.IJobContainerContext;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.writer.adswriter.ads.TableInfo;
import com.alibaba.datax.plugin.writer.adswriter.util.AdsUtil;
import com.alibaba.datax.plugin.writer.adswriter.util.Constant;
import com.alibaba.datax.plugin.writer.adswriter.util.Key;
import com.mysql.jdbc.JDBC4PreparedStatement;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
Expand Down Expand Up @@ -65,10 +59,12 @@ public class AdsInsertProxy implements AdsProxy {
private String partitionColumn;
private int partitionColumnIndex = -1;
private int partitionCount;
private IJobContainerContext containerContext;

public AdsInsertProxy(String table, List<String> columns, Configuration configuration, TaskPluginCollector taskPluginCollector, TableInfo tableInfo) {
public AdsInsertProxy(String table, List<String> columns, Configuration configuration, TaskPluginCollector taskPluginCollector, TableInfo tableInfo, IJobContainerContext containerContext) {
this.table = table;
this.columns = columns;
this.containerContext = containerContext;
this.configuration = configuration;
this.taskPluginCollector = taskPluginCollector;
this.emptyAsNull = configuration.getBool(Key.EMPTY_AS_NULL, false);
Expand Down Expand Up @@ -308,7 +304,7 @@ private void doBatchRecordDml(List<Record> buffer, String mode) throws Exception
while (null != eachException && maxIter < AdsInsertProxy.MAX_EXCEPTION_CAUSE_ITER) {
if (this.isRetryable(eachException)) {
LOG.warn("doBatchRecordDml meet a retry exception: " + e.getMessage());
this.currentConnection = AdsUtil.getAdsConnect(this.configuration);
this.currentConnection = AdsUtil.getAdsConnect(this.configuration, containerContext);
throw eachException;
} else {
try {
Expand Down Expand Up @@ -375,7 +371,7 @@ private void doOneRecordDml(Record record, String mode) throws Exception {
while (null != eachException && maxIter < AdsInsertProxy.MAX_EXCEPTION_CAUSE_ITER) {
if (this.isRetryable(eachException)) {
LOG.warn("doOneDml meet a retry exception: " + e.getMessage());
this.currentConnection = AdsUtil.getAdsConnect(this.configuration);
this.currentConnection = AdsUtil.getAdsConnect(this.configuration, this.containerContext);
throw eachException;
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.job.IJobContainerContext;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.writer.adswriter.AdsWriterErrorCode;
import com.alibaba.datax.plugin.writer.adswriter.load.AdsHelper;
Expand Down Expand Up @@ -167,13 +168,13 @@ public static String prepareJdbcUrl(String adsURL, String schema,

private static IDataSourceFactoryGetter dataSourceFactoryGetter;

public static Connection getAdsConnect(Configuration conf) {
public static Connection getAdsConnect(Configuration conf, IJobContainerContext containerContext) {
String userName = conf.getString(Key.USERNAME);
String passWord = conf.getString(Key.PASSWORD);
String jdbcUrl = AdsUtil.prepareJdbcUrl(conf);

if (dataSourceFactoryGetter == null) {
dataSourceFactoryGetter = DBUtil.getReaderDataSourceFactoryGetter(conf);
dataSourceFactoryGetter = DBUtil.getReaderDataSourceFactoryGetter(conf, containerContext);
}
Connection connection = DBUtil.getConnection(dataSourceFactoryGetter, jdbcUrl, userName, passWord);
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static class Job extends Writer.Job {
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE,this.containerContext);
this.commonRdbmsWriterMaster.init(this.originalConfig);
}

Expand Down Expand Up @@ -64,7 +64,7 @@ public static class Task extends Writer.Task {
public void init() {
this.writerSliceConfig = super.getPluginJobConf();

this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) {
this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE,this.containerContext) {

// @Override
// protected PreparedStatement fillPreparedStatementColumnType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ public Date asDate() {
CommonErrorCode.CONVERT_NOT_SUPPORT, "Bool类型不能转为Date .");
}

@Override
public Date asDate(String dateFormat) {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, "Bool类型不能转为Date .");
}

@Override
public byte[] asBytes() {
throw DataXException.asDataXException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public Date asDate() {
CommonErrorCode.CONVERT_NOT_SUPPORT, "Bytes类型不能转为Date .");
}

@Override
public Date asDate(String dateFormat) {
throw DataXException.asDataXException(
CommonErrorCode.CONVERT_NOT_SUPPORT, "Bytes类型不能转为Date .");
}

@Override
public Boolean asBoolean() {
throw DataXException.asDataXException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ protected void setByteSize(int byteSize) {

public abstract Date asDate();

public abstract Date asDate(String dateFormat);

public abstract byte[] asBytes();

public abstract Boolean asBoolean();
Expand Down
Loading

0 comments on commit bc6c967

Please sign in to comment.