Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task report fields in response of SQL statements endpoint #16808

Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,14 @@ private void putAll(final Map<Integer, Map<Integer, CounterSnapshots>> otherMap)
}
}
}

@Override
public String toString()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this now right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed for the new test added in the PR: testMSQSelectRunningQueryWithDetail as assertSqlStatementResult() method has the following assertion logic for counters:

if (actual.getCounters() == null || expected.getCounters() == null) {
       Assert.assertEquals(expected.getCounters(), actual.getCounters());
     } else {
       Assert.assertEquals(expected.getCounters().toString(), actual.getCounters().toString());
     }

{
synchronized (snapshotsMap) {
return "CounterSnapshotsTree{" +
"snapshotsMap=" + snapshotsMap +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.sql.SqlStatementState;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;

public class SqlStatementResult
{
Expand All @@ -51,6 +53,27 @@ public class SqlStatementResult
@Nullable
private final ErrorResponse errorResponse;

@Nullable
private final MSQStagesReport stages;

@Nullable
private final CounterSnapshotsTree counters;

@Nullable
private final List<MSQErrorReport> warnings;

public SqlStatementResult(
String queryId,
SqlStatementState state,
DateTime createdAt,
List<ColumnNameAndTypes> sqlRowSignature,
Long durationMs,
ResultSetInformation resultSetInformation,
ErrorResponse errorResponse
)
{
this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null, null);
}

@JsonCreator
public SqlStatementResult(
Expand All @@ -67,8 +90,13 @@ public SqlStatementResult(
@Nullable @JsonProperty("result")
ResultSetInformation resultSetInformation,
@Nullable @JsonProperty("errorDetails")
ErrorResponse errorResponse

ErrorResponse errorResponse,
@Nullable @JsonProperty("stages")
MSQStagesReport stages,
@Nullable @JsonProperty("counters")
CounterSnapshotsTree counters,
@Nullable @JsonProperty("warnings")
List<MSQErrorReport> warnings
)
{
this.queryId = queryId;
Expand All @@ -78,6 +106,9 @@ public SqlStatementResult(
this.durationMs = durationMs;
this.resultSetInformation = resultSetInformation;
this.errorResponse = errorResponse;
this.stages = stages;
this.counters = counters;
this.warnings = warnings;
}

@JsonProperty
Expand Down Expand Up @@ -130,41 +161,28 @@ public ErrorResponse getErrorResponse()
return errorResponse;
}

@JsonProperty("stages")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public MSQStagesReport getStages()
{
return stages;
}

@Override
public boolean equals(Object o)
@JsonProperty("counters")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public CounterSnapshotsTree getCounters()
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlStatementResult that = (SqlStatementResult) o;
return Objects.equals(queryId, that.queryId) && state == that.state && Objects.equals(
createdAt,
that.createdAt
) && Objects.equals(sqlRowSignature, that.sqlRowSignature) && Objects.equals(
durationMs,
that.durationMs
) && Objects.equals(resultSetInformation, that.resultSetInformation) && Objects.equals(
errorResponse == null ? null : errorResponse.getAsMap(),
that.errorResponse == null ? null : that.errorResponse.getAsMap()
);
return counters;
}

@Override
public int hashCode()
@JsonProperty("warnings")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<MSQErrorReport> getWarnings()
{
return Objects.hash(
queryId,
state,
createdAt,
sqlRowSignature,
durationMs,
resultSetInformation,
errorResponse == null ? null : errorResponse.getAsMap()
);
return warnings;
}

@Override
Expand All @@ -180,6 +198,9 @@ public String toString()
", errorResponse=" + (errorResponse == null
? "{}"
: errorResponse.getAsMap().toString()) +
", stages=" + stages +
", counters=" + counters +
", warnings=" + warnings +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -231,7 +232,9 @@ public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletReques
@Path("/{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response doGetStatus(
@PathParam("id") final String queryId, @Context final HttpServletRequest req
@PathParam("id") final String queryId,
@QueryParam("detail") boolean detail,
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
@Context final HttpServletRequest req
)
{
try {
Expand All @@ -242,7 +245,8 @@ public Response doGetStatus(
queryId,
authenticationResult,
true,
Action.READ
Action.READ,
detail
);

if (sqlStatementResult.isPresent()) {
Expand Down Expand Up @@ -369,7 +373,8 @@ public Response deleteQuery(@PathParam("id") final String queryId, @Context fina
queryId,
authenticationResult,
false,
Action.WRITE
Action.WRITE,
false
);
if (sqlStatementResult.isPresent()) {
switch (sqlStatementResult.get().getState()) {
Expand Down Expand Up @@ -479,7 +484,7 @@ private Response buildTaskResponse(Sequence<Object[]> sequence, AuthenticationRe
}
String taskId = String.valueOf(firstRow[0]);

Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ);
Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ, false);

if (statementResult.isPresent()) {
return Response.status(Response.Status.OK).entity(statementResult.get()).build();
Expand Down Expand Up @@ -565,7 +570,8 @@ private Optional<SqlStatementResult> getStatementStatus(
String queryId,
AuthenticationResult authenticationResult,
boolean withResults,
Action forAction
Action forAction,
boolean detail
) throws DruidException
{
TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId), queryId);
Expand All @@ -582,14 +588,29 @@ private Optional<SqlStatementResult> getStatementStatus(
MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction);
SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus);

Supplier<Optional<MSQTaskReportPayload>> msqTaskReportPayloadSupplier = () -> {
try {
return Optional.ofNullable(SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
));
}
catch (DruidException e) {
if (e.getErrorCode().equals("notFound")) {
return Optional.empty();
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
}
throw e;
}
};

if (SqlStatementState.FAILED == sqlStatementState) {
return SqlStatementResourceHelper.getExceptionPayload(
queryId,
taskResponse,
statusPlus,
sqlStatementState,
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId),
jsonMapper
msqTaskReportPayloadSupplier.get().orElse(null),
jsonMapper,
detail
);
} else {
Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
Expand All @@ -605,7 +626,10 @@ private Optional<SqlStatementResult> getStatementStatus(
sqlStatementState,
msqControllerTask.getQuerySpec().getDestination()
).orElse(null) : null,
null
null,
detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wont you call the overlord 3 times here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cryptoe Have raised a PR to get rid of the redundant calls: #16839

Thanks for pointing this out!

detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayloadSupplier.get().orElse(null)) : null
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,12 @@ public static Optional<SqlStatementResult> getExceptionPayload(
TaskStatusResponse taskResponse,
TaskStatusPlus statusPlus,
SqlStatementState sqlStatementState,
TaskReport.ReportMap msqPayload,
ObjectMapper jsonMapper
MSQTaskReportPayload msqTaskReportPayload,
ObjectMapper jsonMapper,
boolean detail
)
{
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(msqTaskReportPayload);
final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault();
if (exceptionDetails == null || fault == null) {
return Optional.of(new SqlStatementResult(
Expand All @@ -267,7 +268,10 @@ public static Optional<SqlStatementResult> getExceptionPayload(
DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build("%s", taskResponse.getStatus().getErrorMsg())
.toErrorResponse()
.toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand All @@ -293,7 +297,10 @@ protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
ex.withContext(exceptionContext);
return ex;
}
}).toErrorResponse()
}).toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand Down Expand Up @@ -353,7 +360,7 @@ public Object[] next()
}

@Nullable
public static MSQStagesReport.Stage getFinalStage(MSQTaskReportPayload msqTaskReportPayload)
public static MSQStagesReport.Stage getFinalStage(@Nullable MSQTaskReportPayload msqTaskReportPayload)
{
if (msqTaskReportPayload == null || msqTaskReportPayload.getStages().getStages() == null) {
return null;
Expand All @@ -369,11 +376,29 @@ public static MSQStagesReport.Stage getFinalStage(MSQTaskReportPayload msqTaskRe
}

@Nullable
private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payload)
private static MSQErrorReport getQueryExceptionDetails(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getStatus().getErrorReport();
}

@Nullable
public static List<MSQErrorReport> getQueryWarningDetails(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : new ArrayList<>(payload.getStatus().getWarningReports());
}

@Nullable
public static MSQStagesReport getQueryStagesReport(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getStages();
}

@Nullable
public static CounterSnapshotsTree getQueryCounters(@Nullable MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getCounters();
}

@Nullable
public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,6 @@ public void sanityTest() throws JsonProcessingException
{

Assert.assertEquals(JSON_STRING, MAPPER.writeValueAsString(SQL_STATEMENT_RESULT));
Assert.assertEquals(
SQL_STATEMENT_RESULT,
MAPPER.readValue(MAPPER.writeValueAsString(SQL_STATEMENT_RESULT), SqlStatementResult.class)
);
Assert.assertEquals(
SQL_STATEMENT_RESULT.hashCode(),
MAPPER.readValue(MAPPER.writeValueAsString(SQL_STATEMENT_RESULT), SqlStatementResult.class).hashCode()
);
Assert.assertEquals(
"SqlStatementResult{"
+ "queryId='q1',"
Expand All @@ -87,7 +79,10 @@ public void sanityTest() throws JsonProcessingException
+ " sqlRowSignature=[ColumnNameAndTypes{colName='_time', sqlTypeName='TIMESTAMP', nativeTypeName='LONG'}, ColumnNameAndTypes{colName='alias', sqlTypeName='VARCHAR', nativeTypeName='STRING'}, ColumnNameAndTypes{colName='market', sqlTypeName='VARCHAR', nativeTypeName='STRING'}],"
+ " durationInMs=100,"
+ " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]},"
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}}",
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}},"
+ " stages=null,"
+ " counters=null,"
+ " warnings=null}",
SQL_STATEMENT_RESULT.toString()
);
}
Expand Down
Loading
Loading