Skip to content

Commit

Permalink
Add new fields for failed scenarios + Add warnings field
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshat-Jain committed Jul 26, 2024
1 parent 4ea80bb commit b611a6b
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.sql.SqlStatementState;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -59,6 +60,9 @@ public class SqlStatementResult
@Nullable
private final CounterSnapshotsTree counters;

@Nullable
private final List<MSQErrorReport> warnings;

public SqlStatementResult(
String queryId,
SqlStatementState state,
Expand All @@ -69,7 +73,7 @@ public SqlStatementResult(
ErrorResponse errorResponse
)
{
this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null);
this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null, null);
}

@JsonCreator
Expand All @@ -91,7 +95,9 @@ public SqlStatementResult(
@Nullable @JsonProperty("stages")
MSQStagesReport stages,
@Nullable @JsonProperty("counters")
CounterSnapshotsTree counters
CounterSnapshotsTree counters,
@Nullable @JsonProperty("warnings")
List<MSQErrorReport> warnings
)
{
this.queryId = queryId;
Expand All @@ -103,6 +109,7 @@ public SqlStatementResult(
this.errorResponse = errorResponse;
this.stages = stages;
this.counters = counters;
this.warnings = warnings;
}

@JsonProperty
Expand Down Expand Up @@ -171,6 +178,13 @@ public CounterSnapshotsTree getCounters()
return counters;
}

@JsonProperty("warnings")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<MSQErrorReport> getWarnings()
{
return warnings;
}

@Override
public boolean equals(Object o)
Expand All @@ -191,7 +205,10 @@ public boolean equals(Object o)
) && Objects.equals(resultSetInformation, that.resultSetInformation) && Objects.equals(
errorResponse == null ? null : errorResponse.getAsMap(),
that.errorResponse == null ? null : that.errorResponse.getAsMap()
) && Objects.equals(stages, that.stages) && Objects.equals(counters, that.counters);
) && Objects.equals(stages, that.stages) && Objects.equals(counters, that.counters) && Objects.equals(
warnings,
that.warnings
);
}

@Override
Expand All @@ -206,7 +223,8 @@ public int hashCode()
resultSetInformation,
errorResponse == null ? null : errorResponse.getAsMap(),
stages,
counters
counters,
warnings
);
}

Expand All @@ -225,6 +243,7 @@ public String toString()
: errorResponse.getAsMap().toString()) +
", stages=" + stages +
", counters=" + counters +
", warnings=" + warnings +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.MSQControllerTask;
Expand All @@ -56,7 +55,6 @@
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
Expand Down Expand Up @@ -589,38 +587,27 @@ private Optional<SqlStatementResult> getStatementStatus(
MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction);
SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus);

MSQTaskReportPayload msqTaskReportPayload = null;
try {
msqTaskReportPayload = SqlStatementResourceHelper.getPayload(contactOverlord(overlordClient.taskReportAsMap(queryId), queryId));
}
catch (DruidException e) {
if (!e.getErrorCode().equals("notFound")) {
throw e;
}
}

if (SqlStatementState.FAILED == sqlStatementState) {
return SqlStatementResourceHelper.getExceptionPayload(
queryId,
taskResponse,
statusPlus,
sqlStatementState,
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId),
jsonMapper
msqTaskReportPayload,
jsonMapper,
detail
);
} else {
MSQStagesReport stages = null;
CounterSnapshotsTree counters = null;

if (detail) {
MSQTaskReportPayload msqTaskReportPayload;
try {
msqTaskReportPayload = SqlStatementResourceHelper.getPayload(contactOverlord(overlordClient.taskReportAsMap(queryId), queryId));
}
catch (DruidException e) {
if (e.getErrorCode().equals("notFound")) {
msqTaskReportPayload = null;
} else {
throw e;
}
}

if (msqTaskReportPayload != null) {
stages = msqTaskReportPayload.getStages();
counters = msqTaskReportPayload.getCounters();
}
}

Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
return Optional.of(new SqlStatementResult(
queryId,
Expand All @@ -635,8 +622,9 @@ private Optional<SqlStatementResult> getStatementStatus(
msqControllerTask.getQuerySpec().getDestination()
).orElse(null) : null,
null,
stages,
counters
detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayload) : null,
detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayload) : null,
detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayload) : null
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,12 @@ public static Optional<SqlStatementResult> getExceptionPayload(
TaskStatusResponse taskResponse,
TaskStatusPlus statusPlus,
SqlStatementState sqlStatementState,
TaskReport.ReportMap msqPayload,
ObjectMapper jsonMapper
MSQTaskReportPayload msqTaskReportPayload,
ObjectMapper jsonMapper,
boolean detail
)
{
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(msqTaskReportPayload);
final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault();
if (exceptionDetails == null || fault == null) {
return Optional.of(new SqlStatementResult(
Expand All @@ -267,7 +268,10 @@ public static Optional<SqlStatementResult> getExceptionPayload(
DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build("%s", taskResponse.getStatus().getErrorMsg())
.toErrorResponse()
.toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand All @@ -293,7 +297,10 @@ protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
ex.withContext(exceptionContext);
return ex;
}
}).toErrorResponse()
}).toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand Down Expand Up @@ -374,6 +381,24 @@ private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payl
return payload == null ? null : payload.getStatus().getErrorReport();
}

@Nullable
public static List<MSQErrorReport> getQueryWarningDetails(MSQTaskReportPayload payload)
{
return payload == null ? null : new ArrayList<>(payload.getStatus().getWarningReports());
}

@Nullable
public static MSQStagesReport getQueryStagesReport(MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getStages();
}

@Nullable
public static CounterSnapshotsTree getQueryCounters(MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getCounters();
}

@Nullable
public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public void sanityTest() throws JsonProcessingException
+ " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]},"
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}},"
+ " stages=null,"
+ " counters=null}",
+ " counters=null,"
+ " warnings=null}",
SQL_STATEMENT_RESULT.toString()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -449,7 +450,11 @@ private void setupMocks(OverlordClient indexingServiceClient)
)));


Mockito.when(indexingServiceClient.taskReportAsMap(FINISHED_SELECT_MSQ_QUERY))
Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_SELECT_MSQ_QUERY)))
.thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get())));
Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY)))
.thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get())));
Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(RUNNING_SELECT_MSQ_QUERY)))
.thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get())));

Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY)))
Expand Down Expand Up @@ -584,6 +589,10 @@ private void setupMocks(OverlordClient indexingServiceClient)

Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY)))
.thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT)));
Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ACCEPTED_INSERT_MSQ_TASK)))
.thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT)));
Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(RUNNING_INSERT_MSQ_QUERY)))
.thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT)));

Mockito.when(indexingServiceClient.taskPayload(FINISHED_INSERT_MSQ_QUERY))
.thenReturn(Futures.immediateFuture(new TaskPayloadResponse(
Expand Down Expand Up @@ -757,8 +766,6 @@ public void testMSQSelectRunningQuery()
@Test
public void testMSQSelectRunningQueryWithDetail()
{
Mockito.when(overlordClient.taskReportAsMap(RUNNING_SELECT_MSQ_QUERY))
.thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get())));
Response response = resource.doGetStatus(RUNNING_SELECT_MSQ_QUERY, true, makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

Expand All @@ -771,7 +778,8 @@ public void testMSQSelectRunningQueryWithDetail()
null,
null,
selectTaskReport.get().getPayload().getStages(),
selectTaskReport.get().getPayload().getCounters()
selectTaskReport.get().getPayload().getCounters(),
new ArrayList<>(selectTaskReport.get().getPayload().getStatus().getWarningReports())
);

Assert.assertEquals(
Expand Down

0 comments on commit b611a6b

Please sign in to comment.