Skip to content

Commit

Permalink
Add task report fields in response of SQL statements endpoint (#16808)
Browse files Browse the repository at this point in the history
If the optional query parameter detail is supplied, then the response also includes the following:

 * A stages object that summarizes information about the different stages being used for query execution, such as stage number, phase, start time, duration, input and output information, processing methods, and partitioning.
* A counters object that provides details on the rows, bytes, and files processed at various stages for each worker across different channels, along with sort progress.
* A warnings object that provides details about any warnings.
  • Loading branch information
Akshat-Jain authored Aug 1, 2024
1 parent 01f6cfc commit bb4d6cc
Show file tree
Hide file tree
Showing 8 changed files with 681 additions and 89 deletions.
432 changes: 429 additions & 3 deletions docs/api-reference/sql-api.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,14 @@ private void putAll(final Map<Integer, Map<Integer, CounterSnapshots>> otherMap)
}
}
}

@Override
public String toString()
{
synchronized (snapshotsMap) {
return "CounterSnapshotsTree{" +
"snapshotsMap=" + snapshotsMap +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.sql.SqlStatementState;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;

public class SqlStatementResult
{
Expand All @@ -51,6 +53,27 @@ public class SqlStatementResult
@Nullable
private final ErrorResponse errorResponse;

@Nullable
private final MSQStagesReport stages;

@Nullable
private final CounterSnapshotsTree counters;

@Nullable
private final List<MSQErrorReport> warnings;

public SqlStatementResult(
String queryId,
SqlStatementState state,
DateTime createdAt,
List<ColumnNameAndTypes> sqlRowSignature,
Long durationMs,
ResultSetInformation resultSetInformation,
ErrorResponse errorResponse
)
{
this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null, null);
}

@JsonCreator
public SqlStatementResult(
Expand All @@ -67,8 +90,13 @@ public SqlStatementResult(
@Nullable @JsonProperty("result")
ResultSetInformation resultSetInformation,
@Nullable @JsonProperty("errorDetails")
ErrorResponse errorResponse

ErrorResponse errorResponse,
@Nullable @JsonProperty("stages")
MSQStagesReport stages,
@Nullable @JsonProperty("counters")
CounterSnapshotsTree counters,
@Nullable @JsonProperty("warnings")
List<MSQErrorReport> warnings
)
{
this.queryId = queryId;
Expand All @@ -78,6 +106,9 @@ public SqlStatementResult(
this.durationMs = durationMs;
this.resultSetInformation = resultSetInformation;
this.errorResponse = errorResponse;
this.stages = stages;
this.counters = counters;
this.warnings = warnings;
}

@JsonProperty
Expand Down Expand Up @@ -130,41 +161,28 @@ public ErrorResponse getErrorResponse()
return errorResponse;
}

@JsonProperty("stages")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public MSQStagesReport getStages()
{
return stages;
}

@Override
public boolean equals(Object o)
@JsonProperty("counters")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public CounterSnapshotsTree getCounters()
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlStatementResult that = (SqlStatementResult) o;
return Objects.equals(queryId, that.queryId) && state == that.state && Objects.equals(
createdAt,
that.createdAt
) && Objects.equals(sqlRowSignature, that.sqlRowSignature) && Objects.equals(
durationMs,
that.durationMs
) && Objects.equals(resultSetInformation, that.resultSetInformation) && Objects.equals(
errorResponse == null ? null : errorResponse.getAsMap(),
that.errorResponse == null ? null : that.errorResponse.getAsMap()
);
return counters;
}

@Override
public int hashCode()
@JsonProperty("warnings")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<MSQErrorReport> getWarnings()
{
return Objects.hash(
queryId,
state,
createdAt,
sqlRowSignature,
durationMs,
resultSetInformation,
errorResponse == null ? null : errorResponse.getAsMap()
);
return warnings;
}

@Override
Expand All @@ -180,6 +198,9 @@ public String toString()
", errorResponse=" + (errorResponse == null
? "{}"
: errorResponse.getAsMap().toString()) +
", stages=" + stages +
", counters=" + counters +
", warnings=" + warnings +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -231,7 +232,9 @@ public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletReques
@Path("/{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response doGetStatus(
@PathParam("id") final String queryId, @Context final HttpServletRequest req
@PathParam("id") final String queryId,
@QueryParam("detail") boolean detail,
@Context final HttpServletRequest req
)
{
try {
Expand All @@ -242,7 +245,8 @@ public Response doGetStatus(
queryId,
authenticationResult,
true,
Action.READ
Action.READ,
detail
);

if (sqlStatementResult.isPresent()) {
Expand Down Expand Up @@ -369,7 +373,8 @@ public Response deleteQuery(@PathParam("id") final String queryId, @Context fina
queryId,
authenticationResult,
false,
Action.WRITE
Action.WRITE,
false
);
if (sqlStatementResult.isPresent()) {
switch (sqlStatementResult.get().getState()) {
Expand Down Expand Up @@ -479,7 +484,7 @@ private Response buildTaskResponse(Sequence<Object[]> sequence, AuthenticationRe
}
String taskId = String.valueOf(firstRow[0]);

Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ);
Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ, false);

if (statementResult.isPresent()) {
return Response.status(Response.Status.OK).entity(statementResult.get()).build();
Expand Down Expand Up @@ -565,7 +570,8 @@ private Optional<SqlStatementResult> getStatementStatus(
String queryId,
AuthenticationResult authenticationResult,
boolean withResults,
Action forAction
Action forAction,
boolean detail
) throws DruidException
{
TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId), queryId);
Expand All @@ -582,14 +588,29 @@ private Optional<SqlStatementResult> getStatementStatus(
MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction);
SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus);

Supplier<Optional<MSQTaskReportPayload>> msqTaskReportPayloadSupplier = () -> {
try {
return Optional.ofNullable(SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
));
}
catch (DruidException e) {
if (e.getErrorCode().equals("notFound")) {
return Optional.empty();
}
throw e;
}
};

if (SqlStatementState.FAILED == sqlStatementState) {
return SqlStatementResourceHelper.getExceptionPayload(
queryId,
taskResponse,
statusPlus,
sqlStatementState,
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId),
jsonMapper
msqTaskReportPayloadSupplier.get().orElse(null),
jsonMapper,
detail
);
} else {
Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
Expand All @@ -605,7 +626,10 @@ private Optional<SqlStatementResult> getStatementStatus(
sqlStatementState,
msqControllerTask.getQuerySpec().getDestination()
).orElse(null) : null,
null
null,
detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayloadSupplier.get().orElse(null)) : null
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,12 @@ public static Optional<SqlStatementResult> getExceptionPayload(
TaskStatusResponse taskResponse,
TaskStatusPlus statusPlus,
SqlStatementState sqlStatementState,
TaskReport.ReportMap msqPayload,
ObjectMapper jsonMapper
MSQTaskReportPayload msqTaskReportPayload,
ObjectMapper jsonMapper,
boolean detail
)
{
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(msqTaskReportPayload);
final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault();
if (exceptionDetails == null || fault == null) {
return Optional.of(new SqlStatementResult(
Expand All @@ -267,7 +268,10 @@ public static Optional<SqlStatementResult> getExceptionPayload(
DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build("%s", taskResponse.getStatus().getErrorMsg())
.toErrorResponse()
.toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand All @@ -293,7 +297,10 @@ protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
ex.withContext(exceptionContext);
return ex;
}
}).toErrorResponse()
}).toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand Down Expand Up @@ -353,7 +360,7 @@ public Object[] next()
}

@Nullable
public static MSQStagesReport.Stage getFinalStage(MSQTaskReportPayload msqTaskReportPayload)
public static MSQStagesReport.Stage getFinalStage(@Nullable MSQTaskReportPayload msqTaskReportPayload)
{
if (msqTaskReportPayload == null || msqTaskReportPayload.getStages().getStages() == null) {
return null;
Expand All @@ -369,11 +376,29 @@ public static MSQStagesReport.Stage getFinalStage(MSQTaskReportPayload msqTaskRe
}

@Nullable
private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payload)
private static MSQErrorReport getQueryExceptionDetails(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getStatus().getErrorReport();
}

@Nullable
public static List<MSQErrorReport> getQueryWarningDetails(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : new ArrayList<>(payload.getStatus().getWarningReports());
}

@Nullable
public static MSQStagesReport getQueryStagesReport(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getStages();
}

@Nullable
public static CounterSnapshotsTree getQueryCounters(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getCounters();
}

@Nullable
public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,6 @@ public void sanityTest() throws JsonProcessingException
{

Assert.assertEquals(JSON_STRING, MAPPER.writeValueAsString(SQL_STATEMENT_RESULT));
Assert.assertEquals(
SQL_STATEMENT_RESULT,
MAPPER.readValue(MAPPER.writeValueAsString(SQL_STATEMENT_RESULT), SqlStatementResult.class)
);
Assert.assertEquals(
SQL_STATEMENT_RESULT.hashCode(),
MAPPER.readValue(MAPPER.writeValueAsString(SQL_STATEMENT_RESULT), SqlStatementResult.class).hashCode()
);
Assert.assertEquals(
"SqlStatementResult{"
+ "queryId='q1',"
Expand All @@ -87,7 +79,10 @@ public void sanityTest() throws JsonProcessingException
+ " sqlRowSignature=[ColumnNameAndTypes{colName='_time', sqlTypeName='TIMESTAMP', nativeTypeName='LONG'}, ColumnNameAndTypes{colName='alias', sqlTypeName='VARCHAR', nativeTypeName='STRING'}, ColumnNameAndTypes{colName='market', sqlTypeName='VARCHAR', nativeTypeName='STRING'}],"
+ " durationInMs=100,"
+ " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]},"
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}}",
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}},"
+ " stages=null,"
+ " counters=null,"
+ " warnings=null}",
SQL_STATEMENT_RESULT.toString()
);
}
Expand Down
Loading

0 comments on commit bb4d6cc

Please sign in to comment.