diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java index bf5e2944eb7ca..878338c5ee475 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java @@ -159,13 +159,21 @@ private void cleanUpPipelineJobs(final Connection connection, final PipelineJobT Map jobInfo = queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, jobId)).get(0); String status = jobInfo.get("status").toString(); if (JobStatus.FINISHED.name().equals(status)) { - connection.createStatement().execute(String.format((jobType instanceof CDCJobType ? "DROP" : "COMMIT") + " %s '%s'", jobTypeName, jobId)); + connection.createStatement().execute(String.format((isSupportCommit(jobType) ? "COMMIT" : "DROP") + " %s '%s'", jobTypeName, jobId)); } else { - connection.createStatement().execute(String.format((jobType instanceof CDCJobType ? "DROP" : "ROLLBACK") + " %s '%s'", jobTypeName, jobId)); + connection.createStatement().execute(String.format((isSupportRollback(jobType) ? "ROLLBACK" : "DROP") + " %s '%s'", jobTypeName, jobId)); } } } + private boolean isSupportCommit(final PipelineJobType jobType) { + return !(jobType instanceof CDCJobType); + } + + private boolean isSupportRollback(final PipelineJobType jobType) { + return !(jobType instanceof CDCJobType); + } + private List> queryJobs(final Connection connection, final String jobTypeName) { try { return transformResultSetToList(connection.createStatement().executeQuery(String.format("SHOW %s LIST", jobTypeName)));