Skip to content

Commit

Permalink
add powerjob support for TIS
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Dec 19, 2023
1 parent 85b740c commit 80132c8
Show file tree
Hide file tree
Showing 15 changed files with 704 additions and 103 deletions.
7 changes: 4 additions & 3 deletions install.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
mvn clean install -Dmaven.test.skip=true \
-Ptis-repo \
-pl \
tis-datax-executor\
,hbase20xsqlreader\
hbase20xsqlreader\
,plugin-unstructured-storage-util\
,odpswriter\
,elasticsearchwriter\
Expand All @@ -27,6 +26,8 @@ tis-datax-executor\
,cassandrareader\
,cassandrawriter\
,mongodbreader\
,mongodbwriter \
,mongodbwriter\
,kingbaseesreader\
,kingbaseeswriter \
-am -fn

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
<module>tis-datax-executor</module>
<!-- <module>tis-datax-executor</module>-->


</modules>
Expand Down
2 changes: 1 addition & 1 deletion tis-datax-executor/deploy-local-tis-datax-executor.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mvn clean package -Dappname=all
mvn clean install -Dappname=all
mkdir -p /opt/tis/tis-datax-executor
rm -rf /opt/tis/tis-datax-executor/*
tar xvf ../tis-datax-executor.tar.gz -C /opt/tis/
6 changes: 6 additions & 0 deletions tis-datax-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
<artifactId>tis-datax-executor</artifactId>
<dependencies>

<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.6.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
Expand Down
1 change: 1 addition & 0 deletions tis-datax-executor/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
move to `tis-datax/executor/tis-datax-executor`
Original file line number Diff line number Diff line change
Expand Up @@ -9,113 +9,48 @@
* @author 百岁 ([email protected])
* @date 2023/11/18
*/
public class DataXLifecycleHookMsg implements IDataXTaskRelevant {
public class DataXLifecycleHookMsg extends JobHookMsg {

private IDataXBatchPost.LifeCycleHook lifeCycleHook;
private String tableName;
private StoreResourceType resType;
private String dataXName;
private String jobName;
private Integer taskId;
private Long execEpochMilli;

private DataXLifecycleHookMsg() {
public DataXLifecycleHookMsg() {
}

public static DataXLifecycleHookMsg createDataXLifecycleHookMsg(String dataXName, String tableName,

public static DataXLifecycleHookMsg createDataXLifecycleHookMsg(IDataxProcessor processor, String tableName,
Integer taskId, String jobName,
Long currentTimeStamp,
IDataXBatchPost.LifeCycleHook lifeCycleHook) {
if (StringUtils.isEmpty(dataXName)) {
throw new IllegalArgumentException("dataXName can not be null");
}
IDataXBatchPost.LifeCycleHook lifeCycleHook,
Boolean dryRun) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("tableName can not be null");
}
if (StringUtils.isEmpty(jobName)) {
throw new IllegalArgumentException("jobName can not be null");
}

DataXLifecycleHookMsg lifecycleHookMsg = new DataXLifecycleHookMsg();
lifecycleHookMsg.setTableName(tableName);
lifecycleHookMsg.setLifeCycleHook(lifeCycleHook);
lifecycleHookMsg.setResType(StoreResourceType.DataApp);
lifecycleHookMsg.setTaskId(Objects.requireNonNull(taskId, "taskId can not be null"));
lifecycleHookMsg.setDataXName(dataXName);
lifecycleHookMsg.setExecEpochMilli(Objects.requireNonNull(currentTimeStamp, "currentTimeStamp can not be " +
"null"));
lifecycleHookMsg.setJobName(jobName);
return lifecycleHookMsg;
}

private void setDataXName(String dataXName) {
this.dataXName = dataXName;
}

private void setJobName(String jobName) {
this.jobName = jobName;
}

private void setTaskId(Integer taskId) {
this.taskId = taskId;
}

private void setExecEpochMilli(Long execEpochMilli) {
this.execEpochMilli = execEpochMilli;
return createHookMsg(processor.identityValue(), processor.getResType(), taskId, jobName, currentTimeStamp, dryRun,
() -> {
DataXLifecycleHookMsg hookMsg = new DataXLifecycleHookMsg();
hookMsg.setTableName(tableName);
hookMsg.setLifeCycleHook(Objects.requireNonNull(lifeCycleHook, "param lifeCycleHook"));
return hookMsg;
});
}

public String getTableName() {
return this.tableName;
}

public StoreResourceType getResType() {
return resType;
}

private void setResType(StoreResourceType resType) {
this.resType = resType;
}

private void setTableName(String tableName) {
public void setTableName(String tableName) {
this.tableName = tableName;
}

public IDataXBatchPost.LifeCycleHook getLifeCycleHook() {
return Objects.requireNonNull(this.lifeCycleHook, "lifeCycleHook can not be null");
}

private void setLifeCycleHook(IDataXBatchPost.LifeCycleHook lifeCycleHook) {
public void setLifeCycleHook(IDataXBatchPost.LifeCycleHook lifeCycleHook) {
this.lifeCycleHook = lifeCycleHook;
}

@Override
public Integer getTaskId() {
return Objects.requireNonNull(this.taskId, "taskid can not be null");
}

@Override
public String getJobName() {
return this.jobName;
}

@Override
public String getDataXName() {
return this.dataXName;
}

@Override
public long getExecEpochMilli() {
// throw new UnsupportedOperationException();
return Objects.requireNonNull(this.execEpochMilli, "execEpochMilli can not be null");
}

@Override
public int getTaskSerializeNum() {
throw new UnsupportedOperationException();
}

@Override
public String getFormatTime(TimeFormat format) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@
* @create: 2021-05-06 14:57
**/
public class DataxPrePostConsumer extends DataXJobSingleProcessorExecutor<DataXLifecycleHookMsg> {
// private static final Logger logger = LoggerFactory.getLogger(DataXJobConsumer.class);
// private final CuratorFramework curatorClient;
// private final ITISCoordinator coordinator;

private final DataXJobRunEnvironmentParamsSetter.ExtraJavaSystemPramsSuppiler extraJavaSystemPramsSuppiler;

Expand Down Expand Up @@ -92,15 +89,15 @@ protected void addMainClassParams(DataXLifecycleHookMsg msg, Integer taskId, Str
//
// StoreResourceType resType = StoreResourceType.parse(args[5]);
// final long execEpochMilli = Long.parseLong(args[6]);
if (StringUtils.isEmpty(msg.getTableName())) {
throw new IllegalArgumentException("param table name can not be empty");
}
// if (StringUtils.isEmpty(msg.getTableName())) {
// throw new IllegalArgumentException("param table name can not be empty");
// }
if (StringUtils.isEmpty(dataxName)) {
throw new IllegalArgumentException("param dataxName can not be empty");
}
if (StringUtils.isEmpty(msg.getTableName())) {
throw new IllegalArgumentException("param getTableName can not be empty");
}
// if (StringUtils.isEmpty(msg.getTableName())) {
// throw new IllegalArgumentException("param getTableName can not be empty");
// }
if (StringUtils.isEmpty(msg.getJobName())) {
throw new IllegalArgumentException("param getJobName can not be empty");
}
Expand Down Expand Up @@ -148,15 +145,21 @@ protected String getMainClassName() {
}

public File getWorkingDirectory() {
return getDataXExecutorDir();
}

public static File getDataXExecutorDir() {
File workDir = new File("/opt/tis/tis-datax-executor");
if (!workDir.exists()) {
throw new IllegalStateException("workDir is not exist:" + workDir.getAbsolutePath());
}
return workDir;
}

public static final String DEFAULT_CLASSPATH = "./lib/*:./tis-datax-executor.jar:./conf/";

public String getClasspath() {
return "./lib/*:./tis-datax-executor.jar:./conf/";
return DEFAULT_CLASSPATH;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,13 @@ public static void main(String[] args) throws Exception {
IDataXBatchPost batchPost =
IDataxWriter.castBatchPost(Objects.requireNonNull(dataxProcessor.getWriter(null), "dataXName" +
":" + dataXName + " relevant dataXWriter can not be null"));

DefaultExecContext execContext = new DefaultExecContext(dataXName, execEpochMilli);
DefaultExecContext execContext = new DefaultExecContext(dataXName, execEpochMilli){
@Override
public IDataxProcessor getProcessor() {
return dataxProcessor;
}
};
execContext.setResType(resType);

if (IDataXBatchPost.KEY_POST.equalsIgnoreCase(lifecycleHookName)) {
hookTrigger = batchPost.createPostTask(execContext, tab, dataxProcessor.getDataxCfgFileNames(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.Objects;

/**
* used by com.qlangtech.tis.plugin.datax.TaskExec
*
* @author 百岁 ([email protected])
* @date 2023/11/18
*/
Expand All @@ -32,10 +34,10 @@ protected final boolean useRuntimePropEnvProps() {
return false;
}

// @Override
// protected String[] getExtraJavaSystemPrams() {
// return new String[]{"-D" + CenterResource.KEY_notFetchFromCenterRepository + "=true"};
// }
// @Override
// protected String[] getExtraJavaSystemPrams() {
// return new String[]{"-D" + CenterResource.KEY_notFetchFromCenterRepository + "=true"};
// }

@Override
protected final String getIncrStateCollectAddress() {
Expand Down Expand Up @@ -69,5 +71,6 @@ protected void addMainClassParams(CuratorDataXTaskMessage msg, Integer taskId, S

cmdLine.addArgument(String.valueOf(msg.getTaskSerializeNum()));
cmdLine.addArgument(String.valueOf(msg.getExecEpochMilli()));
// cmdLine.addArgument(msg.)
}
}
Loading

0 comments on commit 80132c8

Please sign in to comment.