Commit
add transformer support for TIS
baisui1981 committed Jul 5, 2024
1 parent c8dc13e commit ba0a9bc
Showing 27 changed files with 765 additions and 565 deletions.
6 changes: 6 additions & 0 deletions cassandrareader/pom.xml
@@ -12,6 +12,12 @@
    <packaging>jar</packaging>

    <dependencies>

        <dependency>
            <groupId>com.qlangtech.tis</groupId>
            <artifactId>tis-plugin</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
cassandrareader/src/main/java/com/alibaba/datax/plugin/reader/cassandrareader/CassandraReader.java
@@ -5,119 +5,137 @@
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;

import com.alibaba.datax.plugin.rdbms.reader.util.DataXCol2Index;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.qlangtech.tis.datax.IDataxReader;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;

public class CassandraReader extends Reader {
    private static final Logger LOG = LoggerFactory
            .getLogger(CassandraReader.class);

    public static class Job extends Reader.Job {

        private Configuration jobConfig = null;
        private Cluster cluster = null;

        @Override
        public void init() {
            this.jobConfig = super.getPluginJobConf();
            String username = jobConfig.getString(Key.USERNAME);
            String password = jobConfig.getString(Key.PASSWORD);
            String hosts = jobConfig.getString(Key.HOST);
            Integer port = jobConfig.getInt(Key.PORT, 9042);
            boolean useSSL = jobConfig.getBool(Key.USESSL);

            if ((username != null) && !username.isEmpty()) {
                Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
                        .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
                if (useSSL) {
                    clusterBuilder = clusterBuilder.withSSL();
                }
                cluster = clusterBuilder.build();
            } else {
                cluster = Cluster.builder().withPort(Integer.valueOf(port))
                        .addContactPoints(hosts.split(",")).build();
            }
            CassandraReaderHelper.checkConfig(jobConfig, cluster);
        }

        @Override
        public void destroy() {

        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> splittedConfigs = CassandraReaderHelper.splitJob(adviceNumber, jobConfig, cluster);
            return splittedConfigs;
        }
    }

    public static class Task extends Reader.Task {
        private Configuration taskConfig;
        private Cluster cluster = null;
        private Session session = null;
        private String queryString = null;
        private ConsistencyLevel consistencyLevel;
        private int columnNumber = 0;
        private List<String> columnMeta = null;
        private DataXCol2Index col2Index;

        @Override
        public void init() {
            this.taskConfig = super.getPluginJobConf();
            String username = taskConfig.getString(Key.USERNAME);
            String password = taskConfig.getString(Key.PASSWORD);
            String hosts = taskConfig.getString(Key.HOST);
            Integer port = taskConfig.getInt(Key.PORT);
            boolean useSSL = taskConfig.getBool(Key.USESSL);
            String keyspace = taskConfig.getString(Key.KEYSPACE);
            this.columnMeta = taskConfig.getList(Key.COLUMN, String.class);
            columnNumber = columnMeta.size();

            if ((username != null) && !username.isEmpty()) {
                Cluster.Builder clusterBuilder = Cluster.builder().withCredentials(username, password)
                        .withPort(Integer.valueOf(port)).addContactPoints(hosts.split(","));
                if (useSSL) {
                    clusterBuilder = clusterBuilder.withSSL();
                }
                cluster = clusterBuilder.build();
            } else {
                cluster = Cluster.builder().withPort(Integer.valueOf(port))
                        .addContactPoints(hosts.split(",")).build();
            }
            session = cluster.connect(keyspace);
            String cl = taskConfig.getString(Key.CONSITANCY_LEVEL);
            if (cl != null && !cl.isEmpty()) {
                consistencyLevel = ConsistencyLevel.valueOf(cl);
            } else {
                consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
            }

            queryString = CassandraReaderHelper.getQueryString(taskConfig, cluster);
            LOG.info("query = " + queryString);

            String table = taskConfig.getString(Key.TABLE);
            IDataxReader dataxReader = this.loadDataXReader();
            ISelectedTab selectedTab = dataxReader.getSelectedTab(table);

            // store the mapping on the field so that startRead() can see it
            this.col2Index = DataXCol2Index.getCol2Index(this.containerContext.getTransformerBuildCfg(), selectedTab.getCols());
        }

        @Override
        public void startRead(RecordSender recordSender) {
            Objects.requireNonNull(this.col2Index, "col2Index can not be null");
            ResultSet r = session.execute(new SimpleStatement(queryString).setConsistencyLevel(consistencyLevel));
            for (Row row : r) {
                Record record = recordSender.createRecord(col2Index);
                record = CassandraReaderHelper.buildRecord(record, row, r.getColumnDefinitions(), columnNumber,
                        super.getTaskPluginCollector());
                if (record != null) {
                    recordSender.sendToWriter(record);
                }
            }
        }

        @Override
        public void destroy() {

        }
    }
}
4 changes: 4 additions & 0 deletions common/pom.xml
@@ -53,6 +53,10 @@
            <artifactId>commons-math3</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.qlangtech.tis</groupId>
            <artifactId>tis-plugin</artifactId>
        </dependency>
    </dependencies>

    <build>
common/src/main/java/com/alibaba/datax/common/plugin/AbstractTaskPlugin.java
@@ -1,5 +1,9 @@
package com.alibaba.datax.common.plugin;

import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxReader;
import com.qlangtech.tis.datax.impl.DataxProcessor;

/**
* Created by jingxing on 14-8-24.
*/
@@ -19,6 +23,11 @@ public void setTaskPluginCollector(
        this.taskPluginCollector = taskPluginCollector;
    }

    protected final IDataxReader loadDataXReader() {
        IDataxProcessor dataxProcessor = DataxProcessor.load(null, this.containerContext.getTISDataXName());
        IDataxReader dataxReader = dataxProcessor.getReader(null);
        return dataxReader;
    }
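    // A hedged usage sketch (not part of this file): a Reader.Task subclass is
    // expected to combine loadDataXReader() with the new column-index plumbing,
    // mirroring the CassandraReader.Task.init() change in this commit; `table`
    // stands for whatever table name the concrete plugin reads from its config.
    //
    //     IDataxReader dataxReader = this.loadDataXReader();
    //     ISelectedTab selectedTab = dataxReader.getSelectedTab(table);
    //     this.col2Index = DataXCol2Index.getCol2Index(
    //             this.containerContext.getTransformerBuildCfg(), selectedTab.getCols());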


public int getTaskId() {
common/src/main/java/com/alibaba/datax/common/plugin/RecordSender.java
@@ -17,17 +17,15 @@
package com.alibaba.datax.common.plugin;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.plugin.rdbms.reader.util.DataXCol2Index;

public interface RecordSender {

//    public default Record createRecord() {
//        return this.createRecord(Collections.emptyMap());
//    }

    public Record createRecord(DataXCol2Index mapper);

    public void sendToWriter(Record record);

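Call sites migrate accordingly. A minimal before/after sketch, where the col2Index
value comes from DataXCol2Index.getCol2Index(...) as in the CassandraReader change
above:

    // before this commit: the no-arg default built a record with an empty mapping
    Record record = recordSender.createRecord();

    // after: callers must supply the column-name-to-index mapping
    Record record = recordSender.createRecord(col2Index);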
72 changes: 35 additions & 37 deletions common/src/main/java/com/alibaba/datax/common/spi/Reader.java
@@ -7,46 +7,44 @@
import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.plugin.RecordSender;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxReader;
import com.qlangtech.tis.datax.impl.DataxProcessor;

/**
 * Each Reader plugin implements two inner classes, Job and Task.
 */
public abstract class Reader extends BaseObject {

    /**
     * Every Reader plugin must implement the Job inner class.
     */
    public static abstract class Job extends AbstractJobPlugin {

        /**
         * Split the job into tasks.
         *
         * @param adviceNumber the number of tasks the framework advises the plugin to
         *                     split into. Plugin developers should aim for a task count
         *                     >= adviceNumber.<br>
         *                     <br>
         *                     The advice exists to give users the best possible result.
         *                     For example, if the framework determines that the user's
         *                     data store can sustain 100 concurrent connections and the
         *                     user asks for a concurrency of 100, a plugin that splits
         *                     into >= 100 tasks lets DataX start 100 channels at once,
         *                     yielding the best throughput.<br>
         *                     Likewise, when synchronizing a single MySQL table at a
         *                     target concurrency of 10, the plugin should split the
         *                     table, e.g. by primary-key range; if it ends up with
         *                     >= 10 tasks, the user gets maximum throughput.<br>
         *                     <br>
         *                     This is only advice: a Reader plugin may split by its own
         *                     rules, but following the framework's suggestion is
         *                     recommended.<br>
         *                     <br>
         *                     For ODPS writing to OTS, pre-sort/pre-split constraints
         *                     may force splitting by partition only, with no finer
         *                     granularity; in such cases the split can only follow the
         *                     physical layout of the source.<br>
         */
        public abstract List<Configuration> split(int adviceNumber);
    }

    public static abstract class Task extends AbstractTaskPlugin {

        public abstract void startRead(RecordSender recordSender);
    }
}
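The split contract leaves the actual strategy to each plugin. Below is a hedged
sketch of the primary-key-range split the javadoc recommends, written against a
jobConfig field like the one in CassandraReader.Job; the config keys "minId",
"maxId" and "querySql" and the table/column names are illustrative assumptions,
not part of this commit (requires java.util.ArrayList):

    @Override
    public List<Configuration> split(int adviceNumber) {
        List<Configuration> splits = new ArrayList<>();
        // assumed: the job config already knows the scanned primary-key range
        long minId = jobConfig.getLong("minId");
        long maxId = jobConfig.getLong("maxId");
        long step = Math.max(1L, (maxId - minId + 1) / adviceNumber);
        for (long lower = minId; lower <= maxId; lower += step) {
            // each task reads the half-open id range [lower, lower + step)
            Configuration slice = jobConfig.clone();
            slice.set("querySql", String.format(
                    "select * from t where id >= %d and id < %d", lower, lower + step));
            splits.add(slice);
        }
        return splits;
    }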