Skip to content

Commit

Permalink
IKC-305 Schema registry
Browse files Browse the repository at this point in the history
  • Loading branch information
Piotr Belke authored and Piotr Belke committed Dec 23, 2023
1 parent 6774cd7 commit 390e234
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,14 @@
import static com.consdata.kouncil.config.security.RoleNames.EDITOR_ROLE;
import static com.consdata.kouncil.config.security.RoleNames.VIEWER_ROLE;

import com.consdata.kouncil.config.KouncilConfiguration;
import com.consdata.kouncil.schema.SchemaDTO;
import com.consdata.kouncil.schema.SchemasConfigurationDTO;
import com.consdata.kouncil.schema.SchemasDTO;
import com.consdata.kouncil.schema.clusteraware.SchemaAwareCluster;
import com.consdata.kouncil.schema.clusteraware.SchemaAwareClusterService;
import com.consdata.kouncil.serde.Compatibility;
import com.consdata.kouncil.serde.MessageFormat;
import com.consdata.kouncil.topic.TopicsController;
import com.consdata.kouncil.topic.TopicsDto;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.security.RolesAllowed;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -30,134 +22,49 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
@RestController
@AllArgsConstructor
public class SchemaRegistryController {

private final SchemaAwareClusterService schemaAwareClusterService;
private final KouncilConfiguration kouncilConfiguration;
private final TopicsController topicsController;

public SchemaRegistryController(SchemaAwareClusterService schemaAwareClusterService,
KouncilConfiguration kouncilConfiguration, TopicsController topicsController) {
this.schemaAwareClusterService = schemaAwareClusterService;
this.kouncilConfiguration = kouncilConfiguration;
this.topicsController = topicsController;
}
private final SchemaRegistryService schemaRegistryService;

@RolesAllowed({ADMIN_ROLE, EDITOR_ROLE, VIEWER_ROLE})
@GetMapping("/api/schemas/configs")
public List<SchemasConfigurationDTO> getSchemasConfiguration() {
return kouncilConfiguration.getClusterConfig()
.entrySet()
.stream()
.map(clusterEntry -> SchemasConfigurationDTO.builder()
.serverId(clusterEntry.getKey())
.hasSchemaRegistry(clusterEntry.getValue().getSchemaRegistry() != null)
.build()
).toList();
return schemaRegistryService.getSchemasConfiguration();
}

@RolesAllowed({ADMIN_ROLE, EDITOR_ROLE, VIEWER_ROLE})
@GetMapping("/api/schemas/latest/{topicName}")
public SchemasDTO getLatestSchemas(@PathVariable String topicName, @RequestParam String serverId) {
if (schemaAwareClusterService.clusterHasSchemaRegistry(serverId)) {
SchemaAwareCluster schemaAwareCluster = schemaAwareClusterService.getClusterSchema(serverId);

var schemaBuilder = SchemasDTO.builder();
schemaAwareCluster.getSchemaRegistryFacade()
.getLatestSchemaMetadata(topicName, true)
.ifPresent(schema -> {
schemaBuilder.keyMessageFormat(MessageFormat.valueOf(schema.getSchemaType()));
schemaBuilder.keyPlainTextSchema(schema.getSchema());
});
schemaAwareCluster.getSchemaRegistryFacade()
.getLatestSchemaMetadata(topicName, false)
.ifPresent(schema -> {
schemaBuilder.valueMessageFormat(MessageFormat.valueOf(schema.getSchemaType()));
schemaBuilder.valuePlainTextSchema(schema.getSchema());
});
return schemaBuilder.build();
} else {
throw new SchemaRegistryNotConfiguredException(
String.format("Schema registry not configured for specified cluster=[%s]", serverId));
}
return schemaRegistryService.getLatestSchemas(serverId, topicName);
}

@GetMapping("/api/schemas/{serverId}")
public List<SchemaDTO> getAllSchemasForServer(@PathVariable String serverId, @RequestParam("topicNames") List<String> topicNames) {
if (schemaAwareClusterService.clusterHasSchemaRegistry(serverId)) {
List<SchemaDTO> schemas = new ArrayList<>();
TopicsDto topics = topicsController.getTopics(serverId);

topics.getTopics().stream().filter(topic -> topicNames.isEmpty() || topicNames.contains(topic.getName())).forEach(topic -> {
SchemaAwareCluster schemaAwareCluster = schemaAwareClusterService.getClusterSchema(serverId);

schemaAwareCluster.getSchemaRegistryFacade()
.getLatestSchemaMetadata(topic.getName(), true)
.ifPresent(schema -> schemas.add(
SchemaDTO.builder()
.subjectName(topic.getName().concat(TopicUtils.getSubjectSuffix(true)))
.messageFormat(MessageFormat.valueOf(schema.getSchemaType()))
.plainTextSchema(schema.getSchema())
.topicName(topic.getName())
.version(schema.getVersion())
.build()
));
schemaAwareCluster.getSchemaRegistryFacade()
.getLatestSchemaMetadata(topic.getName(), false)
.ifPresent(schema -> schemas.add(
SchemaDTO.builder()
.subjectName(topic.getName().concat(TopicUtils.getSubjectSuffix(false)))
.messageFormat(MessageFormat.valueOf(schema.getSchemaType()))
.plainTextSchema(schema.getSchema())
.topicName(topic.getName())
.version(schema.getVersion())
.build()
));

});

return schemas;
} else {
throw new SchemaRegistryNotConfiguredException(
String.format("Schema registry not configured for specified cluster=[%s]", serverId));
}
}

@DeleteMapping("/api/schemas/{serverId}/{subject}/{version}")
public void deleteSchema(@PathVariable String serverId, @PathVariable String subject, @PathVariable String version)
throws RestClientException, IOException {
schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().deleteSchema(subject, version);
public List<SchemaDTO> getSchemas(@PathVariable String serverId, @RequestParam("topicNames") List<String> topicNames) {
return schemaRegistryService.getSchemas(serverId, topicNames);
}

@GetMapping("/api/schemas/{serverId}/{subject}/{version}")
public SchemaDTO getSchemaVersion(@PathVariable String serverId, @PathVariable String subject, @PathVariable Integer version)
throws RestClientException, IOException {
SchemaMetadata schema = schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().getSchemaVersion(subject, version);
List<Integer> allVersions = schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().getAllVersions(subject);

String compatibility = schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().getCompatibility(subject);

return SchemaDTO.builder()
.subjectName(subject)
.messageFormat(MessageFormat.valueOf(schema.getSchemaType()))
.plainTextSchema(schema.getSchema())
.version(schema.getVersion())
.topicName(TopicUtils.getTopicName(subject))
.subjectType(TopicUtils.subjectType(subject))
.versionsNo(allVersions)
.compatibility(compatibility != null ? Compatibility.valueOf(compatibility) : null)
.build();
return schemaRegistryService.getSchemaVersion(serverId, subject, version);
}

@PostMapping("/api/schemas/{serverId}")
public void createSchema(@PathVariable String serverId, @RequestBody SchemaDTO schema) throws RestClientException, IOException {
schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().createSchema(schema);
schemaRegistryService.createSchema(serverId, schema);
}

@PutMapping("/api/schemas/{serverId}")
public void updateSchema(@PathVariable String serverId, @RequestBody SchemaDTO schema) throws RestClientException, IOException {
schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().updateSchema(schema);
schemaRegistryService.updateSchema(serverId, schema);
}

@DeleteMapping("/api/schemas/{serverId}/{subject}/{version}")
public void deleteSchema(@PathVariable String serverId, @PathVariable String subject, @PathVariable String version)
throws RestClientException, IOException {
schemaRegistryService.deleteSchema(serverId, subject, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public ParsedSchema getSchemaByTopicAndId(KouncilSchemaMetadata metadata) {
return schemaRegistryClient.getSchemaBySubjectAndId(subject, metadata.getSchemaId());
}


public void deleteSchema(String subject, String version) throws RestClientException, IOException {
log.info("Delete schema [subject={}, version={}]", subject, version);
schemaRegistryClient.deleteSchemaVersion(subject, version);
schemaRegistryClient.reset();
}

public void createSchema(SchemaDTO schema) throws RestClientException, IOException {
log.info("Creating schema [{}]", schema.toString());
String subject = schema.getTopicName().concat(TopicUtils.getSubjectSuffix(schema.getSubjectType()));
schema.setSubjectName(subject);

Expand All @@ -79,6 +79,7 @@ public void createSchema(SchemaDTO schema) throws RestClientException, IOExcepti
}

public void updateSchema(SchemaDTO schema) throws RestClientException, IOException {
log.info("Updating schema [{}]", schema.toString());
changeSubjectCompatibility(schema);
schemaRegistryClient.register(schema.getSubjectName(), parseSchema(schema.getMessageFormat(), schema.getPlainTextSchema()), true);
}
Expand All @@ -97,16 +98,18 @@ private ParsedSchema parseSchema(MessageFormat messageFormat, String schema) {
case JSON -> parsedSchema = new JsonSchema(schema);
case AVRO -> parsedSchema = new AvroSchema(schema);
case PROTOBUF -> parsedSchema = new ProtobufSchema(schema);
default -> throw new IllegalStateException("Unexpected value: " + messageFormat);
default -> throw new IllegalStateException("Unexpected value: " + messageFormat);
}
return parsedSchema;
}

public List<Integer> getAllVersions(String subject) throws RestClientException, IOException {
log.info("Fetching all schema versions [subject={}]", subject);
return schemaRegistryClient.getAllVersions(subject);
}

public SchemaMetadata getSchemaVersion(String subject, Integer version) throws RestClientException, IOException {
log.info("Fetching schema [subject={}, version={}]", subject, version);
return schemaRegistryClient.getSchemaMetadata(subject, version);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package com.consdata.kouncil.schema.registry;

import com.consdata.kouncil.config.KouncilConfiguration;
import com.consdata.kouncil.schema.SchemaDTO;
import com.consdata.kouncil.schema.SchemasConfigurationDTO;
import com.consdata.kouncil.schema.SchemasDTO;
import com.consdata.kouncil.schema.clusteraware.SchemaAwareCluster;
import com.consdata.kouncil.schema.clusteraware.SchemaAwareClusterService;
import com.consdata.kouncil.serde.Compatibility;
import com.consdata.kouncil.serde.MessageFormat;
import com.consdata.kouncil.topic.TopicMetadata;
import com.consdata.kouncil.topic.TopicsController;
import com.consdata.kouncil.topic.TopicsDto;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@AllArgsConstructor
public class SchemaRegistryService {

private final TopicsController topicsController;
private final SchemaAwareClusterService schemaAwareClusterService;
private final KouncilConfiguration kouncilConfiguration;

public List<SchemasConfigurationDTO> getSchemasConfiguration() {
return kouncilConfiguration.getClusterConfig()
.entrySet()
.stream()
.map(clusterEntry -> SchemasConfigurationDTO.builder()
.serverId(clusterEntry.getKey())
.hasSchemaRegistry(clusterEntry.getValue().getSchemaRegistry() != null)
.build()
).toList();
}

public SchemasDTO getLatestSchemas(String serverId, String topicName) {
if (schemaAwareClusterService.clusterHasSchemaRegistry(serverId)) {
SchemaAwareCluster schemaAwareCluster = schemaAwareClusterService.getClusterSchema(serverId);

var schemaBuilder = SchemasDTO.builder();
schemaAwareCluster.getSchemaRegistryFacade()
.getLatestSchemaMetadata(topicName, true)
.ifPresent(schema -> {
schemaBuilder.keyMessageFormat(MessageFormat.valueOf(schema.getSchemaType()));
schemaBuilder.keyPlainTextSchema(schema.getSchema());
});
schemaAwareCluster.getSchemaRegistryFacade()
.getLatestSchemaMetadata(topicName, false)
.ifPresent(schema -> {
schemaBuilder.valueMessageFormat(MessageFormat.valueOf(schema.getSchemaType()));
schemaBuilder.valuePlainTextSchema(schema.getSchema());
});
return schemaBuilder.build();
} else {
throw new SchemaRegistryNotConfiguredException(
String.format("Schema registry not configured for specified cluster=[%s]", serverId));
}
}

public List<SchemaDTO> getSchemas(String serverId, List<String> topicNames) {
if (schemaAwareClusterService.clusterHasSchemaRegistry(serverId)) {
List<SchemaDTO> schemas = new ArrayList<>();
TopicsDto topics = topicsController.getTopics(serverId);

SchemaAwareCluster schemaAwareCluster = schemaAwareClusterService.getClusterSchema(serverId);
topics.getTopics()
.stream()
.filter(topic -> topicNames.isEmpty() || topicNames.contains(topic.getName()))
.forEach(topic -> {
getSchema(true, topic, schemas, schemaAwareCluster);
getSchema(false, topic, schemas, schemaAwareCluster);
});

return schemas;
} else {
throw new SchemaRegistryNotConfiguredException(
String.format("Schema registry not configured for specified cluster=[%s]", serverId));
}
}

private void getSchema(Boolean isKey, TopicMetadata topic, List<SchemaDTO> schemas, SchemaAwareCluster schemaAwareCluster) {
schemaAwareCluster.getSchemaRegistryFacade()
.getLatestSchemaMetadata(topic.getName(), isKey)
.ifPresent(schema -> schemas.add(
SchemaDTO.builder()
.subjectName(topic.getName().concat(TopicUtils.getSubjectSuffix(isKey)))
.messageFormat(MessageFormat.valueOf(schema.getSchemaType()))
.plainTextSchema(schema.getSchema())
.topicName(topic.getName())
.version(schema.getVersion())
.build()
));

}

public SchemaDTO getSchemaVersion(String serverId, String subject, Integer version) throws RestClientException, IOException {
SchemaMetadata schema = schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().getSchemaVersion(subject, version);
List<Integer> allVersions = schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().getAllVersions(subject);

String compatibility = schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().getCompatibility(subject);

return SchemaDTO.builder()
.subjectName(subject)
.messageFormat(MessageFormat.valueOf(schema.getSchemaType()))
.plainTextSchema(schema.getSchema())
.version(schema.getVersion())
.topicName(TopicUtils.getTopicName(subject))
.subjectType(TopicUtils.subjectType(subject))
.versionsNo(allVersions)
.compatibility(compatibility != null ? Compatibility.valueOf(compatibility) : null)
.build();
}

public void createSchema(String serverId, SchemaDTO schema) throws RestClientException, IOException {
schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().createSchema(schema);
}

public void updateSchema(String serverId, SchemaDTO schema) throws RestClientException, IOException {
schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().updateSchema(schema);
}

public void deleteSchema(String serverId, String subject, String version) throws RestClientException, IOException {
schemaAwareClusterService.getClusterSchema(serverId).getSchemaRegistryFacade().deleteSchema(subject, version);
}
}
Loading

0 comments on commit 390e234

Please sign in to comment.