Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task report fields in response of SQL statements endpoint #16808

Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;

import java.util.Map;
import java.util.Objects;

/**
* Tree of {@link CounterSnapshots} (named counter snapshots) organized by stage and worker.
Expand Down Expand Up @@ -108,4 +109,39 @@ private void putAll(final Map<Integer, Map<Integer, CounterSnapshots>> otherMap)
}
}
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CounterSnapshotsTree that = (CounterSnapshotsTree) o;
synchronized (snapshotsMap) {
synchronized (that.snapshotsMap) {
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
return Objects.equals(snapshotsMap, that.snapshotsMap);
}
}
}

@Override
public int hashCode()
{
synchronized (snapshotsMap) {
return Objects.hashCode(snapshotsMap);
}
}

@Override
public String toString()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this now right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed for the new test added in the PR: testMSQSelectRunningQueryWithDetail as assertSqlStatementResult() method has the following assertion logic for counters:

if (actual.getCounters() == null || expected.getCounters() == null) {
       Assert.assertEquals(expected.getCounters(), actual.getCounters());
     } else {
       Assert.assertEquals(expected.getCounters().toString(), actual.getCounters().toString());
     }

{
synchronized (snapshotsMap) {
return "CounterSnapshotsTree{" +
"snapshotsMap=" + snapshotsMap +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.sql.SqlStatementState;
import org.joda.time.DateTime;

Expand Down Expand Up @@ -51,6 +54,27 @@ public class SqlStatementResult
@Nullable
private final ErrorResponse errorResponse;

@Nullable
private final MSQStagesReport stages;

@Nullable
private final CounterSnapshotsTree counters;

@Nullable
private final List<MSQErrorReport> warnings;

public SqlStatementResult(
String queryId,
SqlStatementState state,
DateTime createdAt,
List<ColumnNameAndTypes> sqlRowSignature,
Long durationMs,
ResultSetInformation resultSetInformation,
ErrorResponse errorResponse
)
{
this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null, null);
}

@JsonCreator
public SqlStatementResult(
Expand All @@ -67,8 +91,13 @@ public SqlStatementResult(
@Nullable @JsonProperty("result")
ResultSetInformation resultSetInformation,
@Nullable @JsonProperty("errorDetails")
ErrorResponse errorResponse

ErrorResponse errorResponse,
@Nullable @JsonProperty("stages")
MSQStagesReport stages,
@Nullable @JsonProperty("counters")
CounterSnapshotsTree counters,
@Nullable @JsonProperty("warnings")
List<MSQErrorReport> warnings
)
{
this.queryId = queryId;
Expand All @@ -78,6 +107,9 @@ public SqlStatementResult(
this.durationMs = durationMs;
this.resultSetInformation = resultSetInformation;
this.errorResponse = errorResponse;
this.stages = stages;
this.counters = counters;
this.warnings = warnings;
}

@JsonProperty
Expand Down Expand Up @@ -130,6 +162,29 @@ public ErrorResponse getErrorResponse()
return errorResponse;
}

@JsonProperty("stages")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public MSQStagesReport getStages()
{
return stages;
}

@JsonProperty("counters")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public CounterSnapshotsTree getCounters()
{
return counters;
}

@JsonProperty("warnings")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<MSQErrorReport> getWarnings()
{
return warnings;
}

@Override
public boolean equals(Object o)
Expand All @@ -150,6 +205,9 @@ public boolean equals(Object o)
) && Objects.equals(resultSetInformation, that.resultSetInformation) && Objects.equals(
errorResponse == null ? null : errorResponse.getAsMap(),
that.errorResponse == null ? null : that.errorResponse.getAsMap()
) && Objects.equals(stages, that.stages) && Objects.equals(counters, that.counters) && Objects.equals(
warnings,
that.warnings
);
}

Expand All @@ -163,7 +221,10 @@ public int hashCode()
sqlRowSignature,
durationMs,
resultSetInformation,
errorResponse == null ? null : errorResponse.getAsMap()
errorResponse == null ? null : errorResponse.getAsMap(),
stages,
counters,
warnings
);
}

Expand All @@ -180,6 +241,9 @@ public String toString()
", errorResponse=" + (errorResponse == null
? "{}"
: errorResponse.getAsMap().toString()) +
", stages=" + stages +
", counters=" + counters +
", warnings=" + warnings +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -231,7 +232,9 @@ public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletReques
@Path("/{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response doGetStatus(
@PathParam("id") final String queryId, @Context final HttpServletRequest req
@PathParam("id") final String queryId,
@QueryParam("detail") boolean detail,
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
@Context final HttpServletRequest req
)
{
try {
Expand All @@ -242,7 +245,8 @@ public Response doGetStatus(
queryId,
authenticationResult,
true,
Action.READ
Action.READ,
detail
);

if (sqlStatementResult.isPresent()) {
Expand Down Expand Up @@ -369,7 +373,8 @@ public Response deleteQuery(@PathParam("id") final String queryId, @Context fina
queryId,
authenticationResult,
false,
Action.WRITE
Action.WRITE,
false
);
if (sqlStatementResult.isPresent()) {
switch (sqlStatementResult.get().getState()) {
Expand Down Expand Up @@ -479,7 +484,7 @@ private Response buildTaskResponse(Sequence<Object[]> sequence, AuthenticationRe
}
String taskId = String.valueOf(firstRow[0]);

Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ);
Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ, false);

if (statementResult.isPresent()) {
return Response.status(Response.Status.OK).entity(statementResult.get()).build();
Expand Down Expand Up @@ -565,7 +570,8 @@ private Optional<SqlStatementResult> getStatementStatus(
String queryId,
AuthenticationResult authenticationResult,
boolean withResults,
Action forAction
Action forAction,
boolean detail
) throws DruidException
{
TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId), queryId);
Expand All @@ -582,14 +588,29 @@ private Optional<SqlStatementResult> getStatementStatus(
MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction);
SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus);

Supplier<Optional<MSQTaskReportPayload>> msqTaskReportPayloadSupplier = () -> {
try {
return Optional.ofNullable(SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
));
}
catch (DruidException e) {
if (e.getErrorCode().equals("notFound")) {
return Optional.empty();
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
}
throw e;
}
};

if (SqlStatementState.FAILED == sqlStatementState) {
return SqlStatementResourceHelper.getExceptionPayload(
queryId,
taskResponse,
statusPlus,
sqlStatementState,
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId),
jsonMapper
msqTaskReportPayloadSupplier.get().orElse(null),
jsonMapper,
detail
);
} else {
Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
Expand All @@ -605,7 +626,10 @@ private Optional<SqlStatementResult> getStatementStatus(
sqlStatementState,
msqControllerTask.getQuerySpec().getDestination()
).orElse(null) : null,
null
null,
detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wont you call the overlord 3 times here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cryptoe Have raised a PR to get rid of the redundant calls: #16839

Thanks for pointing this out!

detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayloadSupplier.get().orElse(null)) : null
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,12 @@ public static Optional<SqlStatementResult> getExceptionPayload(
TaskStatusResponse taskResponse,
TaskStatusPlus statusPlus,
SqlStatementState sqlStatementState,
TaskReport.ReportMap msqPayload,
ObjectMapper jsonMapper
MSQTaskReportPayload msqTaskReportPayload,
ObjectMapper jsonMapper,
boolean detail
)
{
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(msqTaskReportPayload);
final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault();
if (exceptionDetails == null || fault == null) {
return Optional.of(new SqlStatementResult(
Expand All @@ -267,7 +268,10 @@ public static Optional<SqlStatementResult> getExceptionPayload(
DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build("%s", taskResponse.getStatus().getErrorMsg())
.toErrorResponse()
.toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand All @@ -293,7 +297,10 @@ protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
ex.withContext(exceptionContext);
return ex;
}
}).toErrorResponse()
}).toErrorResponse(),
detail ? getQueryStagesReport(msqTaskReportPayload) : null,
detail ? getQueryCounters(msqTaskReportPayload) : null,
detail ? getQueryWarningDetails(msqTaskReportPayload) : null
));
}

Expand Down Expand Up @@ -374,6 +381,24 @@ private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payl
return payload == null ? null : payload.getStatus().getErrorReport();
}

@Nullable
public static List<MSQErrorReport> getQueryWarningDetails(MSQTaskReportPayload payload)
{
return payload == null ? null : new ArrayList<>(payload.getStatus().getWarningReports());
}

@Nullable
public static MSQStagesReport getQueryStagesReport(MSQTaskReportPayload payload)
{
return payload == null ? null : payload.getStages();
}

@Nullable
public static CounterSnapshotsTree getQueryCounters(MSQTaskReportPayload payload)
Akshat-Jain marked this conversation as resolved.
Show resolved Hide resolved
{
return payload == null ? null : payload.getCounters();
}

@Nullable
public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ public void sanityTest() throws JsonProcessingException
+ " sqlRowSignature=[ColumnNameAndTypes{colName='_time', sqlTypeName='TIMESTAMP', nativeTypeName='LONG'}, ColumnNameAndTypes{colName='alias', sqlTypeName='VARCHAR', nativeTypeName='STRING'}, ColumnNameAndTypes{colName='market', sqlTypeName='VARCHAR', nativeTypeName='STRING'}],"
+ " durationInMs=100,"
+ " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]},"
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}}",
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}},"
+ " stages=null,"
+ " counters=null,"
+ " warnings=null}",
SQL_STATEMENT_RESULT.toString()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ public void testInsert()
);
Assert.assertEquals(expected, actual);

Response getResponse = resource.doGetStatus(actual.getQueryId(), SqlStatementResourceTest.makeOkRequest());
Response getResponse = resource.doGetStatus(actual.getQueryId(), false, SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), getResponse.getStatus());
Assert.assertEquals(expected, getResponse.getEntity());

Expand Down Expand Up @@ -732,7 +732,7 @@ public void testReplaceAll()
);
Assert.assertEquals(expected, actual);

Response getResponse = resource.doGetStatus(actual.getQueryId(), SqlStatementResourceTest.makeOkRequest());
Response getResponse = resource.doGetStatus(actual.getQueryId(), false, SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), getResponse.getStatus());
Assert.assertEquals(expected, getResponse.getEntity());

Expand Down
Loading
Loading