diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 167b036e7d1f..c7c7438b1cdc 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -139,11 +139,34 @@ public GenericRecord parse(ByteBuffer bytes) ParsedSchema parsedSchema = registry.getSchemaById(id); schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; } - catch (IOException | RestClientException ex) { - throw new ParseException(null, ex, "Failed to fetch Avro schema id[%s] from registry. Check if the schema " - + "exists in the registry. Otherwise it could mean that there is " - + "malformed data in the stream or data that doesn't conform to the schema " - + "specified.", id); + catch (IOException ex1) { + throw new ParseException( + null, + ex1, + "Failed to fetch Avro schema id[%s] from registry. Check if the schema exists in the registry. Otherwise it" + + " could mean that there is malformed data in the stream or data that doesn't conform to the schema" + + " specified.", + id + ); + } + catch (RestClientException ex2) { + if (ex2.getErrorCode() == 401) { + throw new ParseException( + null, + ex2, + "Failed to authenticate to schema registry for Avro schema id[%s]. Please check your credentials.", + id + ); + } + // For all other errors, just include the code and message received from the library. + throw new ParseException( + null, + ex2, + "Failed to fetch Avro schema id[%s] from registry. Error code[%s] and message[%s].", + id, + ex2.getErrorCode(), + ex2.getMessage() + ); } if (schema == null) { throw new ParseException(null, "No Avro schema id[%s] in registry", id); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 7644b61bb106..2aad88e8ff58 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -25,6 +25,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; @@ -61,15 +62,15 @@ public void setUp() @Test public void testMultipleUrls() throws Exception { + // Given String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + + // When + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); // Then Assert.assertNotEquals(decoder.hashCode(), 0); @@ -78,15 +79,15 @@ public void testMultipleUrls() throws Exception @Test public void testUrl() throws Exception { + // Given String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + + // When + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); // Then Assert.assertNotEquals(decoder.hashCode(), 0); @@ -95,15 +96,15 @@ public void testUrl() throws Exception @Test public void testConfig() throws Exception { + // Given String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + + // When + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); // Then Assert.assertNotEquals(decoder.hashCode(), 0); @@ -120,21 +121,33 @@ public void testParse() throws Exception byte[] bytes = getAvroDatum(schema, someAvroDatum); ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes); bb.rewind(); + // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + GenericRecord parse = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // Then + Assert.assertEquals(schema, parse.getSchema()); } - @Test(expected = ParseException.class) + @Test public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo() { // Given ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1); bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat( + e.getMessage(), + CoreMatchers.containsString("Failed to decode avro message, not enough bytes to decode (2)") + ); } - @Test(expected = ParseException.class) + @Test public void testParseCorruptedPartial() throws Exception { // Given @@ -145,19 +158,30 @@ public void testParseCorruptedPartial() throws Exception byte[] bytes = getAvroDatum(schema, someAvroDatum); ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class)); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Failed to decode Avro message for schema id[1234]")); } - @Test(expected = ParseException.class) + @Test public void testParseWrongSchemaType() throws Exception { // Given Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class)); ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("No Avro schema id[1234] in registry")); } @Test @@ -167,7 +191,8 @@ public void testParseWrongId() throws Exception Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); - // When + + // When / Then final ParseException e = Assert.assertThrows( ParseException.class, () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) @@ -187,17 +212,20 @@ private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws I @Test public void testParseHeader() throws JsonProcessingException { + // Given String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); - Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + // When + Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap( + decoder.getHeaders(), + SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, + new DefaultObjectMapper() + ); // Then Assert.assertEquals(3, header.size()); @@ -209,17 +237,20 @@ public void testParseHeader() throws JsonProcessingException @Test public void testParseConfig() throws JsonProcessingException { + // Given String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); - Map config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + // When + Map config = DynamicConfigProviderUtils.extraConfigAndSetStringMap( + decoder.getConfig(), + SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, + new DefaultObjectMapper() + ); // Then Assert.assertEquals(3, config.size()); @@ -227,4 +258,59 @@ public void testParseConfig() throws JsonProcessingException Assert.assertEquals("value.2", config.get("registry.config.prop.2")); Assert.assertEquals("value.3", config.get("registry.config.prop.3")); } + + @Test + public void testParseWhenUnauthenticatedException() throws IOException, RestClientException + { + // Given + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))) + .thenThrow(new RestClientException("unauthenticated", 401, 401)); + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + byte[] bytes = getAvroDatum(schema, someAvroDatum); + ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); + bb.rewind(); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class)); + MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("unauthenticated")); + MatcherAssert.assertThat( + e.getMessage(), + CoreMatchers.containsString( + "Failed to authenticate to schema registry for Avro schema id[1234]. Please check your credentials" + ) + ); + } + + @Test + public void testParseWhenResourceNotFoundException() throws IOException, RestClientException + { + // Given + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))) + .thenThrow(new RestClientException("resource doesn't exist", 404, 404)); + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + byte[] bytes = getAvroDatum(schema, someAvroDatum); + ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); + bb.rewind(); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class)); + MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("resource doesn't exist")); + MatcherAssert.assertThat( + e.getMessage(), + CoreMatchers.containsString( + "Failed to fetch Avro schema id[1234] from registry." + + " Error code[404] and message[resource doesn't exist; error code: 404]." + ) + ); + } }