From a1fd7bc374858a0ce60bbc8bef1d271f18afdbda Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 23 Jan 2025 09:35:45 +0000 Subject: [PATCH] Fix trappy timeouts in persistent tasks requests (#120514) Ensure that callers constructing these master-node requests pass in an explicit timeout. Relates #107984 --- .../PersistentTaskCreationFailureIT.java | 2 +- ...PersistentTaskInitializationFailureIT.java | 2 +- .../PersistentTasksExecutorFullRestartIT.java | 2 +- .../persistent/PersistentTasksExecutorIT.java | 87 ++++++++++++++++--- .../decider/EnableAssignmentDeciderIT.java | 2 +- .../TransportPostFeatureUpgradeAction.java | 3 +- .../selection/HealthNodeTaskExecutor.java | 3 +- .../persistent/AllocatedPersistentTask.java | 10 ++- .../CompletionPersistentTaskAction.java | 5 +- .../PersistentTasksNodeService.java | 5 +- .../persistent/PersistentTasksService.java | 73 +++++++--------- .../RemovePersistentTaskAction.java | 5 +- .../persistent/StartPersistentTaskAction.java | 5 +- .../UpdatePersistentTaskStatusAction.java | 5 +- .../HealthNodeTaskExecutorTests.java | 5 +- .../CancelPersistentTaskRequestTests.java | 2 +- .../CompletionPersistentTaskRequestTests.java | 4 +- .../PersistentTasksNodeServiceTests.java | 14 +-- .../StartPersistentActionRequestTests.java | 2 +- .../UpdatePersistentTaskRequestTests.java | 2 +- .../ccr/action/ShardFollowTaskCleaner.java | 2 + .../downsample/TransportDownsampleAction.java | 2 +- .../ReindexDataStreamTransportAction.java | 3 +- .../integration/MlDistributedFailureIT.java | 1 + .../action/TransportPutRollupJobAction.java | 2 +- .../action/PutJobStateMachineTests.java | 9 +- .../xpack/security/Security.java | 3 +- .../xpack/shutdown/NodeShutdownTasksIT.java | 18 ++-- .../TransportStopTransformActionTests.java | 18 +++- .../transforms/TransformTaskTests.java | 3 +- 30 files changed, 188 insertions(+), 111 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java index 8a4d1ceda784b..6452968f2467f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java @@ -113,7 +113,7 @@ public void onFailure(Exception e) { UUIDs.base64UUID(), FailingCreationPersistentTaskExecutor.TASK_NAME, new FailingCreationTaskParams(), - null, + TEST_REQUEST_TIMEOUT, l.map(ignored -> null) ) ); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskInitializationFailureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskInitializationFailureIT.java index 2f08969258591..6e739b12f4064 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskInitializationFailureIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskInitializationFailureIT.java @@ -50,7 +50,7 @@ public void testPersistentTasksThatFailDuringInitializationAreRemovedFromCluster UUIDs.base64UUID(), FailingInitializationPersistentTaskExecutor.TASK_NAME, new FailingInitializationTaskParams(), - null, + TEST_REQUEST_TIMEOUT, startPersistentTaskFuture ); startPersistentTaskFuture.actionGet(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java index b710a05ec3780..684f6b71d0efc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java @@ -45,7 +45,7 @@ public void testFullClusterRestart() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); futures.add(future); taskIds[i] = UUIDs.base64UUID(); - service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future); + service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), TEST_REQUEST_TIMEOUT, future); } for (int i = 0; i < numberOfTasks; i++) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 203ad831bd2e9..8af2111afd5ba 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -69,7 +69,13 @@ public static class WaitForPersistentTaskFuture> future = new PlainActionFuture<>(); - persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future); + persistentTasksService.sendStartRequest( + UUIDs.base64UUID(), + TestPersistentTasksExecutor.NAME, + new TestParams("Blah"), + TEST_REQUEST_TIMEOUT, + future + ); long allocationId = future.get().getAllocationId(); waitForTaskToStart(); TaskInfo firstRunningTask = clusterAdmin().prepareListTasks() @@ -100,7 +106,13 @@ public void testPersistentActionCompletion() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); String taskId = UUIDs.base64UUID(); - persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future); + persistentTasksService.sendStartRequest( + taskId, + TestPersistentTasksExecutor.NAME, + new TestParams("Blah"), + TEST_REQUEST_TIMEOUT, + future + ); long allocationId = future.get().getAllocationId(); waitForTaskToStart(); TaskInfo firstRunningTask = clusterAdmin().prepareListTasks() @@ -119,7 +131,14 @@ public void testPersistentActionCompletion() throws Exception { logger.info("Simulating errant completion notification"); // try sending completion request with incorrect allocation id PlainActionFuture> failedCompletionNotificationFuture = new PlainActionFuture<>(); - persistentTasksService.sendCompletionRequest(taskId, Long.MAX_VALUE, null, null, null, failedCompletionNotificationFuture); + persistentTasksService.sendCompletionRequest( + taskId, + Long.MAX_VALUE, + null, + null, + TEST_REQUEST_TIMEOUT, + failedCompletionNotificationFuture + ); assertFutureThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class); // Make sure that the task is still running assertThat( @@ -141,7 +160,13 @@ public void testPersistentActionWithNoAvailableNode() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); TestParams testParams = new TestParams("Blah"); testParams.setExecutorNodeAttr("test"); - persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future); + persistentTasksService.sendStartRequest( + UUIDs.base64UUID(), + TestPersistentTasksExecutor.NAME, + testParams, + TEST_REQUEST_TIMEOUT, + future + ); String taskId = future.get().getId(); Settings nodeSettings = Settings.builder().put(nodeSettings(0, Settings.EMPTY)).put("node.attr.test_attr", "test").build(); @@ -165,7 +190,7 @@ public void testPersistentActionWithNoAvailableNode() throws Exception { // Remove the persistent task PlainActionFuture> removeFuture = new PlainActionFuture<>(); - persistentTasksService.sendRemoveRequest(taskId, null, removeFuture); + persistentTasksService.sendRemoveRequest(taskId, TEST_REQUEST_TIMEOUT, removeFuture); assertEquals(removeFuture.get().getId(), taskId); } @@ -182,7 +207,13 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); TestParams testParams = new TestParams("Blah"); - persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future); + persistentTasksService.sendStartRequest( + UUIDs.base64UUID(), + TestPersistentTasksExecutor.NAME, + testParams, + TEST_REQUEST_TIMEOUT, + future + ); String taskId = future.get().getId(); assertThat(clusterAdmin().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(), empty()); @@ -197,14 +228,20 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception // Remove the persistent task PlainActionFuture> removeFuture = new PlainActionFuture<>(); - persistentTasksService.sendRemoveRequest(taskId, null, removeFuture); + persistentTasksService.sendRemoveRequest(taskId, TEST_REQUEST_TIMEOUT, removeFuture); assertEquals(removeFuture.get().getId(), taskId); } public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future); + persistentTasksService.sendStartRequest( + UUIDs.base64UUID(), + TestPersistentTasksExecutor.NAME, + new TestParams("Blah"), + TEST_REQUEST_TIMEOUT, + future + ); String taskId = future.get().getId(); waitForTaskToStart(); TaskInfo firstRunningTask = clusterAdmin().prepareListTasks() @@ -250,7 +287,7 @@ public void testPersistentActionStatusUpdate() throws Exception { assertFutureThrows(future1, IllegalStateException.class, "timed out after 10ms"); PlainActionFuture> failedUpdateFuture = new PlainActionFuture<>(); - persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), null, failedUpdateFuture); + persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), TEST_REQUEST_TIMEOUT, failedUpdateFuture); assertFutureThrows( failedUpdateFuture, ResourceNotFoundException.class, @@ -275,11 +312,23 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); String taskId = UUIDs.base64UUID(); - persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future); + persistentTasksService.sendStartRequest( + taskId, + TestPersistentTasksExecutor.NAME, + new TestParams("Blah"), + TEST_REQUEST_TIMEOUT, + future + ); future.get(); PlainActionFuture> future2 = new PlainActionFuture<>(); - persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future2); + persistentTasksService.sendStartRequest( + taskId, + TestPersistentTasksExecutor.NAME, + new TestParams("Blah"), + TEST_REQUEST_TIMEOUT, + future2 + ); assertFutureThrows(future2, ResourceAlreadyExistsException.class); waitForTaskToStart(); @@ -315,7 +364,13 @@ public void testUnassignRunningPersistentTask() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); TestParams testParams = new TestParams("Blah"); testParams.setExecutorNodeAttr("test"); - persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future); + persistentTasksService.sendStartRequest( + UUIDs.base64UUID(), + TestPersistentTasksExecutor.NAME, + testParams, + TEST_REQUEST_TIMEOUT, + future + ); PersistentTask task = future.get(); String taskId = task.getId(); @@ -366,7 +421,13 @@ public void testAbortLocally() throws Exception { persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1)); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future); + persistentTasksService.sendStartRequest( + UUIDs.base64UUID(), + TestPersistentTasksExecutor.NAME, + new TestParams("Blah"), + TEST_REQUEST_TIMEOUT, + future + ); String taskId = future.get().getId(); long allocationId = future.get().getAllocationId(); waitForTaskToStart(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java index 7038dca992493..b75156763c9ed 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -52,7 +52,7 @@ public void testEnableAssignmentAfterRestart() throws Exception { "task_" + i, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), - null, + TEST_REQUEST_TIMEOUT, ActionListener.running(latch::countDown) ); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/migration/TransportPostFeatureUpgradeAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/migration/TransportPostFeatureUpgradeAction.java index 57ebe8ef626fd..ecf7bab6a21c4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/migration/TransportPostFeatureUpgradeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/migration/TransportPostFeatureUpgradeAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.persistent.PersistentTasksService; @@ -95,7 +96,7 @@ protected void masterOperation( SYSTEM_INDEX_UPGRADE_TASK_NAME, SYSTEM_INDEX_UPGRADE_TASK_NAME, new SystemIndexMigrationTaskParams(), - null, + TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, ActionListener.wrap(startedTask -> { listener.onResponse(new PostFeatureUpgradeResponse(true, featuresToMigrate, null, null)); }, ex -> { diff --git a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java index 5991bc248ba76..7c37a0ce5d927 100644 --- a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.features.FeatureService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -162,7 +163,7 @@ void startTask(ClusterChangedEvent event) { TASK_NAME, TASK_NAME, new HealthNodeTaskParams(), - null, + TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, ActionListener.wrap(r -> logger.debug("Created the health node task"), e -> { if (e instanceof NodeClosedException) { logger.debug("Failed to create health node task because node is shutting down", e); diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index 7f00562758a8f..cda73d4fa0bca 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -65,7 +65,13 @@ public void updatePersistentTaskState( final PersistentTaskState state, final ActionListener> listener ) { - persistentTasksService.sendUpdateStateRequest(persistentTaskId, allocationId, state, null, listener); + persistentTasksService.sendUpdateStateRequest( + persistentTaskId, + allocationId, + state, + TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */, + listener + ); } public String getPersistentTaskId() { @@ -201,7 +207,7 @@ private void completeAndNotifyIfNeeded(@Nullable Exception failure, @Nullable St getAllocationId(), failure, localAbortReason, - null, + TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */, new ActionListener<>() { @Override public void onResponse(PersistentTasksCustomMetadata.PersistentTask persistentTask) { diff --git a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java index 74761144742cc..be37af165b42f 100644 --- a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -56,8 +57,8 @@ public Request(StreamInput in) throws IOException { localAbortReason = in.readOptionalString(); } - public Request(String taskId, long allocationId, Exception exception, String localAbortReason) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout, String taskId, long allocationId, Exception exception, String localAbortReason) { + super(masterNodeTimeout); this.taskId = taskId; this.exception = exception; this.allocationId = allocationId; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index ff6a0b9018704..16fdc82074e3c 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.tasks.Task; @@ -310,7 +311,7 @@ private void notifyMasterOfFailedTask( taskInProgress.getAllocationId(), originalException, null, - null, + TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */, new ActionListener<>() { @Override public void onResponse(PersistentTask persistentTask) { @@ -346,7 +347,7 @@ private void cancelTask(Long allocationId) { if (task.markAsCancelled()) { // Cancel the local task using the task manager String reason = "task has been removed, cancelling locally"; - persistentTasksService.sendCancelRequest(task.getId(), reason, null, new ActionListener<>() { + persistentTasksService.sendCancelRequest(task.getId(), reason, new ActionListener<>() { @Override public void onResponse(ListTasksResponse cancelTasksResponse) { logger.trace( diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 4e828a1280b12..b540a9160241e 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -27,6 +27,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Objects; import java.util.function.Predicate; /** @@ -57,16 +58,16 @@ public void sendStartRequest( final String taskId, final String taskName, final Params taskParams, - final @Nullable TimeValue timeout, + final TimeValue timeout, final ActionListener> listener ) { @SuppressWarnings("unchecked") final ActionListener> wrappedListener = listener.map(t -> (PersistentTask) t); - StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams); - if (timeout != null) { - request.masterNodeTimeout(timeout); - } - execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener); + execute( + new StartPersistentTaskAction.Request(Objects.requireNonNull(timeout), taskId, taskName, taskParams), + StartPersistentTaskAction.INSTANCE, + wrappedListener + ); } /** @@ -85,33 +86,27 @@ public void sendCompletionRequest( final @Nullable TimeValue timeout, final ActionListener> listener ) { - CompletionPersistentTaskAction.Request request = new CompletionPersistentTaskAction.Request( - taskId, - taskAllocationId, - taskFailure, - localAbortReason + execute( + new CompletionPersistentTaskAction.Request( + Objects.requireNonNull(timeout), + taskId, + taskAllocationId, + taskFailure, + localAbortReason + ), + CompletionPersistentTaskAction.INSTANCE, + listener ); - if (timeout != null) { - request.masterNodeTimeout(timeout); - } - execute(request, CompletionPersistentTaskAction.INSTANCE, listener); } /** * Cancels a locally running task using the Task Manager API. Accepts operation timeout as optional parameter */ - void sendCancelRequest( - final long taskId, - final String reason, - final @Nullable TimeValue timeout, - final ActionListener listener - ) { + void sendCancelRequest(final long taskId, final String reason, final ActionListener listener) { CancelTasksRequest request = new CancelTasksRequest(); request.setTargetTaskId(new TaskId(clusterService.localNode().getId(), taskId)); request.setReason(reason); - if (timeout != null) { - request.setTimeout(timeout); - } + // TODO set timeout? try { client.admin().cluster().cancelTasks(request, listener); } catch (Exception e) { @@ -130,33 +125,25 @@ void sendUpdateStateRequest( final String taskId, final long taskAllocationID, final PersistentTaskState taskState, - final @Nullable TimeValue timeout, + final TimeValue timeout, final ActionListener> listener ) { - UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request( - taskId, - taskAllocationID, - taskState + execute( + new UpdatePersistentTaskStatusAction.Request(Objects.requireNonNull(timeout), taskId, taskAllocationID, taskState), + UpdatePersistentTaskStatusAction.INSTANCE, + listener ); - if (timeout != null) { - request.masterNodeTimeout(timeout); - } - execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener); } /** * Notifies the master node to remove a persistent task from the cluster state. Accepts operation timeout as optional parameter */ - public void sendRemoveRequest( - final String taskId, - final @Nullable TimeValue timeout, - final ActionListener> listener - ) { - RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId); - if (timeout != null) { - request.masterNodeTimeout(timeout); - } - execute(request, RemovePersistentTaskAction.INSTANCE, listener); + public void sendRemoveRequest(final String taskId, final TimeValue timeout, final ActionListener> listener) { + execute( + new RemovePersistentTaskAction.Request(Objects.requireNonNull(timeout), taskId), + RemovePersistentTaskAction.INSTANCE, + listener + ); } /** diff --git a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java index 87d712ececd2d..5fa18a070b16e 100644 --- a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -44,8 +45,8 @@ public Request(StreamInput in) throws IOException { taskId = in.readString(); } - public Request(String taskId) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout, String taskId) { + super(masterNodeTimeout); this.taskId = taskId; } diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index d79f271bbc328..91c2d41a4a807 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -52,8 +53,8 @@ public Request(StreamInput in) throws IOException { params = in.readNamedWriteable(PersistentTaskParams.class); } - public Request(String taskId, String taskName, PersistentTaskParams params) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout, String taskId, String taskName, PersistentTaskParams params) { + super(masterNodeTimeout); this.taskId = taskId; this.taskName = taskName; this.params = params; diff --git a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java index 346628f6224a8..b3692ecfdd559 100644 --- a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -49,8 +50,8 @@ public Request(StreamInput in) throws IOException { state = in.readOptionalNamedWriteable(PersistentTaskState.class); } - public Request(String taskId, long allocationId, PersistentTaskState state) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout, String taskId, long allocationId, PersistentTaskState state) { + super(masterNodeTimeout); this.taskId = taskId; this.allocationId = allocationId; this.state = state; diff --git a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java index 3069589f9556c..aee02fb288b55 100644 --- a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java @@ -42,6 +42,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -102,7 +103,7 @@ public void testTaskCreation() throws Exception { eq("health-node"), eq("health-node"), eq(new HealthNodeTaskParams()), - eq(null), + isNotNull(), any() ) ); @@ -121,7 +122,7 @@ public void testSkippingTaskCreationIfItExists() { eq("health-node"), eq("health-node"), eq(new HealthNodeTaskParams()), - eq(null), + isNotNull(), any() ); } diff --git a/server/src/test/java/org/elasticsearch/persistent/CancelPersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/CancelPersistentTaskRequestTests.java index 11a4810f113fa..c21a2ccd66ef5 100644 --- a/server/src/test/java/org/elasticsearch/persistent/CancelPersistentTaskRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/CancelPersistentTaskRequestTests.java @@ -18,7 +18,7 @@ public class CancelPersistentTaskRequestTests extends AbstractWireSerializingTes @Override protected Request createTestInstance() { - return new Request(randomAsciiOfLength(10)); + return new Request(randomTimeValue(), randomAsciiOfLength(10)); } @Override diff --git a/server/src/test/java/org/elasticsearch/persistent/CompletionPersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/CompletionPersistentTaskRequestTests.java index b57f401ed56fe..cb61d3b3862d7 100644 --- a/server/src/test/java/org/elasticsearch/persistent/CompletionPersistentTaskRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/CompletionPersistentTaskRequestTests.java @@ -17,9 +17,9 @@ public class CompletionPersistentTaskRequestTests extends AbstractWireSerializin @Override protected Request createTestInstance() { if (randomBoolean()) { - return new Request(randomAlphaOfLength(10), randomNonNegativeLong(), null, null); + return new Request(randomTimeValue(), randomAlphaOfLength(10), randomNonNegativeLong(), null, null); } else { - return new Request(randomAlphaOfLength(10), randomNonNegativeLong(), null, randomAlphaOfLength(20)); + return new Request(randomTimeValue(), randomAlphaOfLength(10), randomNonNegativeLong(), null, randomAlphaOfLength(20)); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 7cf57325baa00..9d408a0d152dc 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -260,12 +260,7 @@ public void testTaskCancellation() { when(client.settings()).thenReturn(Settings.EMPTY); PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, client) { @Override - void sendCancelRequest( - final long taskId, - final String reason, - final TimeValue timeout, - final ActionListener listener - ) { + void sendCancelRequest(final long taskId, final String reason, final ActionListener listener) { capturedTaskId.set(taskId); capturedListener.set(listener); } @@ -356,12 +351,7 @@ public void testTaskLocalAbort() { when(client.settings()).thenReturn(Settings.EMPTY); PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, client) { @Override - void sendCancelRequest( - final long taskId, - final String reason, - final TimeValue timeout, - final ActionListener listener - ) { + void sendCancelRequest(final long taskId, final String reason, final ActionListener listener) { fail("Shouldn't be called during local abort"); } diff --git a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java index 07079f6c64df1..44434b6500ca9 100644 --- a/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/StartPersistentActionRequestTests.java @@ -30,7 +30,7 @@ protected Request createTestInstance() { if (randomBoolean()) { testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); } - return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams); + return new Request(randomTimeValue(), UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams); } @Override diff --git a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java index 3988b3879956b..61dc7f06dcbf6 100644 --- a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java @@ -22,7 +22,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractWireSerializingTes @Override protected Request createTestInstance() { - return new Request(UUIDs.base64UUID(), randomLong(), new State(randomAlphaOfLength(10))); + return new Request(randomTimeValue(), UUIDs.base64UUID(), randomLong(), new State(randomAlphaOfLength(10))); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java index 7a05a4e712fc4..fd3ed852650f3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskCleaner.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; @@ -110,6 +111,7 @@ public void onFailure(Exception e) { client.execute( CompletionPersistentTaskAction.INSTANCE, new CompletionPersistentTaskAction.Request( + MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, persistentTask.getId(), persistentTask.getAllocationId(), new IndexNotFoundException(followerIndex), diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 796ff4e677aa9..7c26ad60fb13c 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -549,7 +549,7 @@ public void onFailure(Exception e) { persistentTaskId, DownsampleShardTask.TASK_NAME, params, - null, + TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, ActionListener.wrap( startedTask -> persistentTasksService.waitForPersistentTaskCondition( startedTask.getId(), diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index 2d7c17db054a9..26301f1397a0e 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; @@ -81,7 +82,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList persistentTaskId, ReindexDataStreamTask.TASK_NAME, params, - null, + TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, ActionListener.wrap(startedTask -> listener.onResponse(AcknowledgedResponse.TRUE), listener::onFailure) ); } diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index dfb960794537b..60dc4325fee17 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -308,6 +308,7 @@ public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() thro } UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest = new UpdatePersistentTaskStatusAction.Request( + TEST_REQUEST_TIMEOUT, task.getId(), task.getAllocationId(), DatafeedState.STOPPING diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index f3830d5cbf68c..d124d5014c7e1 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -322,7 +322,7 @@ static void startPersistentTask( job.getConfig().getId(), RollupField.TASK_NAME, job, - null, + TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, ActionListener.wrap(rollupConfigPersistentTask -> waitForRollupStarted(job, listener, persistentTasksService), e -> { if (e instanceof ResourceAlreadyExistsException) { e = new ElasticsearchStatusException( diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index fed2439e513c8..5868a762ed517 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -42,6 +42,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNotNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -373,10 +374,10 @@ public void testTaskAlreadyExists() { requestCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex())); return null; }).when(tasksService) - .sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), eq(null), requestCaptor.capture()); + .sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), requestCaptor.capture()); TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); - verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), eq(null), any()); + verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), any()); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -401,7 +402,7 @@ public void testStartTask() { requestCaptor.getValue().onResponse(response); return null; }).when(tasksService) - .sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), eq(null), requestCaptor.capture()); + .sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), requestCaptor.capture()); ArgumentCaptor requestCaptor2 = ArgumentCaptor.forClass( PersistentTasksService.WaitForPersistentTaskListener.class @@ -413,7 +414,7 @@ public void testStartTask() { }).when(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture()); TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); - verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), eq(null), any()); + verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), any()); verify(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), any()); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index fd530a338b26c..6004f8ebf95c4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -63,6 +63,7 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeMetadata; import org.elasticsearch.features.FeatureService; @@ -1273,7 +1274,7 @@ private void submitPersistentMigrationTask(int migrationsVersion, boolean securi SecurityMigrationTaskParams.TASK_NAME, SecurityMigrationTaskParams.TASK_NAME, new SecurityMigrationTaskParams(migrationsVersion, securityMigrationNeeded), - null, + TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, ActionListener.wrap((response) -> { logger.debug("Security migration task submitted"); }, (exception) -> { diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java index 46f568d286f9e..784f1c1fbe23e 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java @@ -183,12 +183,18 @@ protected void nodeOperation(AllocatedPersistentTask task, TestTaskParams params private void startTask() { logger.info("--> sending start request"); - persistentTasksService.sendStartRequest("task_id", "task_name", new TestTaskParams(), null, ActionListener.wrap(r -> {}, e -> { - if (e instanceof ResourceAlreadyExistsException == false) { - logger.error("failed to create task", e); - fail("failed to create task"); - } - })); + persistentTasksService.sendStartRequest( + "task_id", + "task_name", + new TestTaskParams(), + TEST_REQUEST_TIMEOUT, + ActionListener.wrap(r -> {}, e -> { + if (e instanceof ResourceAlreadyExistsException == false) { + logger.error("failed to create task", e); + fail("failed to create task"); + } + }) + ); } @Override diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java index 08e0982b2ab84..7c3b9f655b979 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java @@ -266,11 +266,23 @@ public void testCancelTransformTasksListener_OneTaskCouldNotBeRemoved() { when(client.threadPool()).thenReturn(threadPool); doAnswer(randomBoolean() ? withResponse() : withException(new ResourceNotFoundException("task not found"))).when(client) - .execute(same(RemovePersistentTaskAction.INSTANCE), eq(new RemovePersistentTaskAction.Request("task-A")), any()); + .execute( + same(RemovePersistentTaskAction.INSTANCE), + eq(new RemovePersistentTaskAction.Request(TEST_REQUEST_TIMEOUT, "task-A")), + any() + ); doAnswer(randomBoolean() ? withResponse() : withException(new ResourceNotFoundException("task not found"))).when(client) - .execute(same(RemovePersistentTaskAction.INSTANCE), eq(new RemovePersistentTaskAction.Request("task-B")), any()); + .execute( + same(RemovePersistentTaskAction.INSTANCE), + eq(new RemovePersistentTaskAction.Request(TEST_REQUEST_TIMEOUT, "task-B")), + any() + ); doAnswer(withException(new IllegalStateException("real issue while removing task"))).when(client) - .execute(same(RemovePersistentTaskAction.INSTANCE), eq(new RemovePersistentTaskAction.Request("task-C")), any()); + .execute( + same(RemovePersistentTaskAction.INSTANCE), + eq(new RemovePersistentTaskAction.Request(TEST_REQUEST_TIMEOUT, "task-C")), + any() + ); PersistentTasksService persistentTasksService = new PersistentTasksService(mock(ClusterService.class), threadPool, client); Set transformTasks = Set.of("task-A", "task-B", "task-C"); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index e381659b1e01c..535484ed3a196 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -80,6 +80,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNotNull; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -352,7 +353,7 @@ public void testFailWhenNodeIsShuttingDown() { eq(42L), isNull(), eq("Node is shutting down."), - isNull(), + isNotNull(), any() ); }