Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] List get versions #39

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
*/
package com.bakdata.schemaregistrymock;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.avro.Schema;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder;
Expand All @@ -38,6 +42,7 @@
import com.github.tomakehurst.wiremock.matching.UrlPattern;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
Expand All @@ -46,11 +51,10 @@
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

/**
* <p>The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.</p>
Expand Down Expand Up @@ -88,7 +92,7 @@
* assertThat(retrievedSchema).isEqualTo(keySchema);
* }
* }</code></pre>
*
* <p>
* To retrieve the url of the schema registry for a Kafka Streams config, please use {@link #getUrl()}
*/
@Slf4j
Expand All @@ -103,10 +107,12 @@ public class SchemaRegistryMock {
private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler();
private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler();
private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler();

private final WireMockServer mockSchemaRegistry = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort()
.extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler,
this.deleteSubjectHandler, this.allSubjectsHandler));

private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

private static UrlPattern getSchemaPattern(final Integer id) {
Expand All @@ -125,20 +131,34 @@ private static UrlPathPattern getSubjectVersionPattern(final String subject) {
return WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/" + subject + "/versions/(?:latest|\\d+)");
}

public void start() {
this.mockSchemaRegistry.start();
private void stubNotFound() {
// GET /subjects/:name/versions == 404
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN))
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
// GET /schemas/:id == 404
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+"))
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
}

public void start() {
this.mockSchemaRegistry.start();
stubNotFound();

// (register) POST /subjects/:name/versions
this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.autoRegistrationHandler.getName())));
// (get_all_versions) GET /subjects/:name/versions
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.listVersionsHandler.getName())));
// (get_schema_by_id) GET /subjects/:name/versions/(:id|latest)
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN + "/(?:latest|\\d+)"))
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+"))
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
this.mockSchemaRegistry.stubFor(WireMock.delete(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/[^/]+"))
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
.willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName())));
// (get_subjects) GET /subjects
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName())));
// (delete_subject) DELETE /subjects/:name
this.mockSchemaRegistry.stubFor(WireMock.delete(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/[^/]+"))
.willReturn(WireMock.aResponse().withTransformers(this.deleteSubjectHandler.getName())));
}

public void stop() {
Expand All @@ -153,11 +173,11 @@ public int registerValueSchema(final String topic, final Schema schema) {
return this.register(topic + "-value", schema);
}

public List<Integer> deleteKeySchema(final String subject) {
public List<Integer> deleteKeySchema(final String subject) throws RestClientException {
return this.delete(subject + "-key");
}

public List<Integer> deleteValueSchema(final String subject) {
public List<Integer> deleteValueSchema(final String subject) throws RestClientException {
return this.delete(subject + "-value");
}

Expand Down Expand Up @@ -188,30 +208,37 @@ private int register(final String subject, final Schema schema) {
}
}

private List<Integer> delete(final String subject) {
private List<Integer> delete(final String subject) throws RestClientException {
try {
final List<Integer> ids = this.schemaRegistryClient.getAllVersions(subject);
if (ids.isEmpty()) {
throw new RestClientException("No versions for subject " + subject, HTTP_NOT_FOUND, HTTP_NOT_FOUND);
}
ids.forEach(id -> this.mockSchemaRegistry.removeStub(WireMock.get(getSchemaPattern(id))));
this.mockSchemaRegistry.removeStub(WireMock.delete(getSubjectPattern(subject)));
this.mockSchemaRegistry.removeStub(WireMock.get(getSubjectVersionsPattern(subject)));
this.mockSchemaRegistry.removeStub(WireMock.get(getSubjectVersionPattern(subject)));
this.schemaRegistryClient.deleteSubject(subject);
return ids;
} catch (final IOException | RestClientException e) {
} catch (final IOException e) {
throw new IllegalStateException("Internal error in mock schema registry client", e);
}
}

private List<Integer> listVersions(final String subject) {
private List<Integer> listVersions(String subject) throws RestClientException {
log.debug("Listing all versions for subject {}", subject);
try {
return this.schemaRegistryClient.getAllVersions(subject);
} catch (final IOException | RestClientException e) {
final List<Integer> subjectVersions = this.schemaRegistryClient.getAllVersions(subject);
if (subjectVersions.isEmpty()) {
throw new RestClientException("No versions for subject " + subject, HTTP_NOT_FOUND, HTTP_NOT_FOUND);
}
return subjectVersions;
} catch (IOException e) {
throw new IllegalStateException("Internal error in mock schema registry client", e);
}
}

private SchemaMetadata getSubjectVersion(final String subject, final Object version) {
private SchemaMetadata getSubjectVersion(String subject, Object version) throws RestClientException {
log.debug("Requesting version {} for subject {}", version, subject);
try {
if (version instanceof String && version.equals("latest")) {
Expand All @@ -221,7 +248,9 @@ private SchemaMetadata getSubjectVersion(final String subject, final Object vers
} else {
throw new IllegalArgumentException("Only 'latest' or integer versions are allowed");
}
} catch (final IOException | RestClientException e) {
} catch (IOException e) {
throw new RestClientException(e.getMessage(), HTTP_NOT_FOUND, HTTP_NOT_FOUND);
} catch (RestClientException e) {
throw new IllegalStateException("Internal error in mock schema registry client", e);
}
}
Expand All @@ -234,30 +263,28 @@ private Collection<String> listAllSubjects() {
}
}

private abstract static class SubjectsHandler extends ResponseDefinitionTransformer {
// Expected url pattern /subjects(/.*-value/versions)
private abstract static class SubjectsVersionHandler extends ResponseDefinitionTransformer {
// Expected url pattern /subjects/.*-value/versions
protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings();

protected String getSubject(@NonNull Request request) {
return Iterables.get(this.urlSplitter.split(request.getUrl()), 1);
}

@Override
public boolean applyGlobally() {
return false;
}

protected String getSubject(final Request request) {
return Iterables.get(this.urlSplitter.split(request.getUrl()), 1);
}
}

private class AutoRegistrationHandler extends SubjectsHandler {

private class AutoRegistrationHandler extends SubjectsVersionHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
final FileSource files, final Parameters parameters) {
final String subject = Iterables.get(this.urlSplitter.split(request.getUrl()), 1);
try {
final int id = SchemaRegistryMock.this.register(subject,
new Schema.Parser()
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()));
final int id = SchemaRegistryMock.this.register(subject, new Schema.Parser()
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()));
final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse();
registerSchemaResponse.setId(id);
return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse);
Expand All @@ -272,13 +299,20 @@ public String getName() {
}
}

private class ListVersionsHandler extends SubjectsHandler {

private class ListVersionsHandler extends SubjectsVersionHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
final List<Integer> versions = SchemaRegistryMock.this.listVersions(this.getSubject(request));
log.debug("Got versions {}", versions);
final FileSource files, final Parameters parameters) {
final List<Integer> versions;
try {
versions = SchemaRegistryMock.this.listVersions(this.getSubject(request));
log.debug("Got versions {}", versions);
} catch (RestClientException e) {
return new ResponseDefinitionBuilder()
.withStatus(e.getStatus())
.withBody(e.getMessage())
.build();
}
return ResponseDefinitionBuilder.jsonResponse(versions);
}

Expand All @@ -288,18 +322,28 @@ public String getName() {
}
}

private class GetVersionHandler extends SubjectsHandler {

private class GetVersionHandler extends SubjectsVersionHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
final FileSource files, final Parameters parameters) {
final String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3);
final SchemaMetadata metadata;
if (versionStr.equals("latest")) {
metadata = SchemaRegistryMock.this.getSubjectVersion(this.getSubject(request), versionStr);
} else {
final int version = Integer.parseInt(versionStr);
metadata = SchemaRegistryMock.this.getSubjectVersion(this.getSubject(request), version);
try {
final String subject = this.getSubject(request);
if (versionStr.equals("latest")) {
metadata = SchemaRegistryMock.this.getSubjectVersion(subject, versionStr);
} else {
final int version = Integer.parseInt(versionStr);
if (version <= 0) {
throw new RestClientException("Version " + version + " not in valid range for schema version (1..2^31)", HTTP_NOT_FOUND, HTTP_NOT_FOUND);
}
metadata = SchemaRegistryMock.this.getSubjectVersion(subject, version);
}
} catch (RestClientException e) {
return new ResponseDefinitionBuilder()
.withStatus(e.getStatus())
.withBody(e.getMessage())
.build();
}
return ResponseDefinitionBuilder.jsonResponse(metadata);
}
Expand All @@ -310,11 +354,19 @@ public String getName() {
}
}

private class DeleteSubjectHandler extends SubjectsHandler {
private class DeleteSubjectHandler extends SubjectsVersionHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
final List<Integer> ids = SchemaRegistryMock.this.delete(this.getSubject(request));
final FileSource files, final Parameters parameters) {
final List<Integer> ids;
try {
ids = SchemaRegistryMock.this.delete(this.getSubject(request));
} catch (RestClientException e) {
return new ResponseDefinitionBuilder()
.withStatus(e.getStatus())
.withBody(e.getMessage())
.build();
}
return ResponseDefinitionBuilder.jsonResponse(ids);
}

Expand All @@ -324,10 +376,10 @@ public String getName() {
}
}

private class AllSubjectsHandler extends SubjectsHandler {
private class AllSubjectsHandler extends SubjectsVersionHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
final FileSource files, final Parameters parameters) {
final Collection<String> body = SchemaRegistryMock.this.listAllSubjects();
return ResponseDefinitionBuilder.jsonResponse(body);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -169,7 +172,6 @@ void shouldDeleteKeySchema() throws IOException, RestClientException {
assertThat(subjectsAfterDeletion).isEmpty();
}


@Test
void shouldDeleteValueSchema() throws IOException, RestClientException {
final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient();
Expand All @@ -192,7 +194,6 @@ void shouldDeleteKeySchemaWithClient() throws IOException, RestClientException {
assertThat(subjectsAfterDeletion).isEmpty();
}


@Test
void shouldDeleteValueSchemaWithClient() throws IOException, RestClientException {
final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient();
Expand All @@ -211,7 +212,6 @@ void shouldNotDeleteUnknownSubject() {
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}


@Test
void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
Expand Down