diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoder.java index 26c4aeb00ca0..b2e5e1aa6d77 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoder.java @@ -35,7 +35,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; -import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; @@ -87,17 +86,8 @@ public GenericRecord parse(ByteBuffer bytes) try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) { return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null)); } - catch (EOFException eof) { - // waiting for avro v1.9.0 (#AVRO-813) - throw new ParseException( - null, - eof, - "Avro's unnecessary EOFException, detail: [%s]", - "https://issues.apache.org/jira/browse/AVRO-813" - ); - } catch (Exception e) { - throw new ParseException(null, e, "Fail to decode avro message!"); + throw new ParseException(null, e, "Failed to read Avro message"); } } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java index 2dabd25daffd..5e2a9f2cd597 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java @@ -36,7 +36,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; -import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; @@ -101,35 +100,26 @@ public Map> getSchemas() public GenericRecord parse(ByteBuffer bytes) { if (bytes.remaining() < 5) { - throw new ParseException(null, "record must have at least 5 bytes carrying version and schemaId"); + throw new ParseException(null, "Record must have at least 5 bytes carrying version and schemaId"); } byte version = bytes.get(); if (version != V1) { - throw new ParseException(null, "found record of arbitrary version [%s]", version); + throw new ParseException(null, "Found record of arbitrary version[%s]", version); } int schemaId = bytes.getInt(); Schema schemaObj = schemaObjs.get(schemaId); if (schemaObj == null) { - throw new ParseException(null, "Failed to find schema for id [%s]", schemaId); + throw new ParseException(null, "Failed to find schema for id[%s]", schemaId); } DatumReader reader = new GenericDatumReader<>(schemaObj); try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) { return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null)); } - catch (EOFException eof) { - // waiting for avro v1.9.0 (#AVRO-813) - throw new ParseException( - null, - eof, - "Avro's unnecessary EOFException, detail: [%s]", - "https://issues.apache.org/jira/browse/AVRO-813" - ); - } catch (Exception e) { - throw new ParseException(null, e, "Fail to decode avro message with schemaId [%s].", schemaId); + throw new ParseException(null, e, "Failed to read Avro message with schema id[%s]", schemaId); } } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java index 46b836f22f8c..001cb806332e 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java @@ -34,8 +34,6 @@ import org.schemarepo.api.TypedSchemaRepository; import org.schemarepo.api.converter.AvroSchemaConverter; -import java.io.EOFException; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Objects; @@ -83,17 +81,8 @@ public GenericRecord parse(ByteBuffer bytes) try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) { return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null)); } - catch (EOFException eof) { - // waiting for avro v1.9.0 (#AVRO-813) - throw new ParseException( - null, - eof, - "Avro's unnecessary EOFException, detail: [%s]", - "https://issues.apache.org/jira/browse/AVRO-813" - ); - } - catch (IOException e) { - throw new ParseException(null, e, "Fail to decode avro message!"); + catch (Exception e) { + throw new ParseException(null, e, "Failed to read Avro message"); } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java index 100557b3b9e9..f0cfd372cb84 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; @@ -43,6 +44,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ParseException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -299,6 +301,45 @@ public void testParseSchemaless() throws SchemaValidationException, IOException } } + @Test + public void testParseInvalidData() throws IOException, SchemaValidationException + { + Repository repository = new InMemoryRepository(null); + SchemaRepoBasedAvroBytesDecoder decoder = new SchemaRepoBasedAvroBytesDecoder<>( + new Avro1124SubjectAndIdConverter(TOPIC), + repository + ); + + // prepare data + GenericRecord someAvroDatum = buildSomeAvroDatum(); + + // encode schema id + Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); + TypedSchemaRepository repositoryClient = new TypedSchemaRepository<>( + repository, + new IntegerConverter(), + new AvroSchemaConverter(), + new IdentityConverter() + ); + Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); + ByteBuffer byteBuffer = ByteBuffer.allocate(20); + converter.putSubjectAndId(id, byteBuffer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(new byte[0]); + out.write(byteBuffer.array()); + + DatumWriter writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); + // write avro datum to bytes + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + ParseException parseException = Assert.assertThrows( + ParseException.class, + () -> decoder.parse(ByteBuffer.wrap(out.toByteArray())) + ); + Assert.assertTrue(parseException.getCause() instanceof AvroRuntimeException); + Assert.assertTrue(parseException.getMessage().contains("Failed to read Avro message")); + } + static void assertInputRowCorrect(InputRow inputRow, List expectedDimensions, boolean isFromPigAvro) { Assert.assertEquals(expectedDimensions, inputRow.getDimensions()); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoderTest.java index 00b68b9a963a..19e88937750b 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemaAvroBytesDecoderTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; @@ -29,10 +30,12 @@ import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.ParseException; import org.junit.Assert; import org.junit.Test; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; /** @@ -87,4 +90,50 @@ public void testParse() throws Exception GenericRecord actual = new InlineSchemaAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray())); Assert.assertEquals(someAvroDatum.get("id"), actual.get("id")); } + + @Test + public void testParseInvalidEncodedData() throws Exception + { + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + + // Encode data incorrectly + ByteBuffer byteBuffer = ByteBuffer.allocate(10); + byteBuffer.putInt(-1); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(new byte[0]); + out.write(byteBuffer.array()); + + DatumWriter writer = new SpecificDatumWriter<>(schema); + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + ParseException parseException = Assert.assertThrows( + ParseException.class, + () -> new InlineSchemaAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray())) + ); + + Assert.assertTrue(parseException.getMessage().contains("Failed to read Avro message")); + Assert.assertTrue(parseException.getCause() instanceof IOException); + } + + @Test + public void testParseSmallInvalidChunk() throws Exception + { + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + + // Write a small chunk of data to trigger an AvroRuntimeException + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(ByteBuffer.allocate(20).array()); + + DatumWriter writer = new SpecificDatumWriter<>(schema); + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + ParseException parseException = Assert.assertThrows( + ParseException.class, + () -> new InlineSchemaAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray())) + ); + Assert.assertTrue(parseException.getMessage().contains("Failed to read Avro message")); + Assert.assertTrue(parseException.getCause() instanceof AvroRuntimeException); + } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoderTest.java index 23e581f5bb15..873f51236607 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/InlineSchemasAvroBytesDecoderTest.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.ParseException; import org.junit.Assert; import org.junit.Test; @@ -106,4 +107,77 @@ public void testParse() throws Exception ).parse(ByteBuffer.wrap(out.toByteArray())); Assert.assertEquals(someAvroDatum.get("id"), actual.get("id")); } + + @Test + public void testParseInvalidVersion() throws Exception + { + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + DatumWriter writer = new SpecificDatumWriter<>(schema); + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + ParseException parseException = Assert.assertThrows( + ParseException.class, + () -> new InlineSchemasAvroBytesDecoder( + ImmutableMap.of( + 10, + schema + ) + ).parse(ByteBuffer.wrap(out.toByteArray())) + ); + Assert.assertTrue(parseException.getMessage().contains("Found record of arbitrary version")); + } + + @Test + public void testParseInvalidSchemaId() throws Exception + { + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(new byte[]{1}); + + DatumWriter writer = new SpecificDatumWriter<>(schema); + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + ParseException parseException = Assert.assertThrows( + ParseException.class, + () -> new InlineSchemasAvroBytesDecoder( + ImmutableMap.of( + 10, + schema + ) + ).parse(ByteBuffer.wrap(out.toByteArray())) + ); + Assert.assertTrue(parseException.getMessage().contains("Failed to find schema for id")); + } + + @Test + public void testParseInvalidData() throws Exception + { + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(new byte[]{1}); + out.write(ByteBuffer.allocate(4).putInt(10).array()); + out.write(ByteBuffer.allocate(24).putInt(777).array()); // add some junk + + DatumWriter writer = new SpecificDatumWriter<>(schema); + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + ParseException parseException = Assert.assertThrows( + ParseException.class, + () -> new InlineSchemasAvroBytesDecoder( + ImmutableMap.of( + 10, + schema + ) + ).parse(ByteBuffer.wrap(out.toByteArray())) + ); + Assert.assertTrue(parseException.getMessage().contains("Failed to read Avro message with schema id[10]")); + } }