Skip to content

Commit

Permalink
Refactor PipelineContainerComposer
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Mar 14, 2024
1 parent a51d5bc commit eb2de3e
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,21 @@ private void cleanUpPipelineJobs(final Connection connection, final PipelineJobT
Map<String, Object> jobInfo = queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, jobId)).get(0);
String status = jobInfo.get("status").toString();
if (JobStatus.FINISHED.name().equals(status)) {
connection.createStatement().execute(String.format((jobType instanceof CDCJobType ? "DROP" : "COMMIT") + " %s '%s'", jobTypeName, jobId));
connection.createStatement().execute(String.format((isSupportCommit(jobType) ? "COMMIT" : "DROP") + " %s '%s'", jobTypeName, jobId));
} else {
connection.createStatement().execute(String.format((jobType instanceof CDCJobType ? "DROP" : "ROLLBACK") + " %s '%s'", jobTypeName, jobId));
connection.createStatement().execute(String.format((isSupportRollback(jobType) ? "ROLLBACK" : "DROP") + " %s '%s'", jobTypeName, jobId));
}
}
}

private boolean isSupportCommit(final PipelineJobType jobType) {
return !(jobType instanceof CDCJobType);
}

private boolean isSupportRollback(final PipelineJobType jobType) {
return !(jobType instanceof CDCJobType);
}

private List<Map<String, Object>> queryJobs(final Connection connection, final String jobTypeName) {
try {
return transformResultSetToList(connection.createStatement().executeQuery(String.format("SHOW %s LIST", jobTypeName)));
Expand Down

0 comments on commit eb2de3e

Please sign in to comment.