Skip to content

Commit

Permalink
QueryResource: Don't close JSON content on error. (#17034) (#17303)
Browse files Browse the repository at this point in the history
* QueryResource: Don't close JSON content on error.

Following similar issues fixed in #11685 and #15880, this patch fixes
a bug where QueryResource would write a closing array marker if it
encountered an exception after starting to push results. This makes it
difficult for callers to detect errors.

The prior patches didn't catch this problem because QueryResource uses
the ObjectMapper in a unique way, through writeValuesAsArray, which
doesn't respect the global AUTO_CLOSE_JSON_CONTENT setting.

* Fix usage of customized ObjectMappers.

Co-authored-by: Gian Merlino <[email protected]>
  • Loading branch information
AmatyaAvadhanula and gianm authored Oct 9, 2024
1 parent 4ed6cde commit 9b90d9c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.server;

import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -62,7 +62,6 @@

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -434,7 +433,7 @@ private boolean isSerializeDateTimeAsLong()
|| (!shouldFinalize && queryContext.isSerializeDateTimeAsLongInner(false));
}

public ObjectWriter newOutputWriter(ResourceIOReaderWriter ioReaderWriter)
public ObjectMapper newOutputWriter(ResourceIOReaderWriter ioReaderWriter)
{
return ioReaderWriter.getResponseWriter().newOutputWriter(
getToolChest(),
Expand Down
85 changes: 50 additions & 35 deletions server/src/main/java/org/apache/druid/server/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package org.apache.druid.server;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
Expand All @@ -37,6 +38,7 @@
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BadJsonQueryException;
import org.apache.druid.query.Query;
Expand Down Expand Up @@ -374,7 +376,7 @@ String getResponseType()
return responseType;
}

ObjectWriter newOutputWriter(
ObjectMapper newOutputWriter(
@Nullable QueryToolChest<?, Query<?>> toolChest,
@Nullable Query<?> query,
boolean serializeDateTimeAsLong
Expand All @@ -387,7 +389,7 @@ ObjectWriter newOutputWriter(
} else {
decoratedMapper = mapper;
}
return isPretty ? decoratedMapper.writerWithDefaultPrettyPrinter() : decoratedMapper.writer();
return isPretty ? decoratedMapper.copy().enable(SerializationFeature.INDENT_OUTPUT) : decoratedMapper;
}

Response ok(Object object) throws IOException
Expand Down Expand Up @@ -531,35 +533,7 @@ public QueryResponse<Object> getQueryResponse()
@Override
public Writer makeWriter(OutputStream out) throws IOException
{
final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io);
final SequenceWriter sequenceWriter = objectWriter.writeValuesAsArray(out);
return new Writer()
{

@Override
public void writeResponseStart()
{
// Do nothing
}

@Override
public void writeRow(Object obj) throws IOException
{
sequenceWriter.write(obj);
}

@Override
public void writeResponseEnd()
{
// Do nothing
}

@Override
public void close() throws IOException
{
sequenceWriter.close();
}
};
return new NativeQueryWriter(queryLifecycle.newOutputWriter(io), out);
}

@Override
Expand All @@ -585,8 +559,49 @@ public void close()
@Override
public void writeException(Exception e, OutputStream out) throws IOException
{
final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io);
out.write(objectWriter.writeValueAsBytes(e));
final ObjectMapper objectMapper = queryLifecycle.newOutputWriter(io);
out.write(objectMapper.writeValueAsBytes(e));
}
}

static class NativeQueryWriter implements QueryResultPusher.Writer
{
private final SerializerProvider serializers;
private final JsonGenerator jsonGenerator;

public NativeQueryWriter(final ObjectMapper responseMapper, final OutputStream out) throws IOException
{
// Don't use objectWriter.writeValuesAsArray(out), because that causes an end array ] to be written when the
// writer is closed, even if it's closed in case of an exception. This causes valid JSON to be emitted in case
// of an exception, which makes it difficult for callers to detect problems. Note: this means that if an error
// occurs on a Historical (or other data server) after it started to push results to the Broker, the Broker
// will experience that as "JsonEOFException: Unexpected end-of-input: expected close marker for Array".
this.serializers = responseMapper.getSerializerProviderInstance();
this.jsonGenerator = responseMapper.createGenerator(out);
}

@Override
public void writeResponseStart() throws IOException
{
jsonGenerator.writeStartArray();
}

@Override
public void writeRow(Object obj) throws IOException
{
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers, obj);
}

@Override
public void writeResponseEnd() throws IOException
{
jsonGenerator.writeEndArray();
}

@Override
public void close() throws IOException
{
jsonGenerator.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.server;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
Expand Down Expand Up @@ -80,12 +81,14 @@
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.http.HttpStatus;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -98,6 +101,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -424,7 +428,8 @@ public QueryLifecycle factorize()
overrideConfig,
new AuthConfig(),
System.currentTimeMillis(),
System.nanoTime())
System.nanoTime()
)
{
@Override
public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten)
Expand Down Expand Up @@ -453,7 +458,8 @@ public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAdd
entity.getUnderlyingException(),
new DruidExceptionMatcher(
DruidException.Persona.OPERATOR,
DruidException.Category.RUNTIME_FAILURE, "legacyQueryException")
DruidException.Category.RUNTIME_FAILURE, "legacyQueryException"
)
.expectMessageIs("something")
);
}
Expand Down Expand Up @@ -1250,6 +1256,46 @@ public void testTooManyQueryInLaneImplicitFromDurationThreshold() throws Interru
}
}

@Test
public void testNativeQueryWriter_goodResponse() throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos);
writer.writeResponseStart();
writer.writeRow(Arrays.asList("foo", "bar"));
writer.writeRow(Collections.singletonList("baz"));
writer.writeResponseEnd();
writer.close();

Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("foo", "bar"),
ImmutableList.of("baz")
),
jsonMapper.readValue(baos.toByteArray(), Object.class)
);
}

@Test
public void testNativeQueryWriter_truncatedResponse() throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos);
writer.writeResponseStart();
writer.writeRow(Arrays.asList("foo", "bar"));
writer.close(); // Simulate an error that occurs midstream; close writer without calling writeResponseEnd.

final JsonProcessingException e = Assert.assertThrows(
JsonProcessingException.class,
() -> jsonMapper.readValue(baos.toByteArray(), Object.class)
);

MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("expected close marker for Array"))
);
}

private void createScheduledQueryResource(
QueryScheduler scheduler,
Collection<CountDownLatch> beforeScheduler,
Expand Down

0 comments on commit 9b90d9c

Please sign in to comment.