diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index e32f34c7e8..dc912f32f1 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -29,6 +29,7 @@ import java.util.TimeZone; import com.google.common.collect.Lists; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixProperty; @@ -77,6 +78,8 @@ public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow); updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput); cleanupWorkflow(workflow); + // Request for complete resource config refresh, when a workflow is explicitly marked as DELETED. + _clusterDataCache.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG); return; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java index 05977c7c19..0b54764a6d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java @@ -29,6 +29,7 @@ import org.apache.helix.task.JobQueue; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -85,6 +86,36 @@ public void testDeleteWorkflow() throws InterruptedException { Assert.assertNull(_driver.getWorkflowContext(jobQueueName)); } + @Test + public void testDeleteWorkflowAndRecreate() throws Exception { + String workflowId_1 = TestHelper.getTestMethodName(); + String workflowId_2 = TestHelper.getTestMethodName() + "1"; + String workflowId_3 = TestHelper.getTestMethodName() + "2"; + + Workflow.Builder builder_1 = WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(workflowId_1); + Workflow.Builder builder_2 = WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(workflowId_2); + Workflow.Builder builder_3 = WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(workflowId_3); + + _driver.start(builder_1.build()); + + TaskState polledState = _driver.pollForWorkflowState(workflowId_1, 2_000L, TaskState.COMPLETED, TaskState.FAILED); + Assert.assertEquals(TaskState.COMPLETED, polledState); + Assert.assertNotNull(_driver.getWorkflowConfig(workflowId_1)); + + // delete workflowId_1 and start other workflows + _driver.start(builder_2.build()); + _driver.deleteAndWaitForCompletion(workflowId_1, 2_000); + _driver.start(builder_3.build()); + Assert.assertNull(_driver.getWorkflowConfig(workflowId_1)); + + // re-create workflowId_1 + _driver.start(builder_1.build()); + TaskState recreatedPolledState = _driver.pollForWorkflowState(workflowId_1, 40_000L, TaskState.COMPLETED, TaskState.FAILED); + Assert.assertEquals(TaskState.COMPLETED, recreatedPolledState); + Assert.assertNotNull(_driver.getWorkflowConfig(workflowId_1)); + Assert.assertNotNull(_driver.getWorkflowContext(workflowId_1)); + } + @Test public void testDeleteWorkflowForcefully() throws InterruptedException { String jobQueueName = TestHelper.getTestMethodName();