From 4317d34b16f882d39cebba3761dfb217c0ce4482 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 3 May 2024 15:58:42 +0200 Subject: [PATCH] schema operations, schema storage, toc --- .../20240430-schema-support.md | 131 +++++++++++++++++- docs/design-documents/schema.proto | 53 ------- 2 files changed, 124 insertions(+), 60 deletions(-) delete mode 100644 docs/design-documents/schema.proto diff --git a/docs/design-documents/20240430-schema-support.md b/docs/design-documents/20240430-schema-support.md index 825d6be5f..483ae0483 100644 --- a/docs/design-documents/20240430-schema-support.md +++ b/docs/design-documents/20240430-schema-support.md @@ -1,4 +1,19 @@ -# Schema support +

Schema support

+ + + * [The problem](#the-problem) + * [Requirements](#requirements) + * [Schema format](#schema-format) + * [Support for schemas originating from other streaming tools](#support-for-schemas-originating-from-other-streaming-tools) + * [Schema evolution](#schema-evolution) + * [Implementation](#implementation) + * [Schema operations](#schema-operations) + * [Option 1: Stream of commands and responses](#option-1-stream-of-commands-and-responses) + * [Option 2: Exposing Conduit as a service](#option-2-exposing-conduit-as-a-service) + * [Chosen option](#chosen-option) + * [Questions](#questions) + * [Other considerations](#other-considerations) + ## The problem @@ -19,8 +34,8 @@ the above. 1. Records **should not** carry the full schema. - Reason: If a record would carry the whole schema, that would increase the - record size a lot. + Reason: If a record would carry the whole schema, that might increase a + record's size significantly. 2. Sources and destinations need to be able to work with multiple schemas. Reason: Multiple collections support. @@ -40,6 +55,8 @@ the above. cost of repeatedly fetching the same schema many times (especially over gRPC), schemas should be cached by the SDK. +## Non-goals + ## Schema format A destination connector should work with one schema format only, regardless of @@ -56,13 +73,14 @@ A schema consists of following: * default value The following types are supported: -* Primitive: +* basic: * boolean * integers: 8, 16, 32, 64-bit * float: single precision (32-bit) and double precision (64-bit) IEEE 754 floating-point number * bytes * string -* Complex: + * timestamp +* complex: * array * map * struct @@ -70,6 +88,22 @@ The following types are supported: Every field in a schema can be marked as optional (nullable). +## Support for schemas originating from other streaming tools + +Conduit is sometimes used alongside other streaming tools. For example, Kafka +Connect may be used to read data from a source and write it into a topic. +Conduit then reads messages from that topic and writes it into a destination. We +also have the Kafka Connect wrapper which makes it possible to use Kafka Connect +connectors with Conduit. Here we have two possibilities: + +1. The schema is part of the record (e.g. Debezium records) +2. The schema can be found in a schema registry (e.g. an Avro schema registry) + +## Schema evolution + +TBD (this section is about checking if a new version of a schema is compatible +with the previous one) + ## Implementation Schema support is part of the OpenCDC standard. A schema is represented by the @@ -131,9 +165,92 @@ message Field { } ``` -A reference to a schema is saved in a new metadata field, `opencdc.schemaID`. +### Schema storage + +The schemas are stored in Conduit's database. Currently, there's no need to use +an external service. + +### Schema operations + +Source connectors need a way to register a schema, and destination connectors +need a way to fetch a schema. Regardless of which schema registry is used (an +internal one in Conduit or an external service), the access should be abstracted +away and should go through Conduit. With that, we have the following +implementation options: + +#### Option 1: Stream of commands and responses + +This pattern is used in WASM processors. A server (in this case: Conduit) +listens to commands (in this case: via a bidirectional gRPC stream). A client ( +in this case: a connector) sends a command to either register a schema or fetch +a schema. Conduit receives the command and replies. An example can be seen below: + +```protobuf +rpc CommandStream(stream Command) returns (stream Response); +``` + +For different types of commands and response to be supported, `Command` +and `Response` need to have a `oneof` field where all the possible commands +and respective responses are listed: + +```protobuf +message Command { + oneof cmd { + SaveSchemaCommand saveSchemaCmd = 1; + // etc. + } +} + +message Response { + oneof resp { + SaveSchemaResponse saveSchemaResp = 1; + // etc. + } +} +``` +**Advantages**: + +1. No additional connection setup. When Conduit starts a connector process, it + establishes a connection. The same connection is used for all communication ( + e.g. configuring a connector, opening, reading/writing records, etc.) +2. Connector actions (which are planned for a future milestone) might use the + same command-and-reply stream. + +**Disadvantages**: + +1. Separate flow needed to establish a connector to a remote Conduit instance ( + see [requirement](#requirements) #3). +2. A single method for all the operations makes both, the server and client + implementation, more complex. In Conduit, a single gRPC method needs to check + for the command type and then reply with a response. Then the client (i.e. + the connector) needs to check the response type. In case multiple commands + are sent, we need ordering guarantees. + +#### Option 2: Exposing Conduit as a service + +Conduit exposes a service to work with schemas. Connectors access the service +and call methods on the service. + +For this work, a connector (i.e. clients of the schema service) needs Conduit's +IP address and the gRPC port. The IP address can be fetched +using [peer](https://pkg.go.dev/google.golang.org/grpc/peer#Peer). Conduit can +send its gRPC port to the connector via the `Configure` method. + +**Advantages**: + +1. Works with a remote Conduit instance (see [requirement](#requirements) #3). +2. Easy to understand: the gRPC methods, together with requests and responses, + can easily be understood from a proto file. +3. An HTTP API for the schema registry can easily be exposed (if needed). + +**Disadvatanges**: + +1. Changes needed to communicate Conduit's gRPC port to the connector. + +#### Chosen option -### Schema-related operations in connectors +**Option 2** is the chosen method since it offers more clarity and the support for +remote Conduit instances. ## Questions diff --git a/docs/design-documents/schema.proto b/docs/design-documents/schema.proto deleted file mode 100644 index 2fe505339..000000000 --- a/docs/design-documents/schema.proto +++ /dev/null @@ -1,53 +0,0 @@ -syntax = "proto3"; - -message Schema { - string id = 1; - repeated Field fields = 2; -} - -message FieldType { - oneof type { - PrimitiveFieldType primitiveType = 1; - ArrayType arrayType = 2; - MapType mapType = 3; - StructType structType = 4; - UnionType unionType = 5; - } -} - -enum PrimitiveFieldType { - BOOLEAN = 0; - INT8 = 1; - // other primitive types -} - -message ArrayType { - FieldType elementType = 1; -} - -message MapType { - FieldType keyType = 1; - FieldType valueType = 2; -} - -message StructType { - repeated Field fields = 1; -} - -message UnionType { - repeated FieldType types = 1; -} - -message Field { - string name = 1; - oneof type { - PrimitiveFieldType primitiveType = 2; - ArrayType arrayType = 3; - MapType mapType = 4; - StructType structType = 5; - UnionType unionType = 6; - } - bool optional = 7; - // todo: find appropriate type - any defaultValue = 8; -}