From 4833431fbd0b07b17ff97eff582effa6ffc1bac0 Mon Sep 17 00:00:00 2001 From: Jordan Moore Date: Sat, 6 Apr 2019 18:19:13 -0500 Subject: [PATCH 1/4] (#19) Mock subject version endpoints --- .../SchemaRegistryMock.java | 50 ++++++++++++++++--- .../SchemaRegistryMockTest.java | 20 ++++++-- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index 5f74f4c..bf7586b 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -107,6 +107,7 @@ public class SchemaRegistryMock { WireMockConfiguration.wireMockConfig().dynamicPort() .extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler, this.deleteSubjectHandler, this.allSubjectsHandler)); + private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); private static UrlPattern getSchemaPattern(final Integer id) { @@ -131,8 +132,10 @@ public void start() { .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.autoRegistrationHandler.getName()))); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN + "/(?:latest|\\d+)")) - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); + this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_REGISTRATION_PATTERN)) + .willReturn(WireMock.aResponse().withTransformers(this.listVersionsHandler.getName()))); + this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_REGISTRATION_PATTERN + "/(?:latest|\\d+)")) + .willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName()))); this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+")) .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); this.mockSchemaRegistry.stubFor(WireMock.delete(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/[^/]+")) @@ -202,6 +205,34 @@ private List delete(final String subject) { } } + private List listVersions(String subject) { + log.debug("Listing all versions for subject {}", subject); + try { + return this.schemaRegistryClient.getAllVersions(subject); + } catch (IOException | RestClientException e) { + throw new IllegalStateException("Internal error in mock schema registry client", e); + } + } + + private SchemaMetadata getSubjectVersion(String subject, Object version) { + log.debug("Requesting version {} for subject {}", version, subject); + try { + if (version instanceof String && version.equals("latest")) { + return this.schemaRegistryClient.getLatestSchemaMetadata(subject); + } else if (version instanceof Number){ + return this.schemaRegistryClient.getSchemaMetadata(subject, ((Number) version).intValue()); + } else { + throw new IllegalArgumentException("Only 'latest' or integer versions are allowed"); + } + } catch (IOException | RestClientException e) { + throw new IllegalStateException("Internal error in mock schema registry client", e); + } + } + + public SchemaRegistryClient getSchemaRegistryClient() { + return new CachedSchemaRegistryClient(this.getUrl(), IDENTITY_MAP_CAPACITY); + } + private List listVersions(final String subject) { log.debug("Listing all versions for subject {}", subject); try { @@ -234,14 +265,21 @@ private Collection listAllSubjects() { } } - private abstract static class SubjectsHandler extends ResponseDefinitionTransformer { - // Expected url pattern /subjects(/.*-value/versions) + private abstract class SubjectsVersioHandler extends ResponseDefinitionTransformer { + // Expected url pattern /subjects/.*-value/versions protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings(); + protected String getSubject(Request request) { + return Iterables.get(this.urlSplitter.split(request.getUrl()), 1); + } + @Override public boolean applyGlobally() { return false; } + } + + private class AutoRegistrationHandler extends SubjectsVersioHandler { protected String getSubject(final Request request) { return Iterables.get(this.urlSplitter.split(request.getUrl()), 1); @@ -276,7 +314,7 @@ private class ListVersionsHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final List versions = SchemaRegistryMock.this.listVersions(this.getSubject(request)); log.debug("Got versions {}", versions); return ResponseDefinitionBuilder.jsonResponse(versions); @@ -292,7 +330,7 @@ private class GetVersionHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); final SchemaMetadata metadata; if (versionStr.equals("latest")) { diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index f2a1c9f..7df25ed 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -34,6 +34,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; + import org.apache.avro.Schema; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -169,7 +170,6 @@ void shouldDeleteKeySchema() throws IOException, RestClientException { assertThat(subjectsAfterDeletion).isEmpty(); } - @Test void shouldDeleteValueSchema() throws IOException, RestClientException { final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); @@ -192,7 +192,6 @@ void shouldDeleteKeySchemaWithClient() throws IOException, RestClientException { assertThat(subjectsAfterDeletion).isEmpty(); } - @Test void shouldDeleteValueSchemaWithClient() throws IOException, RestClientException { final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); @@ -211,7 +210,6 @@ void shouldNotDeleteUnknownSubject() { .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } - @Test void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException { final Schema valueSchema = createSchema("value_schema"); @@ -237,6 +235,22 @@ void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClie .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } + @Test + void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { + final Schema valueSchema = this.createSchema("value_schema"); + final String topic = "test-topic"; + final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); + + final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); + assertThat(versions.size()).isNotZero(); + + final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); + assertThat(metadata.getId()).isEqualTo(id); + final String schemaString = metadata.getSchema(); + final Schema retrievedSchema = new Schema.Parser().parse(schemaString); + assertThat(retrievedSchema).isEqualTo(valueSchema); + } + private static Schema createSchema(final String name) { return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); } From ee8c86c9446cfedaad17132e4404c35ea2368933 Mon Sep 17 00:00:00 2001 From: Jordan Moore Date: Thu, 11 Apr 2019 20:12:47 -0500 Subject: [PATCH 2/4] Add more than one schema to verify getLatest behavior --- .../SchemaRegistryMockTest.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index 7df25ed..9ed21da 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -33,9 +33,11 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Arrays; import java.util.List; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -237,18 +239,25 @@ void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClie @Test void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { - final Schema valueSchema = this.createSchema("value_schema"); + final Schema valueSchema1 = this.createSchema("value_schema"); final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); + final int id1 = this.schemaRegistry.registerValueSchema(topic, valueSchema1); + + List fields = Collections.singletonList( + new Schema.Field("f1", Schema.create(Schema.Type.STRING), "", (Object) null)); + final Schema valueSchema2 = Schema.createRecord("value_schema", "no doc", "", false, fields); + final int id2 = this.schemaRegistry.registerValueSchema(topic, valueSchema2); final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions.size()).isNotZero(); + assertThat(versions.size()).isEqualTo(2); final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); - assertThat(metadata.getId()).isEqualTo(id); + int metadataId = metadata.getId(); + assertThat(metadataId).isNotEqualTo(id1); + assertThat(metadataId).isEqualTo(id2); final String schemaString = metadata.getSchema(); final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema); + assertThat(retrievedSchema).isEqualTo(valueSchema2); } private static Schema createSchema(final String name) { From ac798e6be4063af13ff9d842aed2297a4848266d Mon Sep 17 00:00:00 2001 From: Jordan Moore Date: Thu, 11 Apr 2019 20:15:45 -0500 Subject: [PATCH 3/4] Update subject-version endpoint name --- .../com/bakdata/schemaregistrymock/SchemaRegistryMock.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index bf7586b..0858085 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -92,7 +92,7 @@ * To retrieve the url of the schema registry for a Kafka Streams config, please use {@link #getUrl()} */ @Slf4j -public class SchemaRegistryMock { +public class SchemaRegistryMock implements BeforeEachCallback, AfterEachCallback { private static final String ALL_SUBJECT_PATTERN = "/subjects"; private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions"; private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/"; @@ -134,7 +134,7 @@ public void start() { .willReturn(WireMock.aResponse().withTransformers(this.autoRegistrationHandler.getName()))); this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_REGISTRATION_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.listVersionsHandler.getName()))); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_REGISTRATION_PATTERN + "/(?:latest|\\d+)")) + this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN + "/(?:latest|\\d+)")) .willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName()))); this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+")) .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); From f7996b61058c4eb686accbe46e4e37768c871109 Mon Sep 17 00:00:00 2001 From: Jordan Moore Date: Mon, 30 Mar 2020 03:20:00 -0500 Subject: [PATCH 4/4] Fixing tests Signed-off-by: Jordan Moore --- .../SchemaRegistryMock.java | 176 ++++++++++-------- .../SchemaRegistryMockTest.java | 23 --- 2 files changed, 95 insertions(+), 104 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index 0858085..cc6456f 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -23,7 +23,11 @@ */ package com.bakdata.schemaregistrymock; -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.avro.Schema; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; @@ -38,6 +42,7 @@ import com.github.tomakehurst.wiremock.matching.UrlPattern; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; + import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; @@ -46,11 +51,10 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import java.io.IOException; -import java.util.Collection; -import java.util.List; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; + +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; /** *

The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.

@@ -88,11 +92,11 @@ * assertThat(retrievedSchema).isEqualTo(keySchema); * } * } - * + *

* To retrieve the url of the schema registry for a Kafka Streams config, please use {@link #getUrl()} */ @Slf4j -public class SchemaRegistryMock implements BeforeEachCallback, AfterEachCallback { +public class SchemaRegistryMock { private static final String ALL_SUBJECT_PATTERN = "/subjects"; private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions"; private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/"; @@ -103,6 +107,7 @@ public class SchemaRegistryMock implements BeforeEachCallback, AfterEachCallback private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler(); private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler(); private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler(); + private final WireMockServer mockSchemaRegistry = new WireMockServer( WireMockConfiguration.wireMockConfig().dynamicPort() .extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler, @@ -126,22 +131,34 @@ private static UrlPathPattern getSubjectVersionPattern(final String subject) { return WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/" + subject + "/versions/(?:latest|\\d+)"); } - public void start() { - this.mockSchemaRegistry.start(); + private void stubNotFound() { + // GET /subjects/:name/versions == 404 this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN)) .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); + // GET /schemas/:id == 404 + this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+")) + .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); + } + + public void start() { + this.mockSchemaRegistry.start(); + stubNotFound(); + + // (register) POST /subjects/:name/versions this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.autoRegistrationHandler.getName()))); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_REGISTRATION_PATTERN)) + // (get_all_versions) GET /subjects/:name/versions + this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.listVersionsHandler.getName()))); + // (get_schema_by_id) GET /subjects/:name/versions/(:id|latest) this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN + "/(?:latest|\\d+)")) .willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName()))); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+")) - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); - this.mockSchemaRegistry.stubFor(WireMock.delete(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/[^/]+")) - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); + // (get_subjects) GET /subjects this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName()))); + // (delete_subject) DELETE /subjects/:name + this.mockSchemaRegistry.stubFor(WireMock.delete(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/[^/]+")) + .willReturn(WireMock.aResponse().withTransformers(this.deleteSubjectHandler.getName()))); } public void stop() { @@ -156,11 +173,11 @@ public int registerValueSchema(final String topic, final Schema schema) { return this.register(topic + "-value", schema); } - public List deleteKeySchema(final String subject) { + public List deleteKeySchema(final String subject) throws RestClientException { return this.delete(subject + "-key"); } - public List deleteValueSchema(final String subject) { + public List deleteValueSchema(final String subject) throws RestClientException { return this.delete(subject + "-value"); } @@ -191,58 +208,37 @@ private int register(final String subject, final Schema schema) { } } - private List delete(final String subject) { + private List delete(final String subject) throws RestClientException { try { final List ids = this.schemaRegistryClient.getAllVersions(subject); + if (ids.isEmpty()) { + throw new RestClientException("No versions for subject " + subject, HTTP_NOT_FOUND, HTTP_NOT_FOUND); + } ids.forEach(id -> this.mockSchemaRegistry.removeStub(WireMock.get(getSchemaPattern(id)))); this.mockSchemaRegistry.removeStub(WireMock.delete(getSubjectPattern(subject))); this.mockSchemaRegistry.removeStub(WireMock.get(getSubjectVersionsPattern(subject))); this.mockSchemaRegistry.removeStub(WireMock.get(getSubjectVersionPattern(subject))); this.schemaRegistryClient.deleteSubject(subject); return ids; - } catch (final IOException | RestClientException e) { + } catch (final IOException e) { throw new IllegalStateException("Internal error in mock schema registry client", e); } } - private List listVersions(String subject) { + private List listVersions(String subject) throws RestClientException { log.debug("Listing all versions for subject {}", subject); try { - return this.schemaRegistryClient.getAllVersions(subject); - } catch (IOException | RestClientException e) { - throw new IllegalStateException("Internal error in mock schema registry client", e); - } - } - - private SchemaMetadata getSubjectVersion(String subject, Object version) { - log.debug("Requesting version {} for subject {}", version, subject); - try { - if (version instanceof String && version.equals("latest")) { - return this.schemaRegistryClient.getLatestSchemaMetadata(subject); - } else if (version instanceof Number){ - return this.schemaRegistryClient.getSchemaMetadata(subject, ((Number) version).intValue()); - } else { - throw new IllegalArgumentException("Only 'latest' or integer versions are allowed"); + final List subjectVersions = this.schemaRegistryClient.getAllVersions(subject); + if (subjectVersions.isEmpty()) { + throw new RestClientException("No versions for subject " + subject, HTTP_NOT_FOUND, HTTP_NOT_FOUND); } - } catch (IOException | RestClientException e) { - throw new IllegalStateException("Internal error in mock schema registry client", e); - } - } - - public SchemaRegistryClient getSchemaRegistryClient() { - return new CachedSchemaRegistryClient(this.getUrl(), IDENTITY_MAP_CAPACITY); - } - - private List listVersions(final String subject) { - log.debug("Listing all versions for subject {}", subject); - try { - return this.schemaRegistryClient.getAllVersions(subject); - } catch (final IOException | RestClientException e) { + return subjectVersions; + } catch (IOException e) { throw new IllegalStateException("Internal error in mock schema registry client", e); } } - private SchemaMetadata getSubjectVersion(final String subject, final Object version) { + private SchemaMetadata getSubjectVersion(String subject, Object version) throws RestClientException { log.debug("Requesting version {} for subject {}", version, subject); try { if (version instanceof String && version.equals("latest")) { @@ -252,7 +248,9 @@ private SchemaMetadata getSubjectVersion(final String subject, final Object vers } else { throw new IllegalArgumentException("Only 'latest' or integer versions are allowed"); } - } catch (final IOException | RestClientException e) { + } catch (IOException e) { + throw new RestClientException(e.getMessage(), HTTP_NOT_FOUND, HTTP_NOT_FOUND); + } catch (RestClientException e) { throw new IllegalStateException("Internal error in mock schema registry client", e); } } @@ -265,11 +263,11 @@ private Collection listAllSubjects() { } } - private abstract class SubjectsVersioHandler extends ResponseDefinitionTransformer { + private abstract static class SubjectsVersionHandler extends ResponseDefinitionTransformer { // Expected url pattern /subjects/.*-value/versions protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings(); - protected String getSubject(Request request) { + protected String getSubject(@NonNull Request request) { return Iterables.get(this.urlSplitter.split(request.getUrl()), 1); } @@ -279,23 +277,14 @@ public boolean applyGlobally() { } } - private class AutoRegistrationHandler extends SubjectsVersioHandler { - - protected String getSubject(final Request request) { - return Iterables.get(this.urlSplitter.split(request.getUrl()), 1); - } - } - - private class AutoRegistrationHandler extends SubjectsHandler { - + private class AutoRegistrationHandler extends SubjectsVersionHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final String subject = Iterables.get(this.urlSplitter.split(request.getUrl()), 1); try { - final int id = SchemaRegistryMock.this.register(subject, - new Schema.Parser() - .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema())); + final int id = SchemaRegistryMock.this.register(subject, new Schema.Parser() + .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema())); final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse(); registerSchemaResponse.setId(id); return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse); @@ -310,13 +299,20 @@ public String getName() { } } - private class ListVersionsHandler extends SubjectsHandler { - + private class ListVersionsHandler extends SubjectsVersionHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, final FileSource files, final Parameters parameters) { - final List versions = SchemaRegistryMock.this.listVersions(this.getSubject(request)); - log.debug("Got versions {}", versions); + final List versions; + try { + versions = SchemaRegistryMock.this.listVersions(this.getSubject(request)); + log.debug("Got versions {}", versions); + } catch (RestClientException e) { + return new ResponseDefinitionBuilder() + .withStatus(e.getStatus()) + .withBody(e.getMessage()) + .build(); + } return ResponseDefinitionBuilder.jsonResponse(versions); } @@ -326,18 +322,28 @@ public String getName() { } } - private class GetVersionHandler extends SubjectsHandler { - + private class GetVersionHandler extends SubjectsVersionHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, final FileSource files, final Parameters parameters) { final String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); final SchemaMetadata metadata; - if (versionStr.equals("latest")) { - metadata = SchemaRegistryMock.this.getSubjectVersion(this.getSubject(request), versionStr); - } else { - final int version = Integer.parseInt(versionStr); - metadata = SchemaRegistryMock.this.getSubjectVersion(this.getSubject(request), version); + try { + final String subject = this.getSubject(request); + if (versionStr.equals("latest")) { + metadata = SchemaRegistryMock.this.getSubjectVersion(subject, versionStr); + } else { + final int version = Integer.parseInt(versionStr); + if (version <= 0) { + throw new RestClientException("Version " + version + " not in valid range for schema version (1..2^31)", HTTP_NOT_FOUND, HTTP_NOT_FOUND); + } + metadata = SchemaRegistryMock.this.getSubjectVersion(subject, version); + } + } catch (RestClientException e) { + return new ResponseDefinitionBuilder() + .withStatus(e.getStatus()) + .withBody(e.getMessage()) + .build(); } return ResponseDefinitionBuilder.jsonResponse(metadata); } @@ -348,11 +354,19 @@ public String getName() { } } - private class DeleteSubjectHandler extends SubjectsHandler { + private class DeleteSubjectHandler extends SubjectsVersionHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { - final List ids = SchemaRegistryMock.this.delete(this.getSubject(request)); + final FileSource files, final Parameters parameters) { + final List ids; + try { + ids = SchemaRegistryMock.this.delete(this.getSubject(request)); + } catch (RestClientException e) { + return new ResponseDefinitionBuilder() + .withStatus(e.getStatus()) + .withBody(e.getMessage()) + .build(); + } return ResponseDefinitionBuilder.jsonResponse(ids); } @@ -362,10 +376,10 @@ public String getName() { } } - private class AllSubjectsHandler extends SubjectsHandler { + private class AllSubjectsHandler extends SubjectsVersionHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final Collection body = SchemaRegistryMock.this.listAllSubjects(); return ResponseDefinitionBuilder.jsonResponse(body); } diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index 9ed21da..b9bc35c 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -237,29 +237,6 @@ void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClie .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } - @Test - void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { - final Schema valueSchema1 = this.createSchema("value_schema"); - final String topic = "test-topic"; - final int id1 = this.schemaRegistry.registerValueSchema(topic, valueSchema1); - - List fields = Collections.singletonList( - new Schema.Field("f1", Schema.create(Schema.Type.STRING), "", (Object) null)); - final Schema valueSchema2 = Schema.createRecord("value_schema", "no doc", "", false, fields); - final int id2 = this.schemaRegistry.registerValueSchema(topic, valueSchema2); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions.size()).isEqualTo(2); - - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); - int metadataId = metadata.getId(); - assertThat(metadataId).isNotEqualTo(id1); - assertThat(metadataId).isEqualTo(id2); - final String schemaString = metadata.getSchema(); - final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema2); - } - private static Schema createSchema(final String name) { return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); }