From df680bab05de46e46c5b3ec89a42648685456702 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Sat, 21 Sep 2024 01:59:37 -0700 Subject: [PATCH] Introduced `includeTrailerHeader` to enable `TrailerHeaders` in response (#16672) Introduced includeTrailerHeader to enable TrailerHeaders in response If enabled, a header X-Error-Message will be added to indicate reasons for partial results. --- .../apache/druid/server/QueryResource.java | 2 + .../druid/server/QueryResultPusher.java | 27 ++- .../druid/server/QueryResourceTest.java | 159 +++++++++++++++++- 3 files changed, 185 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 61696dd5cec3..06104000b1ca 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -94,6 +94,8 @@ public class QueryResource implements QueryCountStatsProvider public static final String HEADER_RESPONSE_CONTEXT = "X-Druid-Response-Context"; public static final String HEADER_IF_NONE_MATCH = "If-None-Match"; public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id"; + public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message"; + public static final String RESPONSE_COMPLETE_TRAILER_HEADER = "X-Druid-Response-Complete"; public static final String HEADER_ETAG = "ETag"; protected final QueryLifecycleFactory queryLifecycleFactory; diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index 074beb545b43..710c8ccc9199 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -38,6 +38,8 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.ForbiddenException; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; import javax.annotation.Nullable; import javax.servlet.AsyncContext; @@ -54,6 +56,7 @@ public abstract class QueryResultPusher { private static final Logger log = new Logger(QueryResultPusher.class); + protected static final String RESULT_TRAILER_HEADERS = QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER; private final HttpServletRequest request; private final String queryId; @@ -63,6 +66,7 @@ public abstract class QueryResultPusher private final QueryResource.QueryMetricCounter counter; private final MediaType contentType; private final Map extraHeaders; + private final HttpFields trailerFields; private StreamingHttpResponseAccumulator accumulator; private AsyncContext asyncContext; @@ -87,6 +91,7 @@ public QueryResultPusher( this.counter = counter; this.contentType = contentType; this.extraHeaders = extraHeaders; + this.trailerFields = new HttpFields(); } /** @@ -120,7 +125,9 @@ public Response push() final Response.ResponseBuilder startResponse = resultsWriter.start(); if (startResponse != null) { - startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId); + startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId) + .header(HttpHeader.TRAILER.toString(), RESULT_TRAILER_HEADERS); + for (Map.Entry entry : extraHeaders.entrySet()) { startResponse.header(entry.getKey(), entry.getValue()); } @@ -143,6 +150,17 @@ public Response push() response.setHeader(entry.getKey(), entry.getValue()); } + if (response instanceof org.eclipse.jetty.server.Response) { + org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response; + + jettyResponse.setHeader(HttpHeader.TRAILER.toString(), RESULT_TRAILER_HEADERS); + jettyResponse.setTrailers(() -> trailerFields); + + // Start with complete status + + trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, "true"); + } + accumulator = new StreamingHttpResponseAccumulator(queryResponse.getResponseContext(), resultsWriter); results.accumulate(null, accumulator); @@ -223,6 +241,8 @@ private Response handleDruidException(ResultsWriter resultsWriter, DruidExceptio // also throwing the exception body into the response to make it easier for the client to choke if it manages // to parse a meaningful object out, but that's potentially an API change so we leave that as an exercise for // the future. + trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER, e.getMessage()); + trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, "false"); return null; } } @@ -418,6 +438,11 @@ public void initialize() response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT, serializationResult.getResult()); response.setContentType(contentType.toString()); + if (response instanceof org.eclipse.jetty.server.Response) { + org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response; + jettyResponse.setTrailers(() -> trailerFields); + } + try { out = new CountingOutputStream(response.getOutputStream()); writer = resultsWriter.makeWriter(out); diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 32c26edffee1..4d827a008f3f 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -39,8 +39,13 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.LazySequence; +import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.guava.YieldingAccumulator; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BadJsonQueryException; @@ -65,6 +70,7 @@ import org.apache.druid.server.log.TestRequestLogger; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.mocks.ExceptionalInputStream; +import org.apache.druid.server.mocks.MockAsyncContext; import org.apache.druid.server.mocks.MockHttpServletRequest; import org.apache.druid.server.mocks.MockHttpServletResponse; import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; @@ -81,6 +87,11 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.http.HttpStatus; +import org.easymock.EasyMock; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpOutput; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.joda.time.Interval; @@ -258,8 +269,8 @@ public void testGoodQuery() throws IOException @Test public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException { - String overrideConfigKey = "priority"; - String overrideConfigValue = "678"; + final String overrideConfigKey = "priority"; + final String overrideConfigValue = "678"; DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue)); queryResource = new QueryResource( new QueryLifecycleFactory( @@ -381,6 +392,125 @@ public QueryRunner getQueryRunnerForSegments( ); } + @Test + public void testResponseWithIncludeTrailerHeader() throws IOException + { + queryResource = new QueryResource( + new QueryLifecycleFactory( + WAREHOUSE, + new QuerySegmentWalker() + { + @Override + public QueryRunner getQueryRunnerForIntervals( + Query query, + Iterable intervals + ) + { + return (queryPlus, responseContext) -> new Sequence() { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + if (accumulator instanceof QueryResultPusher.StreamingHttpResponseAccumulator) { + try { + ((QueryResultPusher.StreamingHttpResponseAccumulator) accumulator).flush(); // initialized + } + catch (IOException ignore) { + } + } + + throw new QueryTimeoutException(); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + return Yielders.done(initValue, null); + } + }; + } + + @Override + public QueryRunner getQueryRunnerForSegments( + Query query, + Iterable specs + ) + { + throw new UnsupportedOperationException(); + } + }, + new DefaultGenericQueryMetricsFactory(), + new NoopServiceEmitter(), + testRequestLogger, + new AuthConfig(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())) + ), + jsonMapper, + smileMapper, + queryScheduler, + new AuthConfig(), + null, + ResponseContextConfig.newConfig(true), + DRUID_NODE + ); + + expectPermissiveHappyPathAuth(); + + org.eclipse.jetty.server.Response response = this.jettyResponseforRequest(testServletRequest); + Assert.assertNull(queryResource.doPost(new ByteArrayInputStream( + SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)), + null /*pretty*/, + testServletRequest)); + Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString())); + Assert.assertEquals(response.getHeader(HttpHeader.TRAILER.toString()), QueryResultPusher.RESULT_TRAILER_HEADERS); + + final HttpFields fields = response.getTrailers().get(); + Assert.assertTrue(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER)); + Assert.assertEquals(fields.get(QueryResource.ERROR_MESSAGE_TRAILER_HEADER), + "Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query."); + + Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER)); + Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), "false"); + } + + @Test + public void testSuccessResponseWithTrailerHeader() throws IOException + { + queryResource = new QueryResource( + new QueryLifecycleFactory( + WAREHOUSE, + TEST_SEGMENT_WALKER, + new DefaultGenericQueryMetricsFactory(), + new NoopServiceEmitter(), + testRequestLogger, + new AuthConfig(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())) + ), + jsonMapper, + smileMapper, + queryScheduler, + new AuthConfig(), + null, + ResponseContextConfig.newConfig(true), + DRUID_NODE + ); + + expectPermissiveHappyPathAuth(); + + org.eclipse.jetty.server.Response response = this.jettyResponseforRequest(testServletRequest); + Assert.assertNull(queryResource.doPost(new ByteArrayInputStream( + SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)), + null /*pretty*/, + testServletRequest)); + Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString())); + + final HttpFields fields = response.getTrailers().get(); + Assert.assertFalse(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER)); + + Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER)); + Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), "true"); + } @Test public void testQueryThrowsRuntimeExceptionFromLifecycleExecute() throws IOException @@ -1459,4 +1589,29 @@ private Response expectSynchronousRequestFlow( { return queryResource.doPost(new ByteArrayInputStream(bytes), null, req); } + + private org.eclipse.jetty.server.Response jettyResponseforRequest(MockHttpServletRequest req) throws IOException + { + HttpChannel channelMock = EasyMock.mock(HttpChannel.class); + HttpOutput outputMock = EasyMock.mock(HttpOutput.class); + org.eclipse.jetty.server.Response response = new org.eclipse.jetty.server.Response(channelMock, outputMock); + + EasyMock.expect(channelMock.isSendError()).andReturn(false); + EasyMock.expect(channelMock.isCommitted()).andReturn(true); + + outputMock.close(); + EasyMock.expectLastCall().andVoid(); + + outputMock.write(EasyMock.anyObject(byte[].class), EasyMock.anyInt(), EasyMock.anyInt()); + EasyMock.expectLastCall().andVoid(); + + EasyMock.replay(outputMock, channelMock); + + req.newAsyncContext(() -> { + final MockAsyncContext retVal = new MockAsyncContext(); + retVal.response = response; + return retVal; + }); + return response; + } }