* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,15 +18,22 @@
import com.alibaba.datax.common.element.Record;
+import java.util.Collections;
+import java.util.Map;
+
public interface RecordSender {
- public Record createRecord();
+ public default Record createRecord() {
+ return this.createRecord(Collections.emptyMap());
+ }
+
+ public Record createRecord(Map mapper);
- public void sendToWriter(Record record);
+ public void sendToWriter(Record record);
- public void flush();
+ public void flush();
- public void terminate();
+ public void terminate();
- public void shutdown();
+ public void shutdown();
}
diff --git a/core/pom.xml b/core/pom.xml
index 26caaed345..d5d3d6d820 100755
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -12,6 +12,12 @@
jar
+
+
+ com.qlangtech.tis
+ tis-plugin
+
+
com.alibaba.dataxdatax-transformer
diff --git a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
index 90693dce04..fb05b2b0a4 100755
--- a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
+++ b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
@@ -28,8 +28,6 @@
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode;
import com.alibaba.fastjson.JSON;
-import com.qlangtech.tis.datax.IDataXNameAware;
-import com.qlangtech.tis.datax.IDataXTaskRelevant;
import com.qlangtech.tis.datax.TimeFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
@@ -39,6 +37,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
/**
* Created by jingxing on 14-8-24.
@@ -82,12 +81,19 @@ public class JobContainer extends AbstractContainer implements IJobContainerCont
private ErrorRecordChecker errorLimit;
+ private Optional transformerBuildInfo;
+
public JobContainer(Configuration configuration) {
super(configuration);
errorLimit = new ErrorRecordChecker(configuration);
}
+ @Override
+ public Optional getTransformerBuildCfg() {
+ return this.transformerBuildInfo;
+ }
+
@Override
public int getTaskSerializeNum() {
throw new UnsupportedOperationException();
@@ -113,6 +119,11 @@ public String getJobName() {
throw new UnsupportedOperationException();
}
+ @Override
+ public String getCollectionName() {
+ return this.getDataXName();
+ }
+
@Override
public String getDataXName() {
throw new UnsupportedOperationException();
@@ -314,6 +325,14 @@ private void init() {
Thread.currentThread().setName("job-" + this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
+
+ Optional transformerList =
+ Optional.ofNullable(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER));
+
+ this.transformerBuildInfo = transformerList.map((transformerCfg) -> {
+ return TransformerUtil.buildTransformerInfo(JobContainer.this, transformerCfg);
+ });
+
//必须先Reader ,后Writer
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
@@ -399,10 +418,12 @@ private int split() {
int taskNumber = readerTaskConfigs.size();
List writerTaskConfigs = this.doWriterSplit(taskNumber);
- List transformerList =
- this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
+ Optional transformerList =
+ Optional.ofNullable(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER));
+ transformerList.ifPresent((t) -> {
+ LOG.debug("transformer configuration: " + t);
+ });
- LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
/**
* 输入是reader和writer的parameter list,输出是content下面元素的list
*/
@@ -721,7 +742,7 @@ private List mergeReaderAndWriterTaskConfigs(List
private List mergeReaderAndWriterTaskConfigs(List readerTasksConfigs,
List writerTasksConfigs,
- List transformerConfigs) {
+ Optional transformerConfigs) {
if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, String.format("reader切分的task"
+ "数目[%d]不等于writer切分的task数目[%d].", readerTasksConfigs.size(), writerTasksConfigs.size()));
@@ -735,9 +756,12 @@ private List mergeReaderAndWriterTaskConfigs(List
taskConfig.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName);
taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(i));
- if (transformerConfigs != null && transformerConfigs.size() > 0) {
- taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
- }
+// if (transformerConfigs != null && transformerConfigs.size() > 0) {
+//
+// }
+ transformerConfigs.ifPresent((transformerCfg) -> {
+ taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerCfg);
+ });
taskConfig.set(CoreConstant.TASK_ID, i);
contentConfigs.add(taskConfig);
diff --git a/core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java b/core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java
new file mode 100644
index 0000000000..a36e464b37
--- /dev/null
+++ b/core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java
@@ -0,0 +1,72 @@
+package com.alibaba.datax.core.job;
+
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.core.transport.transformer.TransformerExecution;
+import com.alibaba.datax.core.transport.transformer.TransformerExecutionParas;
+import com.alibaba.datax.core.transport.transformer.TransformerInfo;
+import com.alibaba.datax.core.util.TISComplexTransformer;
+import com.alibaba.datax.core.util.TransformerBuildInfo;
+import com.alibaba.datax.core.util.container.TransformerConstant;
+import com.google.common.collect.Lists;
+import com.qlangtech.tis.lang.TisException;
+import com.qlangtech.tis.plugin.datax.transformer.RecordTransformer;
+import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
+import com.qlangtech.tis.util.IPluginContext;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * no comments.
+ * Created by liqiang on 16/3/9.
+ */
+public class TransformerUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TransformerUtil.class);
+
+ static TransformerBuildInfo buildTransformerInfo(IJobContainerContext containerContext, Configuration taskConfig) {
+
+ final String tabRelevantTransformer = taskConfig.getString(TransformerConstant.JOB_TRANSFORMER_NAME);
+ // Transformer 生成的出参
+ final List relevantKeys = taskConfig.getList(TransformerConstant.JOB_TRANSFORMER_RELEVANT_KEYS, String.class);
+ if (StringUtils.isEmpty(tabRelevantTransformer)) {
+ throw new IllegalArgumentException("tabRelevantTransformer name can not be null");
+ }
+ List result = Lists.newArrayList();
+ TransformerInfo transformerInfo = null;
+ TransformerExecution texec = null;
+ RecordTransformerRules transformers = RecordTransformerRules.loadTransformerRules(
+ IPluginContext.namedContext(containerContext.getCollectionName()), tabRelevantTransformer);
+ if (CollectionUtils.isEmpty(transformers.rules)) {
+ throw new IllegalStateException("transformer:" + tabRelevantTransformer + " can not be empty");
+ }
+
+ for (RecordTransformer t : transformers.rules) {
+ transformerInfo = new TransformerInfo();
+ transformerInfo.setTransformer(new TISComplexTransformer(t.getUdf()));
+ texec = new TransformerExecution(transformerInfo, new TransformerExecutionParas());
+ texec.setIsChecked(true);
+ result.add(texec);
+ }
+ List fromPlugnKeys = transformers.relevantColKeys();
+ if (!CollectionUtils.isEqualCollection(relevantKeys, fromPlugnKeys)) {
+ throw TisException.create("relevant keys from dataX config:" + String.join(",", relevantKeys)
+ + "\n is not equla with key build from plugin:" + String.join(",", fromPlugnKeys)
+ + "\n Please regenerate the DataX Config Files then reTrigger pipeline again!!!");
+ }
+ return new TransformerBuildInfo() {
+ @Override
+ public List getExecutions() {
+ return result;
+ }
+
+ @Override
+ public List relevantOutterColKeys() {
+ return relevantKeys;
+ }
+ };
+ }
+}
diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java
index d590dc2785..6f31598ee9 100755
--- a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java
+++ b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java
@@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.Map;
public class DirtyRecord implements Record {
private List columns = new ArrayList();
@@ -24,6 +25,12 @@ public static DirtyRecord asDirtyRecord(final Record record) {
return result;
}
+ @Override
+ public void setCol2Index(Map mapper) {
+ throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
+ "该方法不支持!");
+ }
+
@Override
public void addColumn(Column column) {
this.columns.add(
@@ -41,6 +48,24 @@ public void setColumn(int i, Column column) {
"该方法不支持!");
}
+ @Override
+ public void setString(String field, String val) {
+ throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
+ "该方法不支持!");
+ }
+
+ @Override
+ public void setColumn(String field, Column column) {
+ throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
+ "该方法不支持!");
+ }
+
+ @Override
+ public Column getColumn(String field) {
+ throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
+ "该方法不支持!");
+ }
+
@Override
public Column getColumn(int i) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
diff --git a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java
index e7aae0c59a..34eff2841e 100755
--- a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java
+++ b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java
@@ -11,6 +11,7 @@
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.job.IJobContainerContext;
+import com.alibaba.datax.core.job.ITransformerBuildInfo;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator;
@@ -21,15 +22,16 @@
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.exchanger.BufferedRecordExchanger;
import com.alibaba.datax.core.transport.exchanger.BufferedRecordTransformerExchanger;
-import com.alibaba.datax.core.transport.transformer.TransformerExecution;
import com.alibaba.datax.core.util.ClassUtil;
import com.alibaba.datax.core.util.FrameworkErrorCode;
-import com.alibaba.datax.core.util.TransformerUtil;
+import com.alibaba.datax.core.util.TransformerBuildInfo;
+import com.alibaba.datax.core.job.TransformerUtil;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
import com.alibaba.fastjson.JSON;
import com.qlangtech.tis.job.common.JobCommon;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -227,7 +229,8 @@ public void start() {
}
}
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
- TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
+
+ TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount, this.containerContext);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();
@@ -373,7 +376,7 @@ class TaskExecutor {
private ReaderRunner readerRunner;
private WriterRunner writerRunner;
-
+ private final IJobContainerContext containerContext;
/**
* 该处的taskCommunication在多处用到:
* 1. channel
@@ -382,10 +385,12 @@ class TaskExecutor {
*/
private Communication taskCommunication;
- public TaskExecutor(Configuration taskConf, int attemptCount) {
+ public TaskExecutor(Configuration taskConf, int attemptCount, IJobContainerContext containerContext) {
// 获取该taskExecutor的配置
this.taskConfig = taskConf;
- Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER) && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER), "[reader|writer]的插件参数不能为空!");
+ this.containerContext = Objects.requireNonNull(containerContext);
+ Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
+ && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER), "[reader|writer]的插件参数不能为空!");
// 得到taskId
this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
@@ -403,8 +408,8 @@ public TaskExecutor(Configuration taskConf, int attemptCount) {
/**
* 获取transformer的参数
*/
-
- List transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);
+ Optional transformerBuildCfg = containerContext.getTransformerBuildCfg();
+ // TransformerBuildInfo transformerInfoExecs = TransformerUtil.buildTransformerInfo(this.containerContext, taskConfig);
/**
* 生成writerThread
@@ -425,7 +430,7 @@ public void run() {
/**
* 生成readerThread
*/
- readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
+ readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerBuildCfg);
this.readerThread = new Thread(readerRunner, String.format("%d-%d-%d-reader", jobId, taskGroupId,
this.taskId)) {
@Override
@@ -472,7 +477,7 @@ private AbstractRunner generateRunner(PluginType pluginType) {
return generateRunner(pluginType, null);
}
- private AbstractRunner generateRunner(PluginType pluginType, List transformerInfoExecs) {
+ private AbstractRunner generateRunner(PluginType pluginType, Optional transformerBuildCfg) {
AbstractRunner newRunner = null;
TaskPluginCollector pluginCollector;
@@ -485,13 +490,19 @@ private AbstractRunner generateRunner(PluginType pluginType, List 0) {
- recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,
- this.taskCommunication, pluginCollector, transformerInfoExecs);
- } else {
- recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
- }
+
+ RecordSender recordSender = transformerBuildCfg.map((transformerInfoExecs) -> {
+ return (RecordSender) (new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,
+ this.taskCommunication, pluginCollector, transformerInfoExecs.getExecutions()));
+ }).orElseGet(() -> {
+ return new BufferedRecordExchanger(this.channel, pluginCollector);
+ });
+
+// if (transformerInfoExecs != null && CollectionUtils.isNotEmpty(transformerInfoExecs.getExecutions())) {
+//
+// } else {
+// recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
+// }
((ReaderRunner) newRunner).setRecordSender(recordSender);
diff --git a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java
index 4ea4902dde..42f5df05fe 100755
--- a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java
+++ b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java
@@ -15,142 +15,145 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class BufferedRecordExchanger implements RecordSender, RecordReceiver {
- private final Channel channel;
+ private final Channel channel;
- private final Configuration configuration;
+ private final Configuration configuration;
- private final List buffer;
+ private final List buffer;
- private int bufferSize ;
+ private int bufferSize;
- protected final int byteCapacity;
+ protected final int byteCapacity;
- private final AtomicInteger memoryBytes = new AtomicInteger(0);
+ private final AtomicInteger memoryBytes = new AtomicInteger(0);
- private int bufferIndex = 0;
+ private int bufferIndex = 0;
- private static Class extends Record> RECORD_CLASS;
+ private static Class extends Record> RECORD_CLASS;
- private volatile boolean shutdown = false;
+ private volatile boolean shutdown = false;
- private final TaskPluginCollector pluginCollector;
+ private final TaskPluginCollector pluginCollector;
- @SuppressWarnings("unchecked")
- public BufferedRecordExchanger(final Channel channel, final TaskPluginCollector pluginCollector) {
- assert null != channel;
- assert null != channel.getConfiguration();
+ @SuppressWarnings("unchecked")
+ public BufferedRecordExchanger(final Channel channel, final TaskPluginCollector pluginCollector) {
+ assert null != channel;
+ assert null != channel.getConfiguration();
- this.channel = channel;
- this.pluginCollector = pluginCollector;
- this.configuration = channel.getConfiguration();
+ this.channel = channel;
+ this.pluginCollector = pluginCollector;
+ this.configuration = channel.getConfiguration();
- this.bufferSize = configuration
- .getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
- this.buffer = new ArrayList(bufferSize);
+ this.bufferSize = configuration
+ .getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
+ this.buffer = new ArrayList(bufferSize);
- //channel的queue默认大小为8M,原来为64M
- this.byteCapacity = configuration.getInt(
- CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
+ //channel的queue默认大小为8M,原来为64M
+ this.byteCapacity = configuration.getInt(
+ CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
- try {
- BufferedRecordExchanger.RECORD_CLASS = ((Class extends Record>) Class
- .forName(configuration.getString(
+ try {
+ BufferedRecordExchanger.RECORD_CLASS = ((Class extends Record>) Class
+ .forName(configuration.getString(
CoreConstant.DATAX_CORE_TRANSPORT_RECORD_CLASS,
"com.alibaba.datax.core.transport.record.DefaultRecord")));
- } catch (Exception e) {
- throw DataXException.asDataXException(
- FrameworkErrorCode.CONFIG_ERROR, e);
- }
- }
-
- @Override
- public Record createRecord() {
- try {
- return BufferedRecordExchanger.RECORD_CLASS.newInstance();
- } catch (Exception e) {
- throw DataXException.asDataXException(
- FrameworkErrorCode.CONFIG_ERROR, e);
- }
- }
-
- @Override
- public void sendToWriter(Record record) {
- if(shutdown){
- throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
- }
-
- Validate.notNull(record, "record不能为空.");
-
- if (record.getMemorySize() > this.byteCapacity) {
- this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
- return;
- }
-
- boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
- if (isFull) {
- flush();
- }
-
- this.buffer.add(record);
- this.bufferIndex++;
- memoryBytes.addAndGet(record.getMemorySize());
- }
-
- @Override
- public void flush() {
- if(shutdown){
- throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
- }
- this.channel.pushAll(this.buffer);
- this.buffer.clear();
- this.bufferIndex = 0;
- this.memoryBytes.set(0);
- }
-
- @Override
- public void terminate() {
- if(shutdown){
- throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
- }
- flush();
- this.channel.pushTerminate(TerminateRecord.get());
- }
-
- @Override
- public Record getFromReader() {
- if(shutdown){
- throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
- }
- boolean isEmpty = (this.bufferIndex >= this.buffer.size());
- if (isEmpty) {
- receive();
- }
-
- Record record = this.buffer.get(this.bufferIndex++);
- if (record instanceof TerminateRecord) {
- record = null;
- }
- return record;
- }
-
- @Override
- public void shutdown(){
- shutdown = true;
- try{
- buffer.clear();
- channel.clear();
- }catch(Throwable t){
- t.printStackTrace();
- }
- }
-
- private void receive() {
- this.channel.pullAll(this.buffer);
- this.bufferIndex = 0;
- this.bufferSize = this.buffer.size();
- }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(
+ FrameworkErrorCode.CONFIG_ERROR, e);
+ }
+ }
+
+ @Override
+ public Record createRecord(Map col2Idx) {
+ try {
+ Record record = BufferedRecordExchanger.RECORD_CLASS.newInstance();
+ record.setCol2Index(col2Idx);
+ return record;
+ } catch (Exception e) {
+ throw DataXException.asDataXException(
+ FrameworkErrorCode.CONFIG_ERROR, e);
+ }
+ }
+
+ @Override
+ public void sendToWriter(Record record) {
+ if (shutdown) {
+ throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
+ }
+
+ Validate.notNull(record, "record不能为空.");
+
+ if (record.getMemorySize() > this.byteCapacity) {
+ this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
+ return;
+ }
+
+ boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
+ if (isFull) {
+ flush();
+ }
+
+ this.buffer.add(record);
+ this.bufferIndex++;
+ memoryBytes.addAndGet(record.getMemorySize());
+ }
+
+ @Override
+ public void flush() {
+ if (shutdown) {
+ throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
+ }
+ this.channel.pushAll(this.buffer);
+ this.buffer.clear();
+ this.bufferIndex = 0;
+ this.memoryBytes.set(0);
+ }
+
+ @Override
+ public void terminate() {
+ if (shutdown) {
+ throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
+ }
+ flush();
+ this.channel.pushTerminate(TerminateRecord.get());
+ }
+
+ @Override
+ public Record getFromReader() {
+ if (shutdown) {
+ throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
+ }
+ boolean isEmpty = (this.bufferIndex >= this.buffer.size());
+ if (isEmpty) {
+ receive();
+ }
+
+ Record record = this.buffer.get(this.bufferIndex++);
+ if (record instanceof TerminateRecord) {
+ record = null;
+ }
+ return record;
+ }
+
+ @Override
+ public void shutdown() {
+ shutdown = true;
+ try {
+ buffer.clear();
+ channel.clear();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ private void receive() {
+ this.channel.pullAll(this.buffer);
+ this.bufferIndex = 0;
+ this.bufferSize = this.buffer.size();
+ }
}
diff --git a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java
index e9677395b1..6eebb4ac97 100755
--- a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java
+++ b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class BufferedRecordTransformerExchanger extends TransformerExchanger implements RecordSender, RecordReceiver {
@@ -72,9 +73,11 @@ public BufferedRecordTransformerExchanger(final int taskGroupId, final int taskI
}
@Override
- public Record createRecord() {
+ public Record createRecord(Map col2Mapper) {
try {
- return BufferedRecordTransformerExchanger.RECORD_CLASS.newInstance();
+ Record record = BufferedRecordTransformerExchanger.RECORD_CLASS.newInstance();
+ record.setCol2Index(col2Mapper);
+ return record;
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR, e);
diff --git a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java
index fd91ffe705..b1c7089ba3 100755
--- a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java
+++ b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java
@@ -1,12 +1,12 @@
/**
- * (C) 2010-2014 Alibaba Group Holding Limited.
- *
+ * (C) 2010-2014 Alibaba Group Holding Limited.
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *