Skip to content

Commit

Permalink
Better error handling when retrieving Avro schemas from registry (apa…
Browse files Browse the repository at this point in the history
…che#16684)

* Handle RestClientException separately, instead of returning a generic error.

- Add tests
- Clean up the tests; remove the legacy expected exception pattern
- Better test assertions

* Rename tests

* checkstyle fixes
  • Loading branch information
abhishekrb19 authored Jul 2, 2024
1 parent d65e015 commit 35b9709
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,34 @@ public GenericRecord parse(ByteBuffer bytes)
ParsedSchema parsedSchema = registry.getSchemaById(id);
schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
}
catch (IOException | RestClientException ex) {
throw new ParseException(null, ex, "Failed to fetch Avro schema id[%s] from registry. Check if the schema "
+ "exists in the registry. Otherwise it could mean that there is "
+ "malformed data in the stream or data that doesn't conform to the schema "
+ "specified.", id);
catch (IOException ex1) {
throw new ParseException(
null,
ex1,
"Failed to fetch Avro schema id[%s] from registry. Check if the schema exists in the registry. Otherwise it"
+ " could mean that there is malformed data in the stream or data that doesn't conform to the schema"
+ " specified.",
id
);
}
catch (RestClientException ex2) {
if (ex2.getErrorCode() == 401) {
throw new ParseException(
null,
ex2,
"Failed to authenticate to schema registry for Avro schema id[%s]. Please check your credentials.",
id
);
}
// For all other errors, just include the code and message received from the library.
throw new ParseException(
null,
ex2,
"Failed to fetch Avro schema id[%s] from registry. Error code[%s] and message[%s].",
id,
ex2.getErrorCode(),
ex2.getMessage()
);
}
if (schema == null) {
throw new ParseException(null, "No Avro schema id[%s] in registry", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
Expand Down Expand Up @@ -61,15 +62,15 @@ public void setUp()
@Test
public void testMultipleUrls() throws Exception
{
// Given
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// When
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
Expand All @@ -78,15 +79,15 @@ public void testMultipleUrls() throws Exception
@Test
public void testUrl() throws Exception
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// When
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
Expand All @@ -95,15 +96,15 @@ public void testUrl() throws Exception
@Test
public void testConfig() throws Exception
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// When
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
Expand All @@ -120,21 +121,33 @@ public void testParse() throws Exception
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
bb.rewind();

// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
GenericRecord parse = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// Then
Assert.assertEquals(schema, parse.getSchema());
}

@Test(expected = ParseException.class)
@Test
public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
{
// Given
ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(
e.getMessage(),
CoreMatchers.containsString("Failed to decode avro message, not enough bytes to decode (2)")
);
}

@Test(expected = ParseException.class)
@Test
public void testParseCorruptedPartial() throws Exception
{
// Given
Expand All @@ -145,19 +158,30 @@ public void testParseCorruptedPartial() throws Exception
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Failed to decode Avro message for schema id[1234]"));
}

@Test(expected = ParseException.class)
@Test
public void testParseWrongSchemaType() throws Exception
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("No Avro schema id[1234] in registry"));
}

@Test
Expand All @@ -167,7 +191,8 @@ public void testParseWrongId() throws Exception
Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
bb.rewind();
// When

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
Expand All @@ -187,17 +212,20 @@ private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws I
@Test
public void testParseHeader() throws JsonProcessingException
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// When
Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
decoder.getHeaders(),
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
new DefaultObjectMapper()
);

// Then
Assert.assertEquals(3, header.size());
Expand All @@ -209,22 +237,80 @@ public void testParseHeader() throws JsonProcessingException
@Test
public void testParseConfig() throws JsonProcessingException
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// When
Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
decoder.getConfig(),
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
new DefaultObjectMapper()
);

// Then
Assert.assertEquals(3, config.size());
Assert.assertEquals("value.1", config.get("registry.config.prop.1"));
Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
}

@Test
public void testParseWhenUnauthenticatedException() throws IOException, RestClientException
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
.thenThrow(new RestClientException("unauthenticated", 401, 401));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
bb.rewind();

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class));
MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("unauthenticated"));
MatcherAssert.assertThat(
e.getMessage(),
CoreMatchers.containsString(
"Failed to authenticate to schema registry for Avro schema id[1234]. Please check your credentials"
)
);
}

@Test
public void testParseWhenResourceNotFoundException() throws IOException, RestClientException
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
.thenThrow(new RestClientException("resource doesn't exist", 404, 404));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
bb.rewind();

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class));
MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("resource doesn't exist"));
MatcherAssert.assertThat(
e.getMessage(),
CoreMatchers.containsString(
"Failed to fetch Avro schema id[1234] from registry."
+ " Error code[404] and message[resource doesn't exist; error code: 404]."
)
);
}
}

0 comments on commit 35b9709

Please sign in to comment.