diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java index a650f7325c4c..6fd74a836043 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/entity/SqlStatementResult.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.error.ErrorResponse; import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.indexing.error.MSQErrorReport; import org.apache.druid.msq.indexing.report.MSQStagesReport; import org.apache.druid.msq.sql.SqlStatementState; import org.joda.time.DateTime; @@ -59,6 +60,9 @@ public class SqlStatementResult @Nullable private final CounterSnapshotsTree counters; + @Nullable + private final List warnings; + public SqlStatementResult( String queryId, SqlStatementState state, @@ -69,7 +73,7 @@ public SqlStatementResult( ErrorResponse errorResponse ) { - this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null); + this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null, null); } @JsonCreator @@ -91,7 +95,9 @@ public SqlStatementResult( @Nullable @JsonProperty("stages") MSQStagesReport stages, @Nullable @JsonProperty("counters") - CounterSnapshotsTree counters + CounterSnapshotsTree counters, + @Nullable @JsonProperty("warnings") + List warnings ) { this.queryId = queryId; @@ -103,6 +109,7 @@ public SqlStatementResult( this.errorResponse = errorResponse; this.stages = stages; this.counters = counters; + this.warnings = warnings; } @JsonProperty @@ -171,6 +178,13 @@ public CounterSnapshotsTree getCounters() return counters; } + @JsonProperty("warnings") + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + public List getWarnings() + { + return warnings; + } @Override public boolean equals(Object o) @@ -191,7 +205,10 @@ public boolean equals(Object o) ) && Objects.equals(resultSetInformation, that.resultSetInformation) && Objects.equals( errorResponse == null ? null : errorResponse.getAsMap(), that.errorResponse == null ? null : that.errorResponse.getAsMap() - ) && Objects.equals(stages, that.stages) && Objects.equals(counters, that.counters); + ) && Objects.equals(stages, that.stages) && Objects.equals(counters, that.counters) && Objects.equals( + warnings, + that.warnings + ); } @Override @@ -206,7 +223,8 @@ public int hashCode() resultSetInformation, errorResponse == null ? null : errorResponse.getAsMap(), stages, - counters + counters, + warnings ); } @@ -225,6 +243,7 @@ public String toString() : errorResponse.getAsMap().toString()) + ", stages=" + stages + ", counters=" + counters + + ", warnings=" + warnings + '}'; } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 2172bc69e913..7575dd00dcb8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -47,7 +47,6 @@ import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.msq.counters.CounterSnapshotsTree; import org.apache.druid.msq.exec.ResultsContext; import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.MSQControllerTask; @@ -56,7 +55,6 @@ import org.apache.druid.msq.indexing.destination.MSQDestination; import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; -import org.apache.druid.msq.indexing.report.MSQStagesReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory; @@ -589,38 +587,27 @@ private Optional getStatementStatus( MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction); SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus); + MSQTaskReportPayload msqTaskReportPayload = null; + try { + msqTaskReportPayload = SqlStatementResourceHelper.getPayload(contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)); + } + catch (DruidException e) { + if (!e.getErrorCode().equals("notFound")) { + throw e; + } + } + if (SqlStatementState.FAILED == sqlStatementState) { return SqlStatementResourceHelper.getExceptionPayload( queryId, taskResponse, statusPlus, sqlStatementState, - contactOverlord(overlordClient.taskReportAsMap(queryId), queryId), - jsonMapper + msqTaskReportPayload, + jsonMapper, + detail ); } else { - MSQStagesReport stages = null; - CounterSnapshotsTree counters = null; - - if (detail) { - MSQTaskReportPayload msqTaskReportPayload; - try { - msqTaskReportPayload = SqlStatementResourceHelper.getPayload(contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)); - } - catch (DruidException e) { - if (e.getErrorCode().equals("notFound")) { - msqTaskReportPayload = null; - } else { - throw e; - } - } - - if (msqTaskReportPayload != null) { - stages = msqTaskReportPayload.getStages(); - counters = msqTaskReportPayload.getCounters(); - } - } - Optional> signature = SqlStatementResourceHelper.getSignature(msqControllerTask); return Optional.of(new SqlStatementResult( queryId, @@ -635,8 +622,9 @@ private Optional getStatementStatus( msqControllerTask.getQuerySpec().getDestination() ).orElse(null) : null, null, - stages, - counters + detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayload) : null, + detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayload) : null, + detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayload) : null )); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java index 4f07dcb2cc02..db83e5f78a40 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java @@ -250,11 +250,12 @@ public static Optional getExceptionPayload( TaskStatusResponse taskResponse, TaskStatusPlus statusPlus, SqlStatementState sqlStatementState, - TaskReport.ReportMap msqPayload, - ObjectMapper jsonMapper + MSQTaskReportPayload msqTaskReportPayload, + ObjectMapper jsonMapper, + boolean detail ) { - final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload)); + final MSQErrorReport exceptionDetails = getQueryExceptionDetails(msqTaskReportPayload); final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault(); if (exceptionDetails == null || fault == null) { return Optional.of(new SqlStatementResult( @@ -267,7 +268,10 @@ public static Optional getExceptionPayload( DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.UNCATEGORIZED) .build("%s", taskResponse.getStatus().getErrorMsg()) - .toErrorResponse() + .toErrorResponse(), + detail ? getQueryStagesReport(msqTaskReportPayload) : null, + detail ? getQueryCounters(msqTaskReportPayload) : null, + detail ? getQueryWarningDetails(msqTaskReportPayload) : null )); } @@ -293,7 +297,10 @@ protected DruidException makeException(DruidException.DruidExceptionBuilder bob) ex.withContext(exceptionContext); return ex; } - }).toErrorResponse() + }).toErrorResponse(), + detail ? getQueryStagesReport(msqTaskReportPayload) : null, + detail ? getQueryCounters(msqTaskReportPayload) : null, + detail ? getQueryWarningDetails(msqTaskReportPayload) : null )); } @@ -374,6 +381,24 @@ private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payl return payload == null ? null : payload.getStatus().getErrorReport(); } + @Nullable + public static List getQueryWarningDetails(MSQTaskReportPayload payload) + { + return payload == null ? null : new ArrayList<>(payload.getStatus().getWarningReports()); + } + + @Nullable + public static MSQStagesReport getQueryStagesReport(MSQTaskReportPayload payload) + { + return payload == null ? null : payload.getStages(); + } + + @Nullable + public static CounterSnapshotsTree getQueryCounters(MSQTaskReportPayload payload) + { + return payload == null ? null : payload.getCounters(); + } + @Nullable public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java index e5c424a9b52c..e03932a23dd3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/entity/SqlStatementResultTest.java @@ -89,7 +89,8 @@ public void sanityTest() throws JsonProcessingException + " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]}," + " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}," + " stages=null," - + " counters=null}", + + " counters=null," + + " warnings=null}", SQL_STATEMENT_RESULT.toString() ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index fa308b0471bb..e170d8a2c30e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -97,6 +97,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -449,7 +450,11 @@ private void setupMocks(OverlordClient indexingServiceClient) ))); - Mockito.when(indexingServiceClient.taskReportAsMap(FINISHED_SELECT_MSQ_QUERY)) + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_SELECT_MSQ_QUERY))) + .thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get()))); + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY))) + .thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get()))); + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(RUNNING_SELECT_MSQ_QUERY))) .thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get()))); Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) @@ -584,6 +589,10 @@ private void setupMocks(OverlordClient indexingServiceClient) Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY))) .thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT))); + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ACCEPTED_INSERT_MSQ_TASK))) + .thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT))); + Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(RUNNING_INSERT_MSQ_QUERY))) + .thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT))); Mockito.when(indexingServiceClient.taskPayload(FINISHED_INSERT_MSQ_QUERY)) .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( @@ -757,8 +766,6 @@ public void testMSQSelectRunningQuery() @Test public void testMSQSelectRunningQueryWithDetail() { - Mockito.when(overlordClient.taskReportAsMap(RUNNING_SELECT_MSQ_QUERY)) - .thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get()))); Response response = resource.doGetStatus(RUNNING_SELECT_MSQ_QUERY, true, makeOkRequest()); Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); @@ -771,7 +778,8 @@ public void testMSQSelectRunningQueryWithDetail() null, null, selectTaskReport.get().getPayload().getStages(), - selectTaskReport.get().getPayload().getCounters() + selectTaskReport.get().getPayload().getCounters(), + new ArrayList<>(selectTaskReport.get().getPayload().getStatus().getWarningReports()) ); Assert.assertEquals(