Skip to content

Commit

Permalink
Fix trappy timeouts in persistent tasks requests (elastic#120514)
Browse files Browse the repository at this point in the history
Ensure that callers constructing these master-node requests pass in an
explicit timeout.

Relates elastic#107984
  • Loading branch information
DaveCTurner authored Jan 23, 2025
1 parent 7563e71 commit a1fd7bc
Show file tree
Hide file tree
Showing 30 changed files with 188 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void onFailure(Exception e) {
UUIDs.base64UUID(),
FailingCreationPersistentTaskExecutor.TASK_NAME,
new FailingCreationTaskParams(),
null,
TEST_REQUEST_TIMEOUT,
l.map(ignored -> null)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testPersistentTasksThatFailDuringInitializationAreRemovedFromCluster
UUIDs.base64UUID(),
FailingInitializationPersistentTaskExecutor.TASK_NAME,
new FailingInitializationTaskParams(),
null,
TEST_REQUEST_TIMEOUT,
startPersistentTaskFuture
);
startPersistentTaskFuture.actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testFullClusterRestart() throws Exception {
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
futures.add(future);
taskIds[i] = UUIDs.base64UUID();
service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), TEST_REQUEST_TIMEOUT, future);
}

for (int i = 0; i < numberOfTasks; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ public static class WaitForPersistentTaskFuture<Params extends PersistentTaskPar
public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
persistentTasksService.sendStartRequest(
UUIDs.base64UUID(),
TestPersistentTasksExecutor.NAME,
new TestParams("Blah"),
TEST_REQUEST_TIMEOUT,
future
);
long allocationId = future.get().getAllocationId();
waitForTaskToStart();
TaskInfo firstRunningTask = clusterAdmin().prepareListTasks()
Expand Down Expand Up @@ -100,7 +106,13 @@ public void testPersistentActionCompletion() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
persistentTasksService.sendStartRequest(
taskId,
TestPersistentTasksExecutor.NAME,
new TestParams("Blah"),
TEST_REQUEST_TIMEOUT,
future
);
long allocationId = future.get().getAllocationId();
waitForTaskToStart();
TaskInfo firstRunningTask = clusterAdmin().prepareListTasks()
Expand All @@ -119,7 +131,14 @@ public void testPersistentActionCompletion() throws Exception {
logger.info("Simulating errant completion notification");
// try sending completion request with incorrect allocation id
PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>();
persistentTasksService.sendCompletionRequest(taskId, Long.MAX_VALUE, null, null, null, failedCompletionNotificationFuture);
persistentTasksService.sendCompletionRequest(
taskId,
Long.MAX_VALUE,
null,
null,
TEST_REQUEST_TIMEOUT,
failedCompletionNotificationFuture
);
assertFutureThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class);
// Make sure that the task is still running
assertThat(
Expand All @@ -141,7 +160,13 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
TestParams testParams = new TestParams("Blah");
testParams.setExecutorNodeAttr("test");
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future);
persistentTasksService.sendStartRequest(
UUIDs.base64UUID(),
TestPersistentTasksExecutor.NAME,
testParams,
TEST_REQUEST_TIMEOUT,
future
);
String taskId = future.get().getId();

Settings nodeSettings = Settings.builder().put(nodeSettings(0, Settings.EMPTY)).put("node.attr.test_attr", "test").build();
Expand All @@ -165,7 +190,7 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {

// Remove the persistent task
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
persistentTasksService.sendRemoveRequest(taskId, null, removeFuture);
persistentTasksService.sendRemoveRequest(taskId, TEST_REQUEST_TIMEOUT, removeFuture);
assertEquals(removeFuture.get().getId(), taskId);
}

Expand All @@ -182,7 +207,13 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
TestParams testParams = new TestParams("Blah");
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future);
persistentTasksService.sendStartRequest(
UUIDs.base64UUID(),
TestPersistentTasksExecutor.NAME,
testParams,
TEST_REQUEST_TIMEOUT,
future
);
String taskId = future.get().getId();

assertThat(clusterAdmin().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(), empty());
Expand All @@ -197,14 +228,20 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception

// Remove the persistent task
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
persistentTasksService.sendRemoveRequest(taskId, null, removeFuture);
persistentTasksService.sendRemoveRequest(taskId, TEST_REQUEST_TIMEOUT, removeFuture);
assertEquals(removeFuture.get().getId(), taskId);
}

public void testPersistentActionStatusUpdate() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
persistentTasksService.sendStartRequest(
UUIDs.base64UUID(),
TestPersistentTasksExecutor.NAME,
new TestParams("Blah"),
TEST_REQUEST_TIMEOUT,
future
);
String taskId = future.get().getId();
waitForTaskToStart();
TaskInfo firstRunningTask = clusterAdmin().prepareListTasks()
Expand Down Expand Up @@ -250,7 +287,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
assertFutureThrows(future1, IllegalStateException.class, "timed out after 10ms");

PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), null, failedUpdateFuture);
persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), TEST_REQUEST_TIMEOUT, failedUpdateFuture);
assertFutureThrows(
failedUpdateFuture,
ResourceNotFoundException.class,
Expand All @@ -275,11 +312,23 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
persistentTasksService.sendStartRequest(
taskId,
TestPersistentTasksExecutor.NAME,
new TestParams("Blah"),
TEST_REQUEST_TIMEOUT,
future
);
future.get();

PlainActionFuture<PersistentTask<TestParams>> future2 = new PlainActionFuture<>();
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future2);
persistentTasksService.sendStartRequest(
taskId,
TestPersistentTasksExecutor.NAME,
new TestParams("Blah"),
TEST_REQUEST_TIMEOUT,
future2
);
assertFutureThrows(future2, ResourceAlreadyExistsException.class);

waitForTaskToStart();
Expand Down Expand Up @@ -315,7 +364,13 @@ public void testUnassignRunningPersistentTask() throws Exception {
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
TestParams testParams = new TestParams("Blah");
testParams.setExecutorNodeAttr("test");
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, null, future);
persistentTasksService.sendStartRequest(
UUIDs.base64UUID(),
TestPersistentTasksExecutor.NAME,
testParams,
TEST_REQUEST_TIMEOUT,
future
);
PersistentTask<TestParams> task = future.get();
String taskId = task.getId();

Expand Down Expand Up @@ -366,7 +421,13 @@ public void testAbortLocally() throws Exception {
persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1));
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), null, future);
persistentTasksService.sendStartRequest(
UUIDs.base64UUID(),
TestPersistentTasksExecutor.NAME,
new TestParams("Blah"),
TEST_REQUEST_TIMEOUT,
future
);
String taskId = future.get().getId();
long allocationId = future.get().getAllocationId();
waitForTaskToStart();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testEnableAssignmentAfterRestart() throws Exception {
"task_" + i,
TestPersistentTasksExecutor.NAME,
new TestParams(randomAlphaOfLength(10)),
null,
TEST_REQUEST_TIMEOUT,
ActionListener.running(latch::countDown)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.PersistentTasksService;
Expand Down Expand Up @@ -95,7 +96,7 @@ protected void masterOperation(
SYSTEM_INDEX_UPGRADE_TASK_NAME,
SYSTEM_INDEX_UPGRADE_TASK_NAME,
new SystemIndexMigrationTaskParams(),
null,
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
ActionListener.wrap(startedTask -> {
listener.onResponse(new PostFeatureUpgradeResponse(true, featuresToMigrate, null, null));
}, ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
Expand Down Expand Up @@ -162,7 +163,7 @@ void startTask(ClusterChangedEvent event) {
TASK_NAME,
TASK_NAME,
new HealthNodeTaskParams(),
null,
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
ActionListener.wrap(r -> logger.debug("Created the health node task"), e -> {
if (e instanceof NodeClosedException) {
logger.debug("Failed to create health node task because node is shutting down", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ public void updatePersistentTaskState(
final PersistentTaskState state,
final ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener
) {
persistentTasksService.sendUpdateStateRequest(persistentTaskId, allocationId, state, null, listener);
persistentTasksService.sendUpdateStateRequest(
persistentTaskId,
allocationId,
state,
TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */,
listener
);
}

public String getPersistentTaskId() {
Expand Down Expand Up @@ -201,7 +207,7 @@ private void completeAndNotifyIfNeeded(@Nullable Exception failure, @Nullable St
getAllocationId(),
failure,
localAbortReason,
null,
TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */,
new ActionListener<>() {
@Override
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -56,8 +57,8 @@ public Request(StreamInput in) throws IOException {
localAbortReason = in.readOptionalString();
}

public Request(String taskId, long allocationId, Exception exception, String localAbortReason) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout, String taskId, long allocationId, Exception exception, String localAbortReason) {
super(masterNodeTimeout);
this.taskId = taskId;
this.exception = exception;
this.allocationId = allocationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -310,7 +311,7 @@ private <Params extends PersistentTaskParams> void notifyMasterOfFailedTask(
taskInProgress.getAllocationId(),
originalException,
null,
null,
TimeValue.THIRTY_SECONDS /* TODO should this be longer? infinite? */,
new ActionListener<>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
Expand Down Expand Up @@ -346,7 +347,7 @@ private void cancelTask(Long allocationId) {
if (task.markAsCancelled()) {
// Cancel the local task using the task manager
String reason = "task has been removed, cancelling locally";
persistentTasksService.sendCancelRequest(task.getId(), reason, null, new ActionListener<>() {
persistentTasksService.sendCancelRequest(task.getId(), reason, new ActionListener<>() {
@Override
public void onResponse(ListTasksResponse cancelTasksResponse) {
logger.trace(
Expand Down
Loading

0 comments on commit a1fd7bc

Please sign in to comment.