Skip to content

Commit

Permalink
enable transformer in TIS and upgrade version to v4.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jun 27, 2024
1 parent a9908da commit c8dc13e
Show file tree
Hide file tree
Showing 26 changed files with 938 additions and 675 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* (C) 2010-2013 Alibaba Group Holding Limited.
*
* (C) 2010-2013 Alibaba Group Holding Limited.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,15 +18,22 @@

import com.alibaba.datax.common.element.Record;

import java.util.Collections;
import java.util.Map;

public interface RecordSender {

public Record createRecord();
public default Record createRecord() {
return this.createRecord(Collections.emptyMap());
}

public Record createRecord(Map<String, Integer> mapper);

public void sendToWriter(Record record);
public void sendToWriter(Record record);

public void flush();
public void flush();

public void terminate();
public void terminate();

public void shutdown();
public void shutdown();
}
6 changes: 6 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-plugin</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-transformer</artifactId>
Expand Down
42 changes: 33 additions & 9 deletions core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode;
import com.alibaba.fastjson.JSON;
import com.qlangtech.tis.datax.IDataXNameAware;
import com.qlangtech.tis.datax.IDataXTaskRelevant;
import com.qlangtech.tis.datax.TimeFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
Expand All @@ -39,6 +37,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* Created by jingxing on 14-8-24.
Expand Down Expand Up @@ -82,12 +81,19 @@ public class JobContainer extends AbstractContainer implements IJobContainerCont

private ErrorRecordChecker errorLimit;

private Optional<ITransformerBuildInfo> transformerBuildInfo;

public JobContainer(Configuration configuration) {
super(configuration);

errorLimit = new ErrorRecordChecker(configuration);
}

@Override
public Optional<ITransformerBuildInfo> getTransformerBuildCfg() {
return this.transformerBuildInfo;
}

@Override
public int getTaskSerializeNum() {
throw new UnsupportedOperationException();
Expand All @@ -113,6 +119,11 @@ public String getJobName() {
throw new UnsupportedOperationException();
}

@Override
public String getCollectionName() {
return this.getDataXName();
}

@Override
public String getDataXName() {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -314,6 +325,14 @@ private void init() {
Thread.currentThread().setName("job-" + this.jobId);

JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());

Optional<Configuration> transformerList =
Optional.ofNullable(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER));

this.transformerBuildInfo = transformerList.map((transformerCfg) -> {
return TransformerUtil.buildTransformerInfo(JobContainer.this, transformerCfg);
});

//必须先Reader ,后Writer
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
Expand Down Expand Up @@ -399,10 +418,12 @@ private int split() {
int taskNumber = readerTaskConfigs.size();
List<Configuration> writerTaskConfigs = this.doWriterSplit(taskNumber);

List<Configuration> transformerList =
this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
Optional<Configuration> transformerList =
Optional.ofNullable(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER));
transformerList.ifPresent((t) -> {
LOG.debug("transformer configuration: " + t);
});

LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
/**
* 输入是reader和writer的parameter list,输出是content下面元素的list
*/
Expand Down Expand Up @@ -721,7 +742,7 @@ private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration>

private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration> readerTasksConfigs,
List<Configuration> writerTasksConfigs,
List<Configuration> transformerConfigs) {
Optional<Configuration> transformerConfigs) {
if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, String.format("reader切分的task"
+ "数目[%d]不等于writer切分的task数目[%d].", readerTasksConfigs.size(), writerTasksConfigs.size()));
Expand All @@ -735,9 +756,12 @@ private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration>
taskConfig.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName);
taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(i));

if (transformerConfigs != null && transformerConfigs.size() > 0) {
taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
}
// if (transformerConfigs != null && transformerConfigs.size() > 0) {
//
// }
transformerConfigs.ifPresent((transformerCfg) -> {
taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerCfg);
});

taskConfig.set(CoreConstant.TASK_ID, i);
contentConfigs.add(taskConfig);
Expand Down
72 changes: 72 additions & 0 deletions core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.alibaba.datax.core.job;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
import com.alibaba.datax.core.transport.transformer.TransformerExecutionParas;
import com.alibaba.datax.core.transport.transformer.TransformerInfo;
import com.alibaba.datax.core.util.TISComplexTransformer;
import com.alibaba.datax.core.util.TransformerBuildInfo;
import com.alibaba.datax.core.util.container.TransformerConstant;
import com.google.common.collect.Lists;
import com.qlangtech.tis.lang.TisException;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformer;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.util.IPluginContext;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* no comments.
* Created by liqiang on 16/3/9.
*/
public class TransformerUtil {

private static final Logger LOG = LoggerFactory.getLogger(TransformerUtil.class);

static TransformerBuildInfo buildTransformerInfo(IJobContainerContext containerContext, Configuration taskConfig) {

final String tabRelevantTransformer = taskConfig.getString(TransformerConstant.JOB_TRANSFORMER_NAME);
// Transformer 生成的出参
final List<String> relevantKeys = taskConfig.getList(TransformerConstant.JOB_TRANSFORMER_RELEVANT_KEYS, String.class);
if (StringUtils.isEmpty(tabRelevantTransformer)) {
throw new IllegalArgumentException("tabRelevantTransformer name can not be null");
}
List<TransformerExecution> result = Lists.newArrayList();
TransformerInfo transformerInfo = null;
TransformerExecution texec = null;
RecordTransformerRules transformers = RecordTransformerRules.loadTransformerRules(
IPluginContext.namedContext(containerContext.getCollectionName()), tabRelevantTransformer);
if (CollectionUtils.isEmpty(transformers.rules)) {
throw new IllegalStateException("transformer:" + tabRelevantTransformer + " can not be empty");
}

for (RecordTransformer t : transformers.rules) {
transformerInfo = new TransformerInfo();
transformerInfo.setTransformer(new TISComplexTransformer(t.getUdf()));
texec = new TransformerExecution(transformerInfo, new TransformerExecutionParas());
texec.setIsChecked(true);
result.add(texec);
}
List<String> fromPlugnKeys = transformers.relevantColKeys();
if (!CollectionUtils.isEqualCollection(relevantKeys, fromPlugnKeys)) {
throw TisException.create("relevant keys from dataX config:" + String.join(",", relevantKeys)
+ "\n is not equla with key build from plugin:" + String.join(",", fromPlugnKeys)
+ "\n Please regenerate the DataX Config Files then reTrigger pipeline again!!!");
}
return new TransformerBuildInfo() {
@Override
public List<TransformerExecution> getExecutions() {
return result;
}

@Override
public List<String> relevantOutterColKeys() {
return relevantKeys;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

public class DirtyRecord implements Record {
private List<Column> columns = new ArrayList<Column>();
Expand All @@ -24,6 +25,12 @@ public static DirtyRecord asDirtyRecord(final Record record) {
return result;
}

@Override
public void setCol2Index(Map<String, Integer> mapper) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public void addColumn(Column column) {
this.columns.add(
Expand All @@ -41,6 +48,24 @@ public void setColumn(int i, Column column) {
"该方法不支持!");
}

@Override
public void setString(String field, String val) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public void setColumn(String field, Column column) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Column getColumn(String field) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Column getColumn(int i) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.job.IJobContainerContext;
import com.alibaba.datax.core.job.ITransformerBuildInfo;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator;
Expand All @@ -21,15 +22,16 @@
import com.alibaba.datax.core.transport.channel.Channel;
import com.alibaba.datax.core.transport.exchanger.BufferedRecordExchanger;
import com.alibaba.datax.core.transport.exchanger.BufferedRecordTransformerExchanger;
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
import com.alibaba.datax.core.util.ClassUtil;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.TransformerUtil;
import com.alibaba.datax.core.util.TransformerBuildInfo;
import com.alibaba.datax.core.job.TransformerUtil;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
import com.alibaba.fastjson.JSON;
import com.qlangtech.tis.job.common.JobCommon;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -227,7 +229,8 @@ public void start() {
}
}
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);

TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount, this.containerContext);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();

Expand Down Expand Up @@ -373,7 +376,7 @@ class TaskExecutor {
private ReaderRunner readerRunner;

private WriterRunner writerRunner;

private final IJobContainerContext containerContext;
/**
* 该处的taskCommunication在多处用到:
* 1. channel
Expand All @@ -382,10 +385,12 @@ class TaskExecutor {
*/
private Communication taskCommunication;

public TaskExecutor(Configuration taskConf, int attemptCount) {
public TaskExecutor(Configuration taskConf, int attemptCount, IJobContainerContext containerContext) {
// 获取该taskExecutor的配置
this.taskConfig = taskConf;
Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER) && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER), "[reader|writer]的插件参数不能为空!");
this.containerContext = Objects.requireNonNull(containerContext);
Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
&& null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER), "[reader|writer]的插件参数不能为空!");

// 得到taskId
this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
Expand All @@ -403,8 +408,8 @@ public TaskExecutor(Configuration taskConf, int attemptCount) {
/**
* 获取transformer的参数
*/

List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);
Optional<TransformerBuildInfo> transformerBuildCfg = containerContext.getTransformerBuildCfg();
// TransformerBuildInfo transformerInfoExecs = TransformerUtil.buildTransformerInfo(this.containerContext, taskConfig);

/**
* 生成writerThread
Expand All @@ -425,7 +430,7 @@ public void run() {
/**
* 生成readerThread
*/
readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerBuildCfg);
this.readerThread = new Thread(readerRunner, String.format("%d-%d-%d-reader", jobId, taskGroupId,
this.taskId)) {
@Override
Expand Down Expand Up @@ -472,7 +477,7 @@ private AbstractRunner generateRunner(PluginType pluginType) {
return generateRunner(pluginType, null);
}

private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
private AbstractRunner generateRunner(PluginType pluginType, Optional<TransformerBuildInfo> transformerBuildCfg) {
AbstractRunner newRunner = null;
TaskPluginCollector pluginCollector;

Expand All @@ -485,13 +490,19 @@ private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExe
pluginCollector = ClassUtil.instantiate(taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication, PluginType.READER);

RecordSender recordSender;
if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,
this.taskCommunication, pluginCollector, transformerInfoExecs);
} else {
recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
}

RecordSender recordSender = transformerBuildCfg.map((transformerInfoExecs) -> {
return (RecordSender) (new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,
this.taskCommunication, pluginCollector, transformerInfoExecs.getExecutions()));
}).orElseGet(() -> {
return new BufferedRecordExchanger(this.channel, pluginCollector);
});

// if (transformerInfoExecs != null && CollectionUtils.isNotEmpty(transformerInfoExecs.getExecutions())) {
//
// } else {
// recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
// }

((ReaderRunner) newRunner).setRecordSender(recordSender);

Expand Down
Loading

0 comments on commit c8dc13e

Please sign in to comment.