Skip to content

Commit

Permalink
Introduced includeTrailerHeader to enable TrailerHeaders in respo…
Browse files Browse the repository at this point in the history
…nse (apache#16672)

Introduced includeTrailerHeader to enable TrailerHeaders in response
If enabled, a header X-Error-Message will be added to indicate reasons for partial results.
  • Loading branch information
vivek807 authored Sep 21, 2024
1 parent a4c971c commit df680ba
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class QueryResource implements QueryCountStatsProvider
public static final String HEADER_RESPONSE_CONTEXT = "X-Druid-Response-Context";
public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id";
public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message";
public static final String RESPONSE_COMPLETE_TRAILER_HEADER = "X-Druid-Response-Complete";
public static final String HEADER_ETAG = "ETag";

protected final QueryLifecycleFactory queryLifecycleFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.ForbiddenException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;

import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
Expand All @@ -54,6 +56,7 @@
public abstract class QueryResultPusher
{
private static final Logger log = new Logger(QueryResultPusher.class);
protected static final String RESULT_TRAILER_HEADERS = QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER;

private final HttpServletRequest request;
private final String queryId;
Expand All @@ -63,6 +66,7 @@ public abstract class QueryResultPusher
private final QueryResource.QueryMetricCounter counter;
private final MediaType contentType;
private final Map<String, String> extraHeaders;
private final HttpFields trailerFields;

private StreamingHttpResponseAccumulator accumulator;
private AsyncContext asyncContext;
Expand All @@ -87,6 +91,7 @@ public QueryResultPusher(
this.counter = counter;
this.contentType = contentType;
this.extraHeaders = extraHeaders;
this.trailerFields = new HttpFields();
}

/**
Expand Down Expand Up @@ -120,7 +125,9 @@ public Response push()

final Response.ResponseBuilder startResponse = resultsWriter.start();
if (startResponse != null) {
startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId);
startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId)
.header(HttpHeader.TRAILER.toString(), RESULT_TRAILER_HEADERS);

for (Map.Entry<String, String> entry : extraHeaders.entrySet()) {
startResponse.header(entry.getKey(), entry.getValue());
}
Expand All @@ -143,6 +150,17 @@ public Response push()
response.setHeader(entry.getKey(), entry.getValue());
}

if (response instanceof org.eclipse.jetty.server.Response) {
org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response;

jettyResponse.setHeader(HttpHeader.TRAILER.toString(), RESULT_TRAILER_HEADERS);
jettyResponse.setTrailers(() -> trailerFields);

// Start with complete status

trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, "true");
}

accumulator = new StreamingHttpResponseAccumulator(queryResponse.getResponseContext(), resultsWriter);

results.accumulate(null, accumulator);
Expand Down Expand Up @@ -223,6 +241,8 @@ private Response handleDruidException(ResultsWriter resultsWriter, DruidExceptio
// also throwing the exception body into the response to make it easier for the client to choke if it manages
// to parse a meaningful object out, but that's potentially an API change so we leave that as an exercise for
// the future.
trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER, e.getMessage());
trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, "false");
return null;
}
}
Expand Down Expand Up @@ -418,6 +438,11 @@ public void initialize()
response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT, serializationResult.getResult());
response.setContentType(contentType.toString());

if (response instanceof org.eclipse.jetty.server.Response) {
org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response;
jettyResponse.setTrailers(() -> trailerFields);
}

try {
out = new CountingOutputStream(response.getOutputStream());
writer = resultsWriter.makeWriter(out);
Expand Down
159 changes: 157 additions & 2 deletions server/src/test/java/org/apache/druid/server/QueryResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BadJsonQueryException;
Expand All @@ -65,6 +70,7 @@
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.mocks.ExceptionalInputStream;
import org.apache.druid.server.mocks.MockAsyncContext;
import org.apache.druid.server.mocks.MockHttpServletRequest;
import org.apache.druid.server.mocks.MockHttpServletResponse;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
Expand All @@ -81,6 +87,11 @@
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.http.HttpStatus;
import org.easymock.EasyMock;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpOutput;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
Expand Down Expand Up @@ -258,8 +269,8 @@ public void testGoodQuery() throws IOException
@Test
public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException
{
String overrideConfigKey = "priority";
String overrideConfigValue = "678";
final String overrideConfigKey = "priority";
final String overrideConfigValue = "678";
DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue));
queryResource = new QueryResource(
new QueryLifecycleFactory(
Expand Down Expand Up @@ -381,6 +392,125 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(
);
}

@Test
public void testResponseWithIncludeTrailerHeader() throws IOException
{
queryResource = new QueryResource(
new QueryLifecycleFactory(
WAREHOUSE,
new QuerySegmentWalker()
{
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(
Query<T> query,
Iterable<Interval> intervals
)
{
return (queryPlus, responseContext) -> new Sequence<T>() {
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
if (accumulator instanceof QueryResultPusher.StreamingHttpResponseAccumulator) {
try {
((QueryResultPusher.StreamingHttpResponseAccumulator) accumulator).flush(); // initialized
}
catch (IOException ignore) {
}
}

throw new QueryTimeoutException();
}

@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
return Yielders.done(initValue, null);
}
};
}

@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(
Query<T> query,
Iterable<SegmentDescriptor> specs
)
{
throw new UnsupportedOperationException();
}
},
new DefaultGenericQueryMetricsFactory(),
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
jsonMapper,
smileMapper,
queryScheduler,
new AuthConfig(),
null,
ResponseContextConfig.newConfig(true),
DRUID_NODE
);

expectPermissiveHappyPathAuth();

org.eclipse.jetty.server.Response response = this.jettyResponseforRequest(testServletRequest);
Assert.assertNull(queryResource.doPost(new ByteArrayInputStream(
SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
null /*pretty*/,
testServletRequest));
Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString()));
Assert.assertEquals(response.getHeader(HttpHeader.TRAILER.toString()), QueryResultPusher.RESULT_TRAILER_HEADERS);

final HttpFields fields = response.getTrailers().get();
Assert.assertTrue(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER));
Assert.assertEquals(fields.get(QueryResource.ERROR_MESSAGE_TRAILER_HEADER),
"Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query.");

Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), "false");
}

@Test
public void testSuccessResponseWithTrailerHeader() throws IOException
{
queryResource = new QueryResource(
new QueryLifecycleFactory(
WAREHOUSE,
TEST_SEGMENT_WALKER,
new DefaultGenericQueryMetricsFactory(),
new NoopServiceEmitter(),
testRequestLogger,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
),
jsonMapper,
smileMapper,
queryScheduler,
new AuthConfig(),
null,
ResponseContextConfig.newConfig(true),
DRUID_NODE
);

expectPermissiveHappyPathAuth();

org.eclipse.jetty.server.Response response = this.jettyResponseforRequest(testServletRequest);
Assert.assertNull(queryResource.doPost(new ByteArrayInputStream(
SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
null /*pretty*/,
testServletRequest));
Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString()));

final HttpFields fields = response.getTrailers().get();
Assert.assertFalse(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER));

Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER));
Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), "true");
}

@Test
public void testQueryThrowsRuntimeExceptionFromLifecycleExecute() throws IOException
Expand Down Expand Up @@ -1459,4 +1589,29 @@ private Response expectSynchronousRequestFlow(
{
return queryResource.doPost(new ByteArrayInputStream(bytes), null, req);
}

private org.eclipse.jetty.server.Response jettyResponseforRequest(MockHttpServletRequest req) throws IOException
{
HttpChannel channelMock = EasyMock.mock(HttpChannel.class);
HttpOutput outputMock = EasyMock.mock(HttpOutput.class);
org.eclipse.jetty.server.Response response = new org.eclipse.jetty.server.Response(channelMock, outputMock);

EasyMock.expect(channelMock.isSendError()).andReturn(false);
EasyMock.expect(channelMock.isCommitted()).andReturn(true);

outputMock.close();
EasyMock.expectLastCall().andVoid();

outputMock.write(EasyMock.anyObject(byte[].class), EasyMock.anyInt(), EasyMock.anyInt());
EasyMock.expectLastCall().andVoid();

EasyMock.replay(outputMock, channelMock);

req.newAsyncContext(() -> {
final MockAsyncContext retVal = new MockAsyncContext();
retVal.response = response;
return retVal;
});
return response;
}
}

0 comments on commit df680ba

Please sign in to comment.