Skip to content

Commit

Permalink
Add task report stages and counters in response of /sql/statements/qu…
Browse files Browse the repository at this point in the history
…eryId API
  • Loading branch information
Akshat-Jain committed Jul 26, 2024
1 parent 71725b4 commit 4ea80bb
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;

import java.util.Map;
import java.util.Objects;

/**
* Tree of {@link CounterSnapshots} (named counter snapshots) organized by stage and worker.
Expand Down Expand Up @@ -108,4 +109,37 @@ private void putAll(final Map<Integer, Map<Integer, CounterSnapshots>> otherMap)
}
}
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CounterSnapshotsTree that = (CounterSnapshotsTree) o;
synchronized (snapshotsMap) {
return Objects.equals(snapshotsMap, that.snapshotsMap);
}
}

@Override
public int hashCode()
{
synchronized (snapshotsMap) {
return Objects.hashCode(snapshotsMap);
}
}

@Override
public String toString()
{
synchronized (snapshotsMap) {
return "CounterSnapshotsTree{" +
"snapshotsMap=" + snapshotsMap +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.sql.SqlStatementState;
import org.joda.time.DateTime;

Expand Down Expand Up @@ -51,6 +53,24 @@ public class SqlStatementResult
@Nullable
private final ErrorResponse errorResponse;

@Nullable
private final MSQStagesReport stages;

@Nullable
private final CounterSnapshotsTree counters;

public SqlStatementResult(
String queryId,
SqlStatementState state,
DateTime createdAt,
List<ColumnNameAndTypes> sqlRowSignature,
Long durationMs,
ResultSetInformation resultSetInformation,
ErrorResponse errorResponse
)
{
this(queryId, state, createdAt, sqlRowSignature, durationMs, resultSetInformation, errorResponse, null, null);
}

@JsonCreator
public SqlStatementResult(
Expand All @@ -67,8 +87,11 @@ public SqlStatementResult(
@Nullable @JsonProperty("result")
ResultSetInformation resultSetInformation,
@Nullable @JsonProperty("errorDetails")
ErrorResponse errorResponse

ErrorResponse errorResponse,
@Nullable @JsonProperty("stages")
MSQStagesReport stages,
@Nullable @JsonProperty("counters")
CounterSnapshotsTree counters
)
{
this.queryId = queryId;
Expand All @@ -78,6 +101,8 @@ public SqlStatementResult(
this.durationMs = durationMs;
this.resultSetInformation = resultSetInformation;
this.errorResponse = errorResponse;
this.stages = stages;
this.counters = counters;
}

@JsonProperty
Expand Down Expand Up @@ -130,6 +155,22 @@ public ErrorResponse getErrorResponse()
return errorResponse;
}

@JsonProperty("stages")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public MSQStagesReport getStages()
{
return stages;
}

@JsonProperty("counters")
@Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
public CounterSnapshotsTree getCounters()
{
return counters;
}


@Override
public boolean equals(Object o)
Expand All @@ -150,7 +191,7 @@ public boolean equals(Object o)
) && Objects.equals(resultSetInformation, that.resultSetInformation) && Objects.equals(
errorResponse == null ? null : errorResponse.getAsMap(),
that.errorResponse == null ? null : that.errorResponse.getAsMap()
);
) && Objects.equals(stages, that.stages) && Objects.equals(counters, that.counters);
}

@Override
Expand All @@ -163,7 +204,9 @@ public int hashCode()
sqlRowSignature,
durationMs,
resultSetInformation,
errorResponse == null ? null : errorResponse.getAsMap()
errorResponse == null ? null : errorResponse.getAsMap(),
stages,
counters
);
}

Expand All @@ -180,6 +223,8 @@ public String toString()
", errorResponse=" + (errorResponse == null
? "{}"
: errorResponse.getAsMap().toString()) +
", stages=" + stages +
", counters=" + counters +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.MSQControllerTask;
Expand All @@ -55,6 +56,7 @@
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
Expand Down Expand Up @@ -231,7 +233,9 @@ public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletReques
@Path("/{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response doGetStatus(
@PathParam("id") final String queryId, @Context final HttpServletRequest req
@PathParam("id") final String queryId,
@QueryParam("detail") boolean detail,
@Context final HttpServletRequest req
)
{
try {
Expand All @@ -242,7 +246,8 @@ public Response doGetStatus(
queryId,
authenticationResult,
true,
Action.READ
Action.READ,
detail
);

if (sqlStatementResult.isPresent()) {
Expand Down Expand Up @@ -369,7 +374,8 @@ public Response deleteQuery(@PathParam("id") final String queryId, @Context fina
queryId,
authenticationResult,
false,
Action.WRITE
Action.WRITE,
false
);
if (sqlStatementResult.isPresent()) {
switch (sqlStatementResult.get().getState()) {
Expand Down Expand Up @@ -479,7 +485,7 @@ private Response buildTaskResponse(Sequence<Object[]> sequence, AuthenticationRe
}
String taskId = String.valueOf(firstRow[0]);

Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ);
Optional<SqlStatementResult> statementResult = getStatementStatus(taskId, authenticationResult, true, Action.READ, false);

if (statementResult.isPresent()) {
return Response.status(Response.Status.OK).entity(statementResult.get()).build();
Expand Down Expand Up @@ -565,7 +571,8 @@ private Optional<SqlStatementResult> getStatementStatus(
String queryId,
AuthenticationResult authenticationResult,
boolean withResults,
Action forAction
Action forAction,
boolean detail
) throws DruidException
{
TaskStatusResponse taskResponse = contactOverlord(overlordClient.taskStatus(queryId), queryId);
Expand All @@ -592,6 +599,28 @@ private Optional<SqlStatementResult> getStatementStatus(
jsonMapper
);
} else {
MSQStagesReport stages = null;
CounterSnapshotsTree counters = null;

if (detail) {
MSQTaskReportPayload msqTaskReportPayload;
try {
msqTaskReportPayload = SqlStatementResourceHelper.getPayload(contactOverlord(overlordClient.taskReportAsMap(queryId), queryId));
}
catch (DruidException e) {
if (e.getErrorCode().equals("notFound")) {
msqTaskReportPayload = null;
} else {
throw e;
}
}

if (msqTaskReportPayload != null) {
stages = msqTaskReportPayload.getStages();
counters = msqTaskReportPayload.getCounters();
}
}

Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
return Optional.of(new SqlStatementResult(
queryId,
Expand All @@ -605,7 +634,9 @@ private Optional<SqlStatementResult> getStatementStatus(
sqlStatementState,
msqControllerTask.getQuerySpec().getDestination()
).orElse(null) : null,
null
null,
stages,
counters
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public void sanityTest() throws JsonProcessingException
+ " sqlRowSignature=[ColumnNameAndTypes{colName='_time', sqlTypeName='TIMESTAMP', nativeTypeName='LONG'}, ColumnNameAndTypes{colName='alias', sqlTypeName='VARCHAR', nativeTypeName='STRING'}, ColumnNameAndTypes{colName='market', sqlTypeName='VARCHAR', nativeTypeName='STRING'}],"
+ " durationInMs=100,"
+ " resultSetInformation=ResultSetInformation{numTotalRows=1, totalSizeInBytes=1, resultFormat=object, records=null, dataSource='ds', pages=[PageInformation{id=0, numRows=null, sizeInBytes=1, worker=null, partition=null}]},"
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}}}",
+ " errorResponse={error=druidException, errorCode=QueryNotSupported, persona=USER, category=UNCATEGORIZED, errorMessage=QueryNotSupported, context={}},"
+ " stages=null,"
+ " counters=null}",
SQL_STATEMENT_RESULT.toString()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ public void testInsert()
);
Assert.assertEquals(expected, actual);

Response getResponse = resource.doGetStatus(actual.getQueryId(), SqlStatementResourceTest.makeOkRequest());
Response getResponse = resource.doGetStatus(actual.getQueryId(), false, SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), getResponse.getStatus());
Assert.assertEquals(expected, getResponse.getEntity());

Expand Down Expand Up @@ -732,7 +732,7 @@ public void testReplaceAll()
);
Assert.assertEquals(expected, actual);

Response getResponse = resource.doGetStatus(actual.getQueryId(), SqlStatementResourceTest.makeOkRequest());
Response getResponse = resource.doGetStatus(actual.getQueryId(), false, SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), getResponse.getStatus());
Assert.assertEquals(expected, getResponse.getEntity());

Expand Down
Loading

0 comments on commit 4ea80bb

Please sign in to comment.