diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 46e7e4a14491..6c9f1d899413 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -969,10 +969,11 @@ private StageOutputHolder getOrCreateStageOutputHolder(final StageId stageId, fi /** * Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}. + * In addition to being a token for cancellation, this also appears in thread dumps, so make it a little descriptive. */ private static String cancellationIdFor(final StageId stageId, final int workerNumber) { - return StringUtils.format("%s_%s", stageId, workerNumber); + return StringUtils.format("msq-worker[%s_%s]", stageId, workerNumber); } /** diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index f255fbe13a6b..49573b39b4ec 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -222,6 +222,7 @@ private Optional> runProcessorNow() } } + final String threadName = Thread.currentThread().getName(); boolean canceled = false; Either> retVal; @@ -230,6 +231,11 @@ private Optional> runProcessorNow() throw new InterruptedException(); } + if (cancellationId != null) { + // Set the thread name to something involving the cancellationId, to make thread dumps more useful. + Thread.currentThread().setName(threadName + "-" + cancellationId); + } + retVal = Either.value(processor.runIncrementally(readableInputs)); } catch (Throwable e) { @@ -253,6 +259,9 @@ private Optional> runProcessorNow() canceled = true; } } + + // Restore original thread name. + Thread.currentThread().setName(threadName); } }