From c8dc13e50b0ed41ed76f52371c6efb8bdb5c89b1 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Thu, 27 Jun 2024 13:57:46 +0800 Subject: [PATCH] enable transformer in TIS and upgrade version to v4.0.1 --- .../datax/common/plugin/RecordSender.java | 27 +- core/pom.xml | 6 + .../alibaba/datax/core/job/JobContainer.java | 42 ++- .../datax/core/job/TransformerUtil.java | 72 ++++++ .../plugin/task/util/DirtyRecord.java | 25 ++ .../core/taskgroup/TaskGroupContainer.java | 45 ++-- .../exchanger/BufferedRecordExchanger.java | 239 +++++++++--------- .../BufferedRecordTransformerExchanger.java | 7 +- .../transport/exchanger/RecordExchanger.java | 154 +++++------ .../core/transport/record/DefaultRecord.java | 228 ++++++++++------- .../transport/record/TerminateRecord.java | 22 ++ .../core/util/TISComplexTransformer.java | 25 ++ .../datax/core/util/TransformerBuildInfo.java | 17 ++ .../datax/core/util/TransformerUtil.java | 107 -------- .../core/util/container/CoreConstant.java | 149 ++++++----- .../plugin/reader/drdsreader/DrdsReader.java | 1 + .../drdsreader/DrdsReaderSplitUtil.java | 17 +- .../rdbms/reader/CommonRdbmsReader.java | 33 ++- .../datax/plugin/rdbms/reader/Key.java | 1 + .../util/OriginalConfPretreatmentUtil.java | 6 +- .../plugin/rdbms/reader/util/QuerySql.java | 84 ++++++ .../rdbms/reader/util/ReaderSplitUtil.java | 102 ++++---- .../reader/util/SingleTableSplitUtil.java | 199 ++++++++------- pom.xml | 2 +- .../rdbmswriter/SubCommonRdbmsWriter.java | 1 + .../datax/transformer/ComplexTransformer.java | 2 +- 26 files changed, 938 insertions(+), 675 deletions(-) create mode 100644 core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java create mode 100644 core/src/main/java/com/alibaba/datax/core/util/TISComplexTransformer.java create mode 100644 core/src/main/java/com/alibaba/datax/core/util/TransformerBuildInfo.java delete mode 100644 core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java create mode 100644 plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/QuerySql.java diff --git a/common/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java b/common/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java index 0d6926098f..e4f9e329a5 100755 --- a/common/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java +++ b/common/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java @@ -1,12 +1,12 @@ /** - * (C) 2010-2013 Alibaba Group Holding Limited. - * + * (C) 2010-2013 Alibaba Group Holding Limited. + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,15 +18,22 @@ import com.alibaba.datax.common.element.Record; +import java.util.Collections; +import java.util.Map; + public interface RecordSender { - public Record createRecord(); + public default Record createRecord() { + return this.createRecord(Collections.emptyMap()); + } + + public Record createRecord(Map mapper); - public void sendToWriter(Record record); + public void sendToWriter(Record record); - public void flush(); + public void flush(); - public void terminate(); + public void terminate(); - public void shutdown(); + public void shutdown(); } diff --git a/core/pom.xml b/core/pom.xml index 26caaed345..d5d3d6d820 100755 --- a/core/pom.xml +++ b/core/pom.xml @@ -12,6 +12,12 @@ jar + + + com.qlangtech.tis + tis-plugin + + com.alibaba.datax datax-transformer diff --git a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java index 90693dce04..fb05b2b0a4 100755 --- a/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java @@ -28,8 +28,6 @@ import com.alibaba.datax.core.util.container.LoadUtil; import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode; import com.alibaba.fastjson.JSON; -import com.qlangtech.tis.datax.IDataXNameAware; -import com.qlangtech.tis.datax.IDataXTaskRelevant; import com.qlangtech.tis.datax.TimeFormat; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; @@ -39,6 +37,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * Created by jingxing on 14-8-24. @@ -82,12 +81,19 @@ public class JobContainer extends AbstractContainer implements IJobContainerCont private ErrorRecordChecker errorLimit; + private Optional transformerBuildInfo; + public JobContainer(Configuration configuration) { super(configuration); errorLimit = new ErrorRecordChecker(configuration); } + @Override + public Optional getTransformerBuildCfg() { + return this.transformerBuildInfo; + } + @Override public int getTaskSerializeNum() { throw new UnsupportedOperationException(); @@ -113,6 +119,11 @@ public String getJobName() { throw new UnsupportedOperationException(); } + @Override + public String getCollectionName() { + return this.getDataXName(); + } + @Override public String getDataXName() { throw new UnsupportedOperationException(); @@ -314,6 +325,14 @@ private void init() { Thread.currentThread().setName("job-" + this.jobId); JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator()); + + Optional transformerList = + Optional.ofNullable(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER)); + + this.transformerBuildInfo = transformerList.map((transformerCfg) -> { + return TransformerUtil.buildTransformerInfo(JobContainer.this, transformerCfg); + }); + //必须先Reader ,后Writer this.jobReader = this.initJobReader(jobPluginCollector); this.jobWriter = this.initJobWriter(jobPluginCollector); @@ -399,10 +418,12 @@ private int split() { int taskNumber = readerTaskConfigs.size(); List writerTaskConfigs = this.doWriterSplit(taskNumber); - List transformerList = - this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER); + Optional transformerList = + Optional.ofNullable(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER)); + transformerList.ifPresent((t) -> { + LOG.debug("transformer configuration: " + t); + }); - LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList)); /** * 输入是reader和writer的parameter list,输出是content下面元素的list */ @@ -721,7 +742,7 @@ private List mergeReaderAndWriterTaskConfigs(List private List mergeReaderAndWriterTaskConfigs(List readerTasksConfigs, List writerTasksConfigs, - List transformerConfigs) { + Optional transformerConfigs) { if (readerTasksConfigs.size() != writerTasksConfigs.size()) { throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, String.format("reader切分的task" + "数目[%d]不等于writer切分的task数目[%d].", readerTasksConfigs.size(), writerTasksConfigs.size())); @@ -735,9 +756,12 @@ private List mergeReaderAndWriterTaskConfigs(List taskConfig.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName); taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(i)); - if (transformerConfigs != null && transformerConfigs.size() > 0) { - taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs); - } +// if (transformerConfigs != null && transformerConfigs.size() > 0) { +// +// } + transformerConfigs.ifPresent((transformerCfg) -> { + taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerCfg); + }); taskConfig.set(CoreConstant.TASK_ID, i); contentConfigs.add(taskConfig); diff --git a/core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java b/core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java new file mode 100644 index 0000000000..a36e464b37 --- /dev/null +++ b/core/src/main/java/com/alibaba/datax/core/job/TransformerUtil.java @@ -0,0 +1,72 @@ +package com.alibaba.datax.core.job; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.transport.transformer.TransformerExecution; +import com.alibaba.datax.core.transport.transformer.TransformerExecutionParas; +import com.alibaba.datax.core.transport.transformer.TransformerInfo; +import com.alibaba.datax.core.util.TISComplexTransformer; +import com.alibaba.datax.core.util.TransformerBuildInfo; +import com.alibaba.datax.core.util.container.TransformerConstant; +import com.google.common.collect.Lists; +import com.qlangtech.tis.lang.TisException; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformer; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; +import com.qlangtech.tis.util.IPluginContext; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * no comments. + * Created by liqiang on 16/3/9. + */ +public class TransformerUtil { + + private static final Logger LOG = LoggerFactory.getLogger(TransformerUtil.class); + + static TransformerBuildInfo buildTransformerInfo(IJobContainerContext containerContext, Configuration taskConfig) { + + final String tabRelevantTransformer = taskConfig.getString(TransformerConstant.JOB_TRANSFORMER_NAME); + // Transformer 生成的出参 + final List relevantKeys = taskConfig.getList(TransformerConstant.JOB_TRANSFORMER_RELEVANT_KEYS, String.class); + if (StringUtils.isEmpty(tabRelevantTransformer)) { + throw new IllegalArgumentException("tabRelevantTransformer name can not be null"); + } + List result = Lists.newArrayList(); + TransformerInfo transformerInfo = null; + TransformerExecution texec = null; + RecordTransformerRules transformers = RecordTransformerRules.loadTransformerRules( + IPluginContext.namedContext(containerContext.getCollectionName()), tabRelevantTransformer); + if (CollectionUtils.isEmpty(transformers.rules)) { + throw new IllegalStateException("transformer:" + tabRelevantTransformer + " can not be empty"); + } + + for (RecordTransformer t : transformers.rules) { + transformerInfo = new TransformerInfo(); + transformerInfo.setTransformer(new TISComplexTransformer(t.getUdf())); + texec = new TransformerExecution(transformerInfo, new TransformerExecutionParas()); + texec.setIsChecked(true); + result.add(texec); + } + List fromPlugnKeys = transformers.relevantColKeys(); + if (!CollectionUtils.isEqualCollection(relevantKeys, fromPlugnKeys)) { + throw TisException.create("relevant keys from dataX config:" + String.join(",", relevantKeys) + + "\n is not equla with key build from plugin:" + String.join(",", fromPlugnKeys) + + "\n Please regenerate the DataX Config Files then reTrigger pipeline again!!!"); + } + return new TransformerBuildInfo() { + @Override + public List getExecutions() { + return result; + } + + @Override + public List relevantOutterColKeys() { + return relevantKeys; + } + }; + } +} diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java index d590dc2785..6f31598ee9 100755 --- a/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/plugin/task/util/DirtyRecord.java @@ -11,6 +11,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Map; public class DirtyRecord implements Record { private List columns = new ArrayList(); @@ -24,6 +25,12 @@ public static DirtyRecord asDirtyRecord(final Record record) { return result; } + @Override + public void setCol2Index(Map mapper) { + throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, + "该方法不支持!"); + } + @Override public void addColumn(Column column) { this.columns.add( @@ -41,6 +48,24 @@ public void setColumn(int i, Column column) { "该方法不支持!"); } + @Override + public void setString(String field, String val) { + throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, + "该方法不支持!"); + } + + @Override + public void setColumn(String field, Column column) { + throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, + "该方法不支持!"); + } + + @Override + public Column getColumn(String field) { + throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, + "该方法不支持!"); + } + @Override public Column getColumn(int i) { throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, diff --git a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java index e7aae0c59a..34eff2841e 100755 --- a/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java +++ b/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java @@ -11,6 +11,7 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.AbstractContainer; import com.alibaba.datax.core.job.IJobContainerContext; +import com.alibaba.datax.core.job.ITransformerBuildInfo; import com.alibaba.datax.core.statistics.communication.Communication; import com.alibaba.datax.core.statistics.communication.CommunicationTool; import com.alibaba.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator; @@ -21,15 +22,16 @@ import com.alibaba.datax.core.transport.channel.Channel; import com.alibaba.datax.core.transport.exchanger.BufferedRecordExchanger; import com.alibaba.datax.core.transport.exchanger.BufferedRecordTransformerExchanger; -import com.alibaba.datax.core.transport.transformer.TransformerExecution; import com.alibaba.datax.core.util.ClassUtil; import com.alibaba.datax.core.util.FrameworkErrorCode; -import com.alibaba.datax.core.util.TransformerUtil; +import com.alibaba.datax.core.util.TransformerBuildInfo; +import com.alibaba.datax.core.job.TransformerUtil; import com.alibaba.datax.core.util.container.CoreConstant; import com.alibaba.datax.core.util.container.LoadUtil; import com.alibaba.datax.dataxservice.face.domain.enums.State; import com.alibaba.fastjson.JSON; import com.qlangtech.tis.job.common.JobCommon; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -227,7 +229,8 @@ public void start() { } } Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig; - TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount); + + TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount, this.containerContext); taskStartTimeMap.put(taskId, System.currentTimeMillis()); taskExecutor.doStart(); @@ -373,7 +376,7 @@ class TaskExecutor { private ReaderRunner readerRunner; private WriterRunner writerRunner; - + private final IJobContainerContext containerContext; /** * 该处的taskCommunication在多处用到: * 1. channel @@ -382,10 +385,12 @@ class TaskExecutor { */ private Communication taskCommunication; - public TaskExecutor(Configuration taskConf, int attemptCount) { + public TaskExecutor(Configuration taskConf, int attemptCount, IJobContainerContext containerContext) { // 获取该taskExecutor的配置 this.taskConfig = taskConf; - Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER) && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER), "[reader|writer]的插件参数不能为空!"); + this.containerContext = Objects.requireNonNull(containerContext); + Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER) + && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER), "[reader|writer]的插件参数不能为空!"); // 得到taskId this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID); @@ -403,8 +408,8 @@ public TaskExecutor(Configuration taskConf, int attemptCount) { /** * 获取transformer的参数 */ - - List transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig); + Optional transformerBuildCfg = containerContext.getTransformerBuildCfg(); + // TransformerBuildInfo transformerInfoExecs = TransformerUtil.buildTransformerInfo(this.containerContext, taskConfig); /** * 生成writerThread @@ -425,7 +430,7 @@ public void run() { /** * 生成readerThread */ - readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs); + readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerBuildCfg); this.readerThread = new Thread(readerRunner, String.format("%d-%d-%d-reader", jobId, taskGroupId, this.taskId)) { @Override @@ -472,7 +477,7 @@ private AbstractRunner generateRunner(PluginType pluginType) { return generateRunner(pluginType, null); } - private AbstractRunner generateRunner(PluginType pluginType, List transformerInfoExecs) { + private AbstractRunner generateRunner(PluginType pluginType, Optional transformerBuildCfg) { AbstractRunner newRunner = null; TaskPluginCollector pluginCollector; @@ -485,13 +490,19 @@ private AbstractRunner generateRunner(PluginType pluginType, List 0) { - recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel, - this.taskCommunication, pluginCollector, transformerInfoExecs); - } else { - recordSender = new BufferedRecordExchanger(this.channel, pluginCollector); - } + + RecordSender recordSender = transformerBuildCfg.map((transformerInfoExecs) -> { + return (RecordSender) (new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel, + this.taskCommunication, pluginCollector, transformerInfoExecs.getExecutions())); + }).orElseGet(() -> { + return new BufferedRecordExchanger(this.channel, pluginCollector); + }); + +// if (transformerInfoExecs != null && CollectionUtils.isNotEmpty(transformerInfoExecs.getExecutions())) { +// +// } else { +// recordSender = new BufferedRecordExchanger(this.channel, pluginCollector); +// } ((ReaderRunner) newRunner).setRecordSender(recordSender); diff --git a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java index 4ea4902dde..42f5df05fe 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java @@ -15,142 +15,145 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; public class BufferedRecordExchanger implements RecordSender, RecordReceiver { - private final Channel channel; + private final Channel channel; - private final Configuration configuration; + private final Configuration configuration; - private final List buffer; + private final List buffer; - private int bufferSize ; + private int bufferSize; - protected final int byteCapacity; + protected final int byteCapacity; - private final AtomicInteger memoryBytes = new AtomicInteger(0); + private final AtomicInteger memoryBytes = new AtomicInteger(0); - private int bufferIndex = 0; + private int bufferIndex = 0; - private static Class RECORD_CLASS; + private static Class RECORD_CLASS; - private volatile boolean shutdown = false; + private volatile boolean shutdown = false; - private final TaskPluginCollector pluginCollector; + private final TaskPluginCollector pluginCollector; - @SuppressWarnings("unchecked") - public BufferedRecordExchanger(final Channel channel, final TaskPluginCollector pluginCollector) { - assert null != channel; - assert null != channel.getConfiguration(); + @SuppressWarnings("unchecked") + public BufferedRecordExchanger(final Channel channel, final TaskPluginCollector pluginCollector) { + assert null != channel; + assert null != channel.getConfiguration(); - this.channel = channel; - this.pluginCollector = pluginCollector; - this.configuration = channel.getConfiguration(); + this.channel = channel; + this.pluginCollector = pluginCollector; + this.configuration = channel.getConfiguration(); - this.bufferSize = configuration - .getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE); - this.buffer = new ArrayList(bufferSize); + this.bufferSize = configuration + .getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE); + this.buffer = new ArrayList(bufferSize); - //channel的queue默认大小为8M,原来为64M - this.byteCapacity = configuration.getInt( - CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024); + //channel的queue默认大小为8M,原来为64M + this.byteCapacity = configuration.getInt( + CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024); - try { - BufferedRecordExchanger.RECORD_CLASS = ((Class) Class - .forName(configuration.getString( + try { + BufferedRecordExchanger.RECORD_CLASS = ((Class) Class + .forName(configuration.getString( CoreConstant.DATAX_CORE_TRANSPORT_RECORD_CLASS, "com.alibaba.datax.core.transport.record.DefaultRecord"))); - } catch (Exception e) { - throw DataXException.asDataXException( - FrameworkErrorCode.CONFIG_ERROR, e); - } - } - - @Override - public Record createRecord() { - try { - return BufferedRecordExchanger.RECORD_CLASS.newInstance(); - } catch (Exception e) { - throw DataXException.asDataXException( - FrameworkErrorCode.CONFIG_ERROR, e); - } - } - - @Override - public void sendToWriter(Record record) { - if(shutdown){ - throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); - } - - Validate.notNull(record, "record不能为空."); - - if (record.getMemorySize() > this.byteCapacity) { - this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity))); - return; - } - - boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity); - if (isFull) { - flush(); - } - - this.buffer.add(record); - this.bufferIndex++; - memoryBytes.addAndGet(record.getMemorySize()); - } - - @Override - public void flush() { - if(shutdown){ - throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); - } - this.channel.pushAll(this.buffer); - this.buffer.clear(); - this.bufferIndex = 0; - this.memoryBytes.set(0); - } - - @Override - public void terminate() { - if(shutdown){ - throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); - } - flush(); - this.channel.pushTerminate(TerminateRecord.get()); - } - - @Override - public Record getFromReader() { - if(shutdown){ - throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); - } - boolean isEmpty = (this.bufferIndex >= this.buffer.size()); - if (isEmpty) { - receive(); - } - - Record record = this.buffer.get(this.bufferIndex++); - if (record instanceof TerminateRecord) { - record = null; - } - return record; - } - - @Override - public void shutdown(){ - shutdown = true; - try{ - buffer.clear(); - channel.clear(); - }catch(Throwable t){ - t.printStackTrace(); - } - } - - private void receive() { - this.channel.pullAll(this.buffer); - this.bufferIndex = 0; - this.bufferSize = this.buffer.size(); - } + } catch (Exception e) { + throw DataXException.asDataXException( + FrameworkErrorCode.CONFIG_ERROR, e); + } + } + + @Override + public Record createRecord(Map col2Idx) { + try { + Record record = BufferedRecordExchanger.RECORD_CLASS.newInstance(); + record.setCol2Index(col2Idx); + return record; + } catch (Exception e) { + throw DataXException.asDataXException( + FrameworkErrorCode.CONFIG_ERROR, e); + } + } + + @Override + public void sendToWriter(Record record) { + if (shutdown) { + throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); + } + + Validate.notNull(record, "record不能为空."); + + if (record.getMemorySize() > this.byteCapacity) { + this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity))); + return; + } + + boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity); + if (isFull) { + flush(); + } + + this.buffer.add(record); + this.bufferIndex++; + memoryBytes.addAndGet(record.getMemorySize()); + } + + @Override + public void flush() { + if (shutdown) { + throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); + } + this.channel.pushAll(this.buffer); + this.buffer.clear(); + this.bufferIndex = 0; + this.memoryBytes.set(0); + } + + @Override + public void terminate() { + if (shutdown) { + throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); + } + flush(); + this.channel.pushTerminate(TerminateRecord.get()); + } + + @Override + public Record getFromReader() { + if (shutdown) { + throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); + } + boolean isEmpty = (this.bufferIndex >= this.buffer.size()); + if (isEmpty) { + receive(); + } + + Record record = this.buffer.get(this.bufferIndex++); + if (record instanceof TerminateRecord) { + record = null; + } + return record; + } + + @Override + public void shutdown() { + shutdown = true; + try { + buffer.clear(); + channel.clear(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + private void receive() { + this.channel.pullAll(this.buffer); + this.bufferIndex = 0; + this.bufferSize = this.buffer.size(); + } } diff --git a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java index e9677395b1..6eebb4ac97 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordTransformerExchanger.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; public class BufferedRecordTransformerExchanger extends TransformerExchanger implements RecordSender, RecordReceiver { @@ -72,9 +73,11 @@ public BufferedRecordTransformerExchanger(final int taskGroupId, final int taskI } @Override - public Record createRecord() { + public Record createRecord(Map col2Mapper) { try { - return BufferedRecordTransformerExchanger.RECORD_CLASS.newInstance(); + Record record = BufferedRecordTransformerExchanger.RECORD_CLASS.newInstance(); + record.setCol2Index(col2Mapper); + return record; } catch (Exception e) { throw DataXException.asDataXException( FrameworkErrorCode.CONFIG_ERROR, e); diff --git a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java index fd91ffe705..b1c7089ba3 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/exchanger/RecordExchanger.java @@ -1,12 +1,12 @@ /** - * (C) 2010-2014 Alibaba Group Holding Limited. - * + * (C) 2010-2014 Alibaba Group Holding Limited. + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,83 +31,87 @@ import com.alibaba.datax.core.util.container.CoreConstant; import java.util.List; +import java.util.Map; public class RecordExchanger extends TransformerExchanger implements RecordSender, RecordReceiver { - private Channel channel; + private Channel channel; - private Configuration configuration; + private Configuration configuration; - private static Class RECORD_CLASS; + private static Class RECORD_CLASS; - private volatile boolean shutdown = false; + private volatile boolean shutdown = false; - @SuppressWarnings("unchecked") - public RecordExchanger(final int taskGroupId, final int taskId,final Channel channel, final Communication communication,List transformerExecs, final TaskPluginCollector pluginCollector) { - super(taskGroupId,taskId,communication,transformerExecs, pluginCollector); - assert channel != null; - this.channel = channel; - this.configuration = channel.getConfiguration(); - try { - RecordExchanger.RECORD_CLASS = (Class) Class - .forName(configuration.getString( + @SuppressWarnings("unchecked") + public RecordExchanger(final int taskGroupId, final int taskId, final Channel channel, final Communication communication, List transformerExecs, final TaskPluginCollector pluginCollector) { + super(taskGroupId, taskId, communication, transformerExecs, pluginCollector); + assert channel != null; + this.channel = channel; + this.configuration = channel.getConfiguration(); + try { + RecordExchanger.RECORD_CLASS = (Class) Class + .forName(configuration.getString( CoreConstant.DATAX_CORE_TRANSPORT_RECORD_CLASS, "com.alibaba.datax.core.transport.record.DefaultRecord")); - } catch (ClassNotFoundException e) { - throw DataXException.asDataXException( - FrameworkErrorCode.CONFIG_ERROR, e); - } - } - - @Override - public Record getFromReader() { - if(shutdown){ - throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); - } - Record record = this.channel.pull(); - return (record instanceof TerminateRecord ? null : record); - } - - @Override - public Record createRecord() { - try { - return RECORD_CLASS.newInstance(); - } catch (Exception e) { - throw DataXException.asDataXException( - FrameworkErrorCode.CONFIG_ERROR, e); - } - } - - @Override - public void sendToWriter(Record record) { - if(shutdown){ - throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); - } - record = doTransformer(record); - if (record == null) { - return; - } - this.channel.push(record); - //和channel的统计保持同步 - doStat(); - } - - @Override - public void flush() { - } - - @Override - public void terminate() { - if(shutdown){ - throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); - } - this.channel.pushTerminate(TerminateRecord.get()); - //和channel的统计保持同步 - doStat(); - } - - @Override - public void shutdown(){ - shutdown = true; - } + } catch (ClassNotFoundException e) { + throw DataXException.asDataXException( + FrameworkErrorCode.CONFIG_ERROR, e); + } + } + + @Override + public Record getFromReader() { + if (shutdown) { + throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); + } + Record record = this.channel.pull(); + return (record instanceof TerminateRecord ? null : record); + } + + @Override + public Record createRecord(Map mapper) { + try { + Record record = RECORD_CLASS.newInstance(); + record.setCol2Index(mapper); + return record; + } catch (Exception e) { + throw DataXException.asDataXException( + FrameworkErrorCode.CONFIG_ERROR, e); + } + } + + + @Override + public void sendToWriter(Record record) { + if (shutdown) { + throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); + } + record = doTransformer(record); + if (record == null) { + return; + } + this.channel.push(record); + //和channel的统计保持同步 + doStat(); + } + + @Override + public void flush() { + } + + @Override + public void terminate() { + if (shutdown) { + throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); + } + this.channel.pushTerminate(TerminateRecord.get()); + //和channel的统计保持同步 + doStat(); + } + + @Override + public void shutdown() { + shutdown = true; + } } diff --git a/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java b/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java index 2598bc8c80..4117a13ed1 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/record/DefaultRecord.java @@ -2,6 +2,7 @@ import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.core.util.ClassSize; import com.alibaba.datax.core.util.FrameworkErrorCode; @@ -11,6 +12,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Created by jingxing on 14-8-24. @@ -18,102 +20,134 @@ public class DefaultRecord implements Record { - private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16; - - private List columns; - - private int byteSize; - - // 首先是Record本身需要的内存 - private int memorySize = ClassSize.DefaultRecordHead; - - public DefaultRecord() { - this.columns = new ArrayList(RECORD_AVERGAE_COLUMN_NUMBER); - } - - @Override - public void addColumn(Column column) { - columns.add(column); - incrByteSize(column); - } - - @Override - public Column getColumn(int i) { - if (i < 0 || i >= columns.size()) { - return null; - } - return columns.get(i); - } - - @Override - public void setColumn(int i, final Column column) { - if (i < 0) { - throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, - "不能给index小于0的column设置值"); - } - - if (i >= columns.size()) { - expandCapacity(i + 1); - } - - decrByteSize(getColumn(i)); - this.columns.set(i, column); - incrByteSize(getColumn(i)); - } - - @Override - public String toString() { - Map json = new HashMap(); - json.put("size", this.getColumnNumber()); - json.put("data", this.columns); - return JSON.toJSONString(json); - } - - @Override - public int getColumnNumber() { - return this.columns.size(); - } - - @Override - public int getByteSize() { - return byteSize; - } - - public int getMemorySize(){ - return memorySize; - } - - private void decrByteSize(final Column column) { - if (null == column) { - return; - } - - byteSize -= column.getByteSize(); - - //内存的占用是column对象的头 再加实际大小 - memorySize = memorySize - ClassSize.ColumnHead - column.getByteSize(); - } - - private void incrByteSize(final Column column) { - if (null == column) { - return; - } - - byteSize += column.getByteSize(); - - //内存的占用是column对象的头 再加实际大小 - memorySize = memorySize + ClassSize.ColumnHead + column.getByteSize(); - } - - private void expandCapacity(int totalSize) { - if (totalSize <= 0) { - return; - } - - int needToExpand = totalSize - columns.size(); - while (needToExpand-- > 0) { - this.columns.add(null); - } - } + private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16; + private Map col2Idx; + private List columns; + + private int byteSize; + + // 首先是Record本身需要的内存 + private int memorySize = ClassSize.DefaultRecordHead; + + public DefaultRecord() { + this.columns = new ArrayList(RECORD_AVERGAE_COLUMN_NUMBER); + } + + + @Override + public void setCol2Index(Map mapper) { + this.col2Idx = Objects.requireNonNull(mapper, "param mapper can not be null"); + } + + @Override + public void addColumn(Column column) { + columns.add(column); + incrByteSize(column); + } + + @Override + public void setString(String field, String val) { + this.setColumn(field, new StringColumn(val)); + } + + @Override + public void setColumn(String field, Column column) { + final Integer idx = this.getColIdx(field); + this.setColumn(idx, column); + } + + private Integer getColIdx(String field) { + Integer idx = this.col2Idx.get(field); + if (idx == null) { + throw new IllegalStateException("field:'" + field + + "' relevant offset index can not be null,exist cols:" + + String.join(",", this.col2Idx.keySet())); + } + return idx; + } + + @Override + public Column getColumn(String field) { + return getColumn(getColIdx(field)); + } + + @Override + public Column getColumn(int i) { + if (i < 0 || i >= columns.size()) { + return null; + } + return columns.get(i); + } + + @Override + public void setColumn(int i, final Column column) { + if (i < 0) { + throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, + "不能给index小于0的column设置值"); + } + + if (i >= columns.size()) { + expandCapacity(i + 1); + } + + decrByteSize(getColumn(i)); + this.columns.set(i, column); + incrByteSize(getColumn(i)); + } + + @Override + public String toString() { + Map json = new HashMap(); + json.put("size", this.getColumnNumber()); + json.put("data", this.columns); + return JSON.toJSONString(json); + } + + @Override + public int getColumnNumber() { + return this.columns.size(); + } + + @Override + public int getByteSize() { + return byteSize; + } + + public int getMemorySize() { + return memorySize; + } + + private void decrByteSize(final Column column) { + if (null == column) { + return; + } + + byteSize -= column.getByteSize(); + + //内存的占用是column对象的头 再加实际大小 + memorySize = memorySize - ClassSize.ColumnHead - column.getByteSize(); + } + + private void incrByteSize(final Column column) { + if (null == column) { + return; + } + + byteSize += column.getByteSize(); + + //内存的占用是column对象的头 再加实际大小 + memorySize = memorySize + ClassSize.ColumnHead + column.getByteSize(); + } + + private void expandCapacity(int totalSize) { + if (totalSize <= 0) { + return; + } + + int needToExpand = totalSize - columns.size(); + while (needToExpand-- > 0) { + this.columns.add(null); + } + } } diff --git a/core/src/main/java/com/alibaba/datax/core/transport/record/TerminateRecord.java b/core/src/main/java/com/alibaba/datax/core/transport/record/TerminateRecord.java index 928609abda..c8a5635b5a 100755 --- a/core/src/main/java/com/alibaba/datax/core/transport/record/TerminateRecord.java +++ b/core/src/main/java/com/alibaba/datax/core/transport/record/TerminateRecord.java @@ -3,6 +3,8 @@ import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; +import java.util.Map; + /** * 作为标示 生产者已经完成生产的标志 * @@ -13,6 +15,11 @@ public class TerminateRecord implements Record { private TerminateRecord() { } + @Override + public void setCol2Index(Map mapper) { + + } + public static TerminateRecord get() { return SINGLE; } @@ -21,6 +28,21 @@ public static TerminateRecord get() { public void addColumn(Column column) { } + @Override + public void setString(String field, String val) { + + } + + @Override + public void setColumn(String field, Column column) { + + } + + @Override + public Column getColumn(String field) { + return null; + } + @Override public Column getColumn(int i) { return null; diff --git a/core/src/main/java/com/alibaba/datax/core/util/TISComplexTransformer.java b/core/src/main/java/com/alibaba/datax/core/util/TISComplexTransformer.java new file mode 100644 index 0000000000..81c255be2d --- /dev/null +++ b/core/src/main/java/com/alibaba/datax/core/util/TISComplexTransformer.java @@ -0,0 +1,25 @@ +package com.alibaba.datax.core.util; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.transformer.ComplexTransformer; +import com.qlangtech.tis.plugin.datax.transformer.UDFDefinition; + +import java.util.Map; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-06-13 18:39 + **/ +public class TISComplexTransformer extends ComplexTransformer { + private final UDFDefinition tisUDF; + + public TISComplexTransformer(UDFDefinition tisUDF) { + this.tisUDF = tisUDF; + } + + @Override + public Record evaluate(Record record, Map tContext, Object... paras) { + tisUDF.evaluate(record); + return record; + } +} diff --git a/core/src/main/java/com/alibaba/datax/core/util/TransformerBuildInfo.java b/core/src/main/java/com/alibaba/datax/core/util/TransformerBuildInfo.java new file mode 100644 index 0000000000..bcb75a7fbf --- /dev/null +++ b/core/src/main/java/com/alibaba/datax/core/util/TransformerBuildInfo.java @@ -0,0 +1,17 @@ +package com.alibaba.datax.core.util; + +import com.alibaba.datax.core.job.ITransformerBuildInfo; +import com.alibaba.datax.core.transport.transformer.TransformerExecution; + +import java.util.List; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-06-15 10:11 + **/ +public interface TransformerBuildInfo extends ITransformerBuildInfo { + + List getExecutions(); + + +} diff --git a/core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java b/core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java deleted file mode 100644 index 1b46962341..0000000000 --- a/core/src/main/java/com/alibaba/datax/core/util/TransformerUtil.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.alibaba.datax.core.util; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.core.transport.transformer.*; -import com.alibaba.datax.core.util.container.CoreConstant; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * no comments. - * Created by liqiang on 16/3/9. - */ -public class TransformerUtil { - - private static final Logger LOG = LoggerFactory.getLogger(TransformerUtil.class); - - public static List buildTransformerInfo(Configuration taskConfig) { - List tfConfigs = taskConfig.getListConfiguration(CoreConstant.JOB_TRANSFORMER); - if (tfConfigs == null || tfConfigs.size() == 0) { - return null; - } - - List result = new ArrayList(); - - - List functionNames = new ArrayList(); - - - for (Configuration configuration : tfConfigs) { - String functionName = configuration.getString("name"); - if (StringUtils.isEmpty(functionName)) { - throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_CONFIGURATION_ERROR, "config=" + configuration.toJSON()); - } - - if (functionName.equals("dx_groovy") && functionNames.contains("dx_groovy")) { - throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_CONFIGURATION_ERROR, "dx_groovy can be invoke once only."); - } - functionNames.add(functionName); - } - - /** - * 延迟load 第三方插件的function,并按需load - */ - LOG.info(String.format(" user config tranformers [%s], loading...", functionNames)); - TransformerRegistry.loadTransformerFromLocalStorage(functionNames); - - int i = 0; - - for (Configuration configuration : tfConfigs) { - String functionName = configuration.getString("name"); - TransformerInfo transformerInfo = TransformerRegistry.getTransformer(functionName); - if (transformerInfo == null) { - throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NOTFOUND_ERROR, "name=" + functionName); - } - - /** - * 具体的UDF对应一个paras - */ - TransformerExecutionParas transformerExecutionParas = new TransformerExecutionParas(); - /** - * groovy function仅仅只有code - */ - if (!functionName.equals("dx_groovy") && !functionName.equals("dx_fackGroovy")) { - Integer columnIndex = configuration.getInt(CoreConstant.TRANSFORMER_PARAMETER_COLUMNINDEX); - - if (columnIndex == null) { - throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER, "columnIndex must be set by UDF:name=" + functionName); - } - - transformerExecutionParas.setColumnIndex(columnIndex); - List paras = configuration.getList(CoreConstant.TRANSFORMER_PARAMETER_PARAS, String.class); - if (paras != null && paras.size() > 0) { - transformerExecutionParas.setParas(paras.toArray(new String[0])); - } - } else { - String code = configuration.getString(CoreConstant.TRANSFORMER_PARAMETER_CODE); - if (StringUtils.isEmpty(code)) { - throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER, "groovy code must be set by UDF:name=" + functionName); - } - transformerExecutionParas.setCode(code); - - List extraPackage = configuration.getList(CoreConstant.TRANSFORMER_PARAMETER_EXTRAPACKAGE, String.class); - if (extraPackage != null && extraPackage.size() > 0) { - transformerExecutionParas.setExtraPackage(extraPackage); - } - } - transformerExecutionParas.settContext(configuration.getMap(CoreConstant.TRANSFORMER_PARAMETER_CONTEXT)); - - TransformerExecution transformerExecution = new TransformerExecution(transformerInfo, transformerExecutionParas); - - transformerExecution.genFinalParas(); - result.add(transformerExecution); - i++; - LOG.info(String.format(" %s of transformer init success. name=%s, isNative=%s parameter = %s" - , i, transformerInfo.getTransformer().getTransformerName() - , transformerInfo.isNative(), configuration.getConfiguration("parameter"))); - } - - return result; - - } -} diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java index 9bfc3a42af..29453901d2 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/CoreConstant.java @@ -7,91 +7,91 @@ /** * Created by jingxing on 14-8-25. */ -public class CoreConstant { - // --------------------------- 全局使用的变量(最好按照逻辑顺序,调整下成员变量顺序) - // -------------------------------- +public class CoreConstant implements TransformerConstant { + // --------------------------- 全局使用的变量(最好按照逻辑顺序,调整下成员变量顺序) + // -------------------------------- - public static final String DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL = "core.container.taskGroup.channel"; + public static final String DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL = "core.container.taskGroup.channel"; - public static final String DATAX_NAME = "core.container.job.content[0].dataxName"; + public static final String DATAX_NAME = "core.container.job.content[0].dataxName"; - public static final String DATAX_CORE_CONTAINER_MODEL = "core.container.model"; + public static final String DATAX_CORE_CONTAINER_MODEL = "core.container.model"; - public static final String DATAX_CORE_CONTAINER_JOB_ID = "core.container.job.id"; + public static final String DATAX_CORE_CONTAINER_JOB_ID = "core.container.job.id"; - public static final String DATAX_CORE_CONTAINER_TRACE_ENABLE = "core.container.trace.enable"; + public static final String DATAX_CORE_CONTAINER_TRACE_ENABLE = "core.container.trace.enable"; - public static final String DATAX_CORE_CONTAINER_JOB_MODE = "core.container.job.mode"; + public static final String DATAX_CORE_CONTAINER_JOB_MODE = "core.container.job.mode"; - public static final String DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL = "core.container.job.reportInterval"; + public static final String DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL = "core.container.job.reportInterval"; - public static final String DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL = "core.container.job.sleepInterval"; + public static final String DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL = "core.container.job.sleepInterval"; public static final String DATAX_CORE_CONTAINER_TASKGROUP_ID = "core.container.taskGroup.id"; - public static final String DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL = "core.container.taskGroup.sleepInterval"; + public static final String DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL = "core.container.taskGroup.sleepInterval"; - public static final String DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL = "core.container.taskGroup.reportInterval"; + public static final String DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL = "core.container.taskGroup.reportInterval"; - public static final String DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES = "core.container.task.failOver.maxRetryTimes"; + public static final String DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES = "core.container.task.failOver.maxRetryTimes"; - public static final String DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC = "core.container.task.failOver.retryIntervalInMsec"; + public static final String DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC = "core.container.task.failOver.retryIntervalInMsec"; - public static final String DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC = "core.container.task.failOver.maxWaitInMsec"; + public static final String DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC = "core.container.task.failOver.maxWaitInMsec"; public static final String DATAX_CORE_DATAXSERVER_ADDRESS = "core.dataXServer.address"; - public static final String DATAX_CORE_DSC_ADDRESS = "core.dsc.address"; + public static final String DATAX_CORE_DSC_ADDRESS = "core.dsc.address"; public static final String DATAX_CORE_DATAXSERVER_TIMEOUT = "core.dataXServer.timeout"; - public static final String DATAX_CORE_REPORT_DATAX_LOG = "core.dataXServer.reportDataxLog"; + public static final String DATAX_CORE_REPORT_DATAX_LOG = "core.dataXServer.reportDataxLog"; - public static final String DATAX_CORE_REPORT_DATAX_PERFLOG = "core.dataXServer.reportPerfLog"; + public static final String DATAX_CORE_REPORT_DATAX_PERFLOG = "core.dataXServer.reportPerfLog"; public static final String DATAX_CORE_TRANSPORT_CHANNEL_CLASS = "core.transport.channel.class"; - public static final String DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY = "core.transport.channel.capacity"; + public static final String DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY = "core.transport.channel.capacity"; - public static final String DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE = "core.transport.channel.byteCapacity"; + public static final String DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE = "core.transport.channel.byteCapacity"; - public static final String DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE = "core.transport.channel.speed.byte"; + public static final String DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE = "core.transport.channel.speed.byte"; public static final String DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD = "core.transport.channel.speed.record"; - public static final String DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL = "core.transport.channel.flowControlInterval"; + public static final String DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL = "core.transport.channel.flowControlInterval"; - public static final String DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE = "core.transport.exchanger.bufferSize"; + public static final String DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE = "core.transport.exchanger.bufferSize"; public static final String DATAX_CORE_TRANSPORT_RECORD_CLASS = "core.transport.record.class"; - public static final String DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS = "core.statistics.collector.plugin.taskClass"; + public static final String DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS = "core.statistics.collector.plugin.taskClass"; - public static final String DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_MAXDIRTYNUM = "core.statistics.collector.plugin.maxDirtyNumber"; + public static final String DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_MAXDIRTYNUM = "core.statistics.collector.plugin.maxDirtyNumber"; - public static final String DATAX_JOB_CONTENT_READER_NAME = "job.content[0].reader.name"; + public static final String DATAX_JOB_CONTENT_READER_NAME = "job.content[0].reader.name"; - public static final String DATAX_JOB_CONTENT_READER_PARAMETER = "job.content[0].reader.parameter"; + public static final String DATAX_JOB_CONTENT_READER_PARAMETER = "job.content[0].reader.parameter"; - public static final String DATAX_JOB_CONTENT_WRITER_NAME = "job.content[0].writer.name"; + public static final String DATAX_JOB_CONTENT_WRITER_NAME = "job.content[0].writer.name"; - public static final String DATAX_JOB_CONTENT_WRITER_PARAMETER = "job.content[0].writer.parameter"; + public static final String DATAX_JOB_CONTENT_WRITER_PARAMETER = "job.content[0].writer.parameter"; - public static final String DATAX_JOB_JOBINFO = "job.jobInfo"; + public static final String DATAX_JOB_JOBINFO = "job.jobInfo"; - public static final String DATAX_JOB_CONTENT = "job.content"; + public static final String DATAX_JOB_CONTENT = "job.content"; - public static final String DATAX_JOB_CONTENT_TRANSFORMER = "job.content[0].transformer"; + public static final String DATAX_JOB_CONTENT_TRANSFORMER = "job.content[0]." + JOB_TRANSFORMER; public static final String DATAX_JOB_SETTING_KEYVERSION = "job.setting.keyVersion"; - public static final String DATAX_JOB_SETTING_SPEED_BYTE = "job.setting.speed.byte"; + public static final String DATAX_JOB_SETTING_SPEED_BYTE = "job.setting.speed.byte"; public static final String DATAX_JOB_SETTING_SPEED_RECORD = "job.setting.speed.record"; - public static final String DATAX_JOB_SETTING_SPEED_CHANNEL = "job.setting.speed.channel"; + public static final String DATAX_JOB_SETTING_SPEED_CHANNEL = "job.setting.speed.channel"; - public static final String DATAX_JOB_SETTING_ERRORLIMIT = "job.setting.errorLimit"; + public static final String DATAX_JOB_SETTING_ERRORLIMIT = "job.setting.errorLimit"; public static final String DATAX_JOB_SETTING_ERRORLIMIT_RECORD = "job.setting.errorLimit.record"; @@ -109,23 +109,22 @@ public class CoreConstant { // ----------------------------- 局部使用的变量 public static final String JOB_WRITER = "reader"; - public static final String JOB_READER = "reader"; + public static final String JOB_READER = "reader"; - public static final String JOB_TRANSFORMER = "transformer"; - public static final String JOB_READER_NAME = "reader.name"; + public static final String JOB_READER_NAME = "reader.name"; - public static final String JOB_READER_PARAMETER = "reader.parameter"; + public static final String JOB_READER_PARAMETER = "reader.parameter"; - public static final String JOB_WRITER_NAME = "writer.name"; + public static final String JOB_WRITER_NAME = "writer.name"; - public static final String JOB_WRITER_PARAMETER = "writer.parameter"; + public static final String JOB_WRITER_PARAMETER = "writer.parameter"; - public static final String TRANSFORMER_PARAMETER_COLUMNINDEX = "parameter.columnIndex"; - public static final String TRANSFORMER_PARAMETER_PARAS = "parameter.paras"; - public static final String TRANSFORMER_PARAMETER_CONTEXT = "parameter.context"; - public static final String TRANSFORMER_PARAMETER_CODE = "parameter.code"; - public static final String TRANSFORMER_PARAMETER_EXTRAPACKAGE = "parameter.extraPackage"; + public static final String TRANSFORMER_PARAMETER_COLUMNINDEX = "parameter.columnIndex"; + public static final String TRANSFORMER_PARAMETER_PARAS = "parameter.paras"; + public static final String TRANSFORMER_PARAMETER_CONTEXT = "parameter.context"; + public static final String TRANSFORMER_PARAMETER_CODE = "parameter.code"; + public static final String TRANSFORMER_PARAMETER_EXTRAPACKAGE = "parameter.extraPackage"; public static final String TASK_ID = "taskId"; @@ -137,9 +136,9 @@ public class CoreConstant { public static final String LAST_PRIVATEKEY = "last.privateKey"; - public static final String LAST_SERVICE_USERNAME = "last.service.username"; + public static final String LAST_SERVICE_USERNAME = "last.service.username"; - public static final String LAST_SERVICE_PASSWORD = "last.service.password"; + public static final String LAST_SERVICE_PASSWORD = "last.service.password"; public static final String CURRENT_KEYVERSION = "current.keyVersion"; @@ -147,45 +146,45 @@ public class CoreConstant { public static final String CURRENT_PRIVATEKEY = "current.privateKey"; - public static final String CURRENT_SERVICE_USERNAME = "current.service.username"; + public static final String CURRENT_SERVICE_USERNAME = "current.service.username"; - public static final String CURRENT_SERVICE_PASSWORD = "current.service.password"; + public static final String CURRENT_SERVICE_PASSWORD = "current.service.password"; - // ----------------------------- 环境变量 --------------------------------- + // ----------------------------- 环境变量 --------------------------------- - public static String DATAX_HOME = System.getProperty("datax.home"); + public static String DATAX_HOME = System.getProperty("datax.home"); - public static String DATAX_CONF_PATH = StringUtils.join(new String[] { - DATAX_HOME, "conf", "core.json" }, File.separator); + public static String DATAX_CONF_PATH = StringUtils.join(new String[]{ + DATAX_HOME, "conf", "core.json"}, File.separator); - public static String DATAX_CONF_LOG_PATH = StringUtils.join(new String[] { - DATAX_HOME, "conf", "logback.xml" }, File.separator); + public static String DATAX_CONF_LOG_PATH = StringUtils.join(new String[]{ + DATAX_HOME, "conf", "logback.xml"}, File.separator); - public static String DATAX_SECRET_PATH = StringUtils.join(new String[] { - DATAX_HOME, "conf", ".secret.properties" }, File.separator); + public static String DATAX_SECRET_PATH = StringUtils.join(new String[]{ + DATAX_HOME, "conf", ".secret.properties"}, File.separator); - public static String DATAX_PLUGIN_HOME = StringUtils.join(new String[] { - DATAX_HOME, "plugin" }, File.separator); + public static String DATAX_PLUGIN_HOME = StringUtils.join(new String[]{ + DATAX_HOME, "plugin"}, File.separator); - public static String DATAX_PLUGIN_READER_HOME = StringUtils.join( - new String[] { DATAX_HOME, "plugin", "reader" }, File.separator); + public static String DATAX_PLUGIN_READER_HOME = StringUtils.join( + new String[]{DATAX_HOME, "plugin", "reader"}, File.separator); - public static String DATAX_PLUGIN_WRITER_HOME = StringUtils.join( - new String[] { DATAX_HOME, "plugin", "writer" }, File.separator); + public static String DATAX_PLUGIN_WRITER_HOME = StringUtils.join( + new String[]{DATAX_HOME, "plugin", "writer"}, File.separator); - public static String DATAX_BIN_HOME = StringUtils.join(new String[] { - DATAX_HOME, "bin" }, File.separator); + public static String DATAX_BIN_HOME = StringUtils.join(new String[]{ + DATAX_HOME, "bin"}, File.separator); - public static String DATAX_JOB_HOME = StringUtils.join(new String[] { - DATAX_HOME, "job" }, File.separator); + public static String DATAX_JOB_HOME = StringUtils.join(new String[]{ + DATAX_HOME, "job"}, File.separator); - public static String DATAX_STORAGE_TRANSFORMER_HOME = StringUtils.join( - new String[] { DATAX_HOME, "local_storage", "transformer" }, File.separator); + public static String DATAX_STORAGE_TRANSFORMER_HOME = StringUtils.join( + new String[]{DATAX_HOME, "local_storage", "transformer"}, File.separator); - public static String DATAX_STORAGE_PLUGIN_READ_HOME = StringUtils.join( - new String[] { DATAX_HOME, "local_storage", "plugin","reader" }, File.separator); + public static String DATAX_STORAGE_PLUGIN_READ_HOME = StringUtils.join( + new String[]{DATAX_HOME, "local_storage", "plugin", "reader"}, File.separator); - public static String DATAX_STORAGE_PLUGIN_WRITER_HOME = StringUtils.join( - new String[] { DATAX_HOME, "local_storage", "plugin","writer" }, File.separator); + public static String DATAX_STORAGE_PLUGIN_WRITER_HOME = StringUtils.join( + new String[]{DATAX_HOME, "local_storage", "plugin", "writer"}, File.separator); } diff --git a/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReader.java b/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReader.java index 8c7e0028b6..a3c0a4861f 100755 --- a/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReader.java +++ b/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReader.java @@ -40,6 +40,7 @@ public void init() { @Override public List split(int adviceNumber) { + return DrdsReaderSplitUtil.doSplit(commonRdbmsReaderJob.dataSourceFactoryGetter, this.originalConfig, adviceNumber); } diff --git a/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReaderSplitUtil.java b/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReaderSplitUtil.java index 231ba00398..2e4a28292c 100755 --- a/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReaderSplitUtil.java +++ b/drdsreader/src/main/java/com/alibaba/datax/plugin/reader/drdsreader/DrdsReaderSplitUtil.java @@ -2,8 +2,10 @@ import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.job.IJobContainerContext; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.reader.util.QuerySql; import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; @@ -21,7 +23,7 @@ public class DrdsReaderSplitUtil { private static final Logger LOG = LoggerFactory .getLogger(DrdsReaderSplitUtil.class); - public static List doSplit(IDataSourceFactoryGetter dataBaseType, Configuration originalSliceConfig, + public static List doSplit(IJobContainerContext containerContext, IDataSourceFactoryGetter dataBaseType, Configuration originalSliceConfig, int adviceNumber) { boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue(); int tableNumber = originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK); @@ -37,13 +39,14 @@ public static List doSplit(IDataSourceFactoryGetter dataBaseType, originalSliceConfig.set(Key.JDBC_URL, DataBaseType.DRDS.appendJDBCSuffixForReader(jdbcUrl)); originalSliceConfig.remove(Constant.CONN_MARK); - return doDrdsReaderSplit(dataBaseType, originalSliceConfig); + return doDrdsReaderSplit(containerContext, dataBaseType, originalSliceConfig); } else { throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "您的配置信息中的表(table)的配置有误. 因为Drdsreader 只需要读取一张逻辑表,后台会通过DRDS Proxy自动获取实际对应物理表的数据. 请检查您的配置并作出修改."); } } - private static List doDrdsReaderSplit(IDataSourceFactoryGetter dataBaseType, Configuration originalSliceConfig) { + private static List doDrdsReaderSplit(IJobContainerContext containerContext // + , IDataSourceFactoryGetter dataBaseType, Configuration originalSliceConfig) { List splittedConfigurations = new ArrayList(); Map> topology = getTopology(dataBaseType, originalSliceConfig); @@ -52,11 +55,11 @@ private static List doDrdsReaderSplit(IDataSourceFactoryGetter da "获取 drds 表拓扑结构失败, 拓扑结构不能为空."); } else { String table = originalSliceConfig.getString(Key.TABLE).trim(); - String column = originalSliceConfig.getString(Key.COLUMN).trim(); + String column = originalSliceConfig.getString(Key.COLUMN); + List cols = originalSliceConfig.getList(Key.COLUMN_LIST, String.class); String where = originalSliceConfig.getString(Key.WHERE, null); // 不能带英语分号结尾 - String sql = SingleTableSplitUtil - .buildQuerySql(column, table, where); + QuerySql sql = SingleTableSplitUtil.buildQuerySql(containerContext, column, cols, table, where); // 根据拓扑拆分任务 for (Map.Entry> entry : topology.entrySet()) { String group = entry.getKey(); @@ -74,7 +77,7 @@ private static List doDrdsReaderSplit(IDataSourceFactoryGetter da } } sqlbuilder.append("]})*/"); - sqlbuilder.append(sql); + sqlbuilder.append(sql.getQuerySql()); Configuration param = originalSliceConfig.clone(); param.set(Key.QUERY_SQL, sqlbuilder.toString()); splittedConfigurations.add(param); diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index e1aa664eef..83ae69acb6 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -10,6 +10,7 @@ import com.alibaba.datax.core.job.IJobContainerContext; import com.alibaba.datax.plugin.rdbms.reader.util.OriginalConfPretreatmentUtil; import com.alibaba.datax.plugin.rdbms.reader.util.PreCheckTask; +import com.alibaba.datax.plugin.rdbms.reader.util.QuerySql; import com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil; import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtil; @@ -69,7 +70,7 @@ public void init(Configuration originalConfig) { public void preCheck(Configuration originalConfig, DataBaseType dataBaseType) { /*检查每个表是否有读权限,以及querySql跟splik Key是否正确*/ - Configuration queryConf = ReaderSplitUtil.doPreCheckSplit(originalConfig); + Configuration queryConf = ReaderSplitUtil.doPreCheckSplit(this.containerContext, originalConfig); String splitPK = queryConf.getString(Key.SPLIT_PK); List connList = queryConf.getList(Constant.CONN_MARK, Object.class); String username = queryConf.getString(Key.USERNAME); @@ -110,7 +111,8 @@ public void preCheck(Configuration originalConfig, DataBaseType dataBaseType) { public List split(Configuration originalConfig, int adviceNumber) { - return ReaderSplitUtil.doSplit(this.dataSourceFactoryGetter, originalConfig, adviceNumber); + + return ReaderSplitUtil.doSplit(this.containerContext, originalConfig, adviceNumber); } public void post(Configuration originalConfig) { @@ -186,22 +188,25 @@ public void init(Configuration readerSliceConfig) { public void startRead(Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector //, int fetchSize this param seems as useless ) { - String querySql = readerSliceConfig.getString(Key.QUERY_SQL); + + final QuerySql query = QuerySql.from(readerSliceConfig); + +// String querySql = readerSliceConfig.getString(Key.QUERY_SQL); String table = readerSliceConfig.getString(Key.TABLE); if (StringUtils.isEmpty(table)) { - Matcher m = PATTERN_FROM_TABLE.matcher(querySql); + Matcher m = PATTERN_FROM_TABLE.matcher(query.getQuerySql()); if (m.find()) { table = readerDataSourceFactoryGetter.getDataSourceFactory().removeEscapeChar(m.group(1)); // table = StringUtils.remove(m.group(1), readerDataSourceFactoryGetter.getDataSourceFactory() // .getEscapeChar()); } else { - throw new IllegalStateException("can not find table name from query sql:" + querySql); + throw new IllegalStateException("can not find table name from query sql:" + query); } } PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg); - LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql, basicMsg); + LOG.info("Begin to read record by Sql: [{}\n] {}.", query, basicMsg); PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY); queryPerfRecord.start(); @@ -232,7 +237,7 @@ public void startRead(Configuration readerSliceConfig, RecordSender recordSender if (rowFetchSize == null) { throw new IllegalStateException("param of DataXReader rowFetchSize can not be null"); } - statResult = DBUtil.query(conn, querySql, rowFetchSize, this.readerDataSourceFactoryGetter); + statResult = DBUtil.query(conn, query.getQuerySql(), rowFetchSize, this.readerDataSourceFactoryGetter); rs = statResult.getRight(); statement = statResult.getKey(); queryPerfRecord.end(); @@ -254,17 +259,17 @@ public void startRead(Configuration readerSliceConfig, RecordSender recordSender long lastTime = System.nanoTime(); while (rs.next()) { rsNextUsedTime += (System.nanoTime() - lastTime); - this.transportOneRecord(recordSender, rs, cols, columnNumber, mandatoryEncoding, + this.transportOneRecord(query, recordSender, rs, cols, columnNumber, mandatoryEncoding, taskPluginCollector); lastTime = System.nanoTime(); } allResultPerfRecord.end(rsNextUsedTime); //目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间 - LOG.info("Finished read record by Sql: [{}\n] {}.", querySql, basicMsg); + LOG.info("Finished read record by Sql: [{}\n] {}.", query, basicMsg); } catch (Exception e) { - throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username); + throw RdbmsException.asQueryException(this.dataBaseType, e, query.getQuerySql(), table, username); } finally { DBUtil.closeDBResources(statement, conn); } @@ -278,18 +283,18 @@ public void destroy(Configuration originalConfig) { // do nothing } - protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, List cols, + protected Record transportOneRecord(final QuerySql query, RecordSender recordSender, ResultSet rs, List cols, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector) { - Record record = buildRecord(recordSender, rs, cols, columnNumber, mandatoryEncoding, taskPluginCollector); + Record record = buildRecord(query, recordSender, rs, cols, columnNumber, mandatoryEncoding, taskPluginCollector); recordSender.sendToWriter(record); return record; } - protected Record buildRecord(RecordSender recordSender, ResultSet rs, List cols, + protected Record buildRecord(final QuerySql query, RecordSender recordSender, ResultSet rs, List cols, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector) { - Record record = recordSender.createRecord(); + Record record = recordSender.createRecord(query.getCol2Index()); ColumnMetaData cm = null; try { for (int i = 1; i <= columnNumber; i++) { diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java index 9f2939c491..e50fb8fd74 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java @@ -30,6 +30,7 @@ public final class Key { public final static String SAMPLE_PERCENTAGE = "samplePercentage"; public final static String QUERY_SQL = "querySql"; + public final static String COLS_2_INDEX = "cols2Idx"; public final static String SPLIT_PK_SQL = "splitPkSql"; diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java index 5e03943079..e0b3b6a55d 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/OriginalConfPretreatmentUtil.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; public final class OriginalConfPretreatmentUtil { private static final Logger LOG = LoggerFactory @@ -218,7 +219,10 @@ private static void dealColumnConf(IDataSourceFactoryGetter dataSourceFactoryGet // } } - originalConfig.set(Key.COLUMN_LIST, quotedColumns); + originalConfig.set(Key.COLUMN_LIST + , quotedColumns.stream().map((col)-> dataSourceFactoryGetter.getDBReservedKeys().removeEscapeChar(col)) + .collect(Collectors.toList())); + originalConfig.set(Key.COLUMN, StringUtils.join(quotedColumns, ",")); if (StringUtils.isNotBlank(splitPk)) { diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/QuerySql.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/QuerySql.java new file mode 100644 index 0000000000..a1a148a28c --- /dev/null +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/QuerySql.java @@ -0,0 +1,84 @@ +package com.alibaba.datax.plugin.rdbms.reader.util; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.job.ITransformerBuildInfo; +import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-06-14 11:04 + **/ +public class QuerySql { + private final String querySql; + private final Map col2Index; + + public static QuerySql from(Configuration cfg) { + return new QuerySql(cfg.getString(Key.QUERY_SQL), cfg.getMap(Key.COLS_2_INDEX, Integer.class)); + } + + public QuerySql(String querySql, List cols, Optional transformerBuildCfg) { + this(querySql, toMap(cols, transformerBuildCfg)); + } + + private static Map toMap(List cols, Optional transformerBuildCfg) { + if (CollectionUtils.isEmpty(cols)) { + throw new IllegalArgumentException("param cols can not be empty"); + } + Builder mapBuilder = ImmutableMap.builder(); + Set added = Sets.newHashSet(); + String key = null; + int idx = 0; + for (; idx < cols.size(); idx++) { + key = cols.get(idx); + mapBuilder.put(key, idx); + added.add(key); + } + + if (transformerBuildCfg.isPresent()) { + for (String transformerOutterKey : transformerBuildCfg.get().relevantOutterColKeys()) { + if (!added.contains(transformerOutterKey)) { + mapBuilder.put(transformerOutterKey, idx++); + } + } + } + + return mapBuilder.build(); + } + + public QuerySql(String querySql, Map col2Index) { + if (StringUtils.isEmpty(querySql)) { + throw new IllegalArgumentException("param querySql can not be null"); + } + this.querySql = querySql; + this.col2Index = Objects.requireNonNull(col2Index, "param col2Idx can not be null"); + } + + public String getQuerySql() { + return this.querySql; + } + + public Map getCol2Index() { + return this.col2Index; + } + + public void write(Configuration cfg) { + cfg.set(Key.QUERY_SQL, this.getQuerySql()); + cfg.set(Key.COLS_2_INDEX, this.getCol2Index()); + } + + @Override + public String toString() { + return this.querySql; + } +} diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java index 3d8bf13601..3ee7200131 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java @@ -2,6 +2,7 @@ import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.job.IJobContainerContext; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; @@ -12,13 +13,14 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public final class ReaderSplitUtil { private static final Logger LOG = LoggerFactory .getLogger(ReaderSplitUtil.class); - public static List doSplit(IDataSourceFactoryGetter dataSourceFactoryGetter, + public static List doSplit(IJobContainerContext containerContext, Configuration originalSliceConfig, int adviceNumber) { boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue(); int eachTableShouldSplittedNumber = -1; @@ -30,6 +32,7 @@ public static List doSplit(IDataSourceFactoryGetter dataSourceFac } String column = originalSliceConfig.getString(Key.COLUMN); + List cols = originalSliceConfig.getList(Key.COLUMN_LIST, String.class); String where = originalSliceConfig.getString(Key.WHERE, null); List conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class); @@ -63,48 +66,53 @@ public static List doSplit(IDataSourceFactoryGetter dataSourceFac boolean needSplitTable = eachTableShouldSplittedNumber > 1 && StringUtils.isNotBlank(splitPk); if (needSplitTable) { - if (tables.size() == 1) { - //原来:如果是单表的,主键切分num=num*2+1 - // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑 - //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾 - - //考虑其他比率数字?(splitPk is null, 忽略此长尾) - //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; - - //为避免导入hive小文件 默认基数为5,可以通过 splitFactor 配置基数 - // 最终task数为(channel/tableNum)向上取整*splitFactor - Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); - eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor; - } - // 尝试对每个表,切分为eachTableShouldSplittedNumber 份 - for (String table : tables) { - tempSlice = sliceConfig.clone(); - tempSlice.set(Key.TABLE, table); - - List splittedSlices = SingleTableSplitUtil - .splitSingleTable(dataSourceFactoryGetter, tempSlice, eachTableShouldSplittedNumber); - - splittedConfigs.addAll(splittedSlices); - } + throw new UnsupportedOperationException("split table is not supported"); +// if (tables.size() == 1) { +// //原来:如果是单表的,主键切分num=num*2+1 +// // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑 +// //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾 +// +// //考虑其他比率数字?(splitPk is null, 忽略此长尾) +// //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; +// +// //为避免导入hive小文件 默认基数为5,可以通过 splitFactor 配置基数 +// // 最终task数为(channel/tableNum)向上取整*splitFactor +// Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); +// eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor; +// } +// // 尝试对每个表,切分为eachTableShouldSplittedNumber 份 +// for (String table : tables) { +// tempSlice = sliceConfig.clone(); +// tempSlice.set(Key.TABLE, table); +// +// List splittedSlices = SingleTableSplitUtil +// .splitSingleTable(dataSourceFactoryGetter, tempSlice, eachTableShouldSplittedNumber); +// +// splittedConfigs.addAll(splittedSlices); +// } } else { for (String table : tables) { tempSlice = sliceConfig.clone(); tempSlice.set(Key.TABLE, table); String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column); - tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where)); + QuerySql querySql = SingleTableSplitUtil.buildQuerySql(containerContext, queryColumn, cols, table, where); + + querySql.write(tempSlice); + splittedConfigs.add(tempSlice); } } } else { // 说明是配置的 querySql 方式 - List sqls = connConf.getList(Key.QUERY_SQL, String.class); - - // TODO 是否check 配置为多条语句?? - for (String querySql : sqls) { - tempSlice = sliceConfig.clone(); - tempSlice.set(Key.QUERY_SQL, querySql); - splittedConfigs.add(tempSlice); - } +// List sqls = connConf.getList(Key.QUERY_SQL, String.class); +// +// // TODO 是否check 配置为多条语句?? +// for (String querySql : sqls) { +// tempSlice = sliceConfig.clone(); +// tempSlice.set(Key.QUERY_SQL, querySql); +// splittedConfigs.add(tempSlice); +// } + throw new UnsupportedOperationException("norTableMode is not support"); } } @@ -112,19 +120,20 @@ public static List doSplit(IDataSourceFactoryGetter dataSourceFac return splittedConfigs; } - public static Configuration doPreCheckSplit(Configuration originalSliceConfig) { + public static Configuration doPreCheckSplit(IJobContainerContext containerContext,Configuration originalSliceConfig) { Configuration queryConfig = originalSliceConfig.clone(); boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue(); String splitPK = originalSliceConfig.getString(Key.SPLIT_PK); String column = originalSliceConfig.getString(Key.COLUMN); + List cols = originalSliceConfig.getList(Key.COLUMN_LIST, String.class); String where = originalSliceConfig.getString(Key.WHERE, null); List conns = queryConfig.getList(Constant.CONN_MARK, Object.class); for (int i = 0, len = conns.size(); i < len; i++) { Configuration connConf = Configuration.from(conns.get(i).toString()); - List querys = new ArrayList(); + List querys = new ArrayList<>(); List splitPkQuerys = new ArrayList(); String connPath = String.format("connection[%d]", i); // 说明是配置的 table 方式 @@ -133,7 +142,7 @@ public static Configuration doPreCheckSplit(Configuration originalSliceConfig) { List tables = connConf.getList(Key.TABLE, String.class); Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误."); for (String table : tables) { - querys.add(SingleTableSplitUtil.buildQuerySql(column, table, where)); + querys.add(SingleTableSplitUtil.buildQuerySql(containerContext,column, cols, table, where)); if (splitPK != null && !splitPK.isEmpty()) { splitPkQuerys.add(SingleTableSplitUtil.genPKSql(splitPK.trim(), table, where)); } @@ -141,17 +150,22 @@ public static Configuration doPreCheckSplit(Configuration originalSliceConfig) { if (!splitPkQuerys.isEmpty()) { connConf.set(Key.SPLIT_PK_SQL, splitPkQuerys); } - connConf.set(Key.QUERY_SQL, querys); + for (QuerySql querySql : querys) { + querySql.write(connConf); + +// connConf.set(Key.QUERY_SQL, Collections.singletonList(querySql.getQuerySql())); +// connConf.set(Key.COLS_2_INDEX, querySql.getCol2Index()); + } queryConfig.set(connPath, connConf); } else { // 说明是配置的 querySql 方式 - List sqls = connConf.getList(Key.QUERY_SQL, - String.class); - for (String querySql : sqls) { - querys.add(querySql); - } - connConf.set(Key.QUERY_SQL, querys); - queryConfig.set(connPath, connConf); +// List sqls = connConf.getList(Key.QUERY_SQL, String.class); +// for (String querySql : sqls) { +// querys.add(querySql); +// } +// connConf.set(Key.QUERY_SQL, querys); +// queryConfig.set(connPath, connConf); + throw new UnsupportedOperationException("norTableMode is not support"); } } return queryConfig; diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java index 97037cff2b..fa9739e31c 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java @@ -2,6 +2,8 @@ import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.job.IJobContainerContext; +import com.alibaba.datax.core.job.ITransformerBuildInfo; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; import com.alibaba.datax.plugin.rdbms.util.*; @@ -17,6 +19,7 @@ import java.sql.*; import java.util.ArrayList; import java.util.List; +import java.util.Optional; public class SingleTableSplitUtil { private static final Logger LOG = LoggerFactory.getLogger(SingleTableSplitUtil.class); @@ -26,101 +29,103 @@ public class SingleTableSplitUtil { private SingleTableSplitUtil() { } - public static List splitSingleTable(IDataSourceFactoryGetter dataSourceFactoryGetter, - Configuration configuration, int adviceNum) { - List pluginParams = new ArrayList(); - List rangeList; - String splitPkName = configuration.getString(Key.SPLIT_PK); - String column = configuration.getString(Key.COLUMN); - String table = configuration.getString(Key.TABLE); - String where = configuration.getString(Key.WHERE, null); - boolean hasWhere = StringUtils.isNotBlank(where); - - //String splitMode = configuration.getString(Key.SPLIT_MODE, ""); - //if (Constant.SPLIT_MODE_RANDOMSAMPLE.equals(splitMode) && DATABASE_TYPE == DataBaseType.Oracle) { - if (DATABASE_TYPE == DataBaseType.Oracle) { - rangeList = genSplitSqlForOracle(dataSourceFactoryGetter, splitPkName, table, where, - configuration, adviceNum); - // warn: mysql etc to be added... - } else { - Pair minMaxPK = getPkRange(dataSourceFactoryGetter, configuration); - if (null == minMaxPK) { - throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK, - "根据切分主键切分表失败. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系 DBA 进行处理."); - } - - configuration.set(Key.QUERY_SQL, buildQuerySql(column, table, where)); - if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) { - // 切分后获取到的start/end 有 Null 的情况 - pluginParams.add(configuration); - return pluginParams; - } - - boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration - .getString(Constant.PK_TYPE)); - boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration - .getString(Constant.PK_TYPE)); - - - if (isStringType) { - rangeList = RdbmsRangeSplitWrap.splitAndWrap( - String.valueOf(minMaxPK.getLeft()), - String.valueOf(minMaxPK.getRight()), adviceNum, - splitPkName, "'", DATABASE_TYPE); - } else if (isLongType) { - rangeList = RdbmsRangeSplitWrap.splitAndWrap( - new BigInteger(minMaxPK.getLeft().toString()), - new BigInteger(minMaxPK.getRight().toString()), - adviceNum, splitPkName); - } else { - throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK, - "您配置的切分主键(splitPk) 类型 DataX 不支持. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系 DBA 进行处理."); - } - } - String tempQuerySql; - List allQuerySql = new ArrayList(); - - if (null != rangeList && !rangeList.isEmpty()) { - for (String range : rangeList) { - Configuration tempConfig = configuration.clone(); - - tempQuerySql = buildQuerySql(column, table, where) - + (hasWhere ? " and " : " where ") + range; - - allQuerySql.add(tempQuerySql); - tempConfig.set(Key.QUERY_SQL, tempQuerySql); - pluginParams.add(tempConfig); - } - } else { - //pluginParams.add(configuration); // this is wrong for new & old split - Configuration tempConfig = configuration.clone(); - tempQuerySql = buildQuerySql(column, table, where) - + (hasWhere ? " and " : " where ") - + String.format(" %s IS NOT NULL", splitPkName); - allQuerySql.add(tempQuerySql); - tempConfig.set(Key.QUERY_SQL, tempQuerySql); - pluginParams.add(tempConfig); - } - - // deal pk is null - Configuration tempConfig = configuration.clone(); - tempQuerySql = buildQuerySql(column, table, where) - + (hasWhere ? " and " : " where ") - + String.format(" %s IS NULL", splitPkName); - - allQuerySql.add(tempQuerySql); - - LOG.info("After split(), allQuerySql=[\n{}\n].", - StringUtils.join(allQuerySql, "\n")); - - tempConfig.set(Key.QUERY_SQL, tempQuerySql); - pluginParams.add(tempConfig); - - return pluginParams; - } - - public static String buildQuerySql(String column, String table, - String where) { +// public static List splitSingleTable(IDataSourceFactoryGetter dataSourceFactoryGetter, +// Configuration configuration, int adviceNum) { +// List pluginParams = new ArrayList(); +// List rangeList; +// String splitPkName = configuration.getString(Key.SPLIT_PK); +// String column = configuration.getString(Key.COLUMN); +// +// List cols = configuration.getList(Key.COLUMN_LIST, String.class); +// String table = configuration.getString(Key.TABLE); +// String where = configuration.getString(Key.WHERE, null); +// boolean hasWhere = StringUtils.isNotBlank(where); +// +// //String splitMode = configuration.getString(Key.SPLIT_MODE, ""); +// //if (Constant.SPLIT_MODE_RANDOMSAMPLE.equals(splitMode) && DATABASE_TYPE == DataBaseType.Oracle) { +// if (DATABASE_TYPE == DataBaseType.Oracle) { +// rangeList = genSplitSqlForOracle(dataSourceFactoryGetter, splitPkName, table, where, +// configuration, adviceNum); +// // warn: mysql etc to be added... +// } else { +// Pair minMaxPK = getPkRange(dataSourceFactoryGetter, configuration); +// if (null == minMaxPK) { +// throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK, +// "根据切分主键切分表失败. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系 DBA 进行处理."); +// } +// +// configuration.set(Key.QUERY_SQL, buildQuerySql(column, cols, table, where)); +// if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) { +// // 切分后获取到的start/end 有 Null 的情况 +// pluginParams.add(configuration); +// return pluginParams; +// } +// +// boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration +// .getString(Constant.PK_TYPE)); +// boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration +// .getString(Constant.PK_TYPE)); +// +// +// if (isStringType) { +// rangeList = RdbmsRangeSplitWrap.splitAndWrap( +// String.valueOf(minMaxPK.getLeft()), +// String.valueOf(minMaxPK.getRight()), adviceNum, +// splitPkName, "'", DATABASE_TYPE); +// } else if (isLongType) { +// rangeList = RdbmsRangeSplitWrap.splitAndWrap( +// new BigInteger(minMaxPK.getLeft().toString()), +// new BigInteger(minMaxPK.getRight().toString()), +// adviceNum, splitPkName); +// } else { +// throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK, +// "您配置的切分主键(splitPk) 类型 DataX 不支持. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系 DBA 进行处理."); +// } +// } +// String tempQuerySql; +// List allQuerySql = new ArrayList(); +// +// if (null != rangeList && !rangeList.isEmpty()) { +// for (String range : rangeList) { +// Configuration tempConfig = configuration.clone(); +// +// tempQuerySql = buildQuerySql(column, cols, table, where).getQuerySql() +// + (hasWhere ? " and " : " where ") + range; +// +// allQuerySql.add(tempQuerySql); +// tempConfig.set(Key.QUERY_SQL, tempQuerySql); +// pluginParams.add(tempConfig); +// } +// } else { +// //pluginParams.add(configuration); // this is wrong for new & old split +// Configuration tempConfig = configuration.clone(); +// tempQuerySql = buildQuerySql(column, cols, table, where).getQuerySql() +// + (hasWhere ? " and " : " where ") +// + String.format(" %s IS NOT NULL", splitPkName); +// allQuerySql.add(tempQuerySql); +// tempConfig.set(Key.QUERY_SQL, tempQuerySql); +// pluginParams.add(tempConfig); +// } +// +// // deal pk is null +// Configuration tempConfig = configuration.clone(); +// tempQuerySql = buildQuerySql(column, cols, table, where).getQuerySql() +// + (hasWhere ? " and " : " where ") +// + String.format(" %s IS NULL", splitPkName); +// +// allQuerySql.add(tempQuerySql); +// +// LOG.info("After split(), allQuerySql=[\n{}\n].", +// StringUtils.join(allQuerySql, "\n")); +// +// tempConfig.set(Key.QUERY_SQL, tempQuerySql); +// pluginParams.add(tempConfig); +// +// return pluginParams; +// } + + public static QuerySql buildQuerySql(IJobContainerContext containerContext, String column, List cols, String table, + String where) { String querySql; if (StringUtils.isBlank(where)) { @@ -130,8 +135,8 @@ public static String buildQuerySql(String column, String table, querySql = String.format(Constant.QUERY_SQL_TEMPLATE, column, table, where); } - - return querySql; + Optional transformerBuildCfg = containerContext.getTransformerBuildCfg(); + return new QuerySql(querySql, cols, transformerBuildCfg); } @SuppressWarnings("resource") diff --git a/pom.xml b/pom.xml index e4901aa168..13b9ab3128 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 1.8 - 4.0.0 + 4.0.1 ${project.version} ${project.version} 3.3.2 diff --git a/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java b/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java index d7709134c7..67797c16c3 100755 --- a/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java +++ b/rdbmswriter/src/main/java/com/alibaba/datax/plugin/reader/rdbmswriter/SubCommonRdbmsWriter.java @@ -1,5 +1,6 @@ package com.alibaba.datax.plugin.reader.rdbmswriter; +import com.alibaba.datax.core.job.IJobContainerContext; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; diff --git a/transformer/src/main/java/com/alibaba/datax/transformer/ComplexTransformer.java b/transformer/src/main/java/com/alibaba/datax/transformer/ComplexTransformer.java index 2a820aeae5..9239cfbbfb 100644 --- a/transformer/src/main/java/com/alibaba/datax/transformer/ComplexTransformer.java +++ b/transformer/src/main/java/com/alibaba/datax/transformer/ComplexTransformer.java @@ -9,7 +9,7 @@ * Created by liqiang on 16/3/3. */ public abstract class ComplexTransformer { - //transformerName的唯一性在datax中检查,或者提交到插件中心检查。 + // transformerName的唯一性在datax中检查,或者提交到插件中心检查。 private String transformerName;