From 3efce443f9a149a3bc69d1a0380a2844a93b2b65 Mon Sep 17 00:00:00 2001 From: Stewart Boyd Date: Mon, 23 Sep 2024 14:46:32 -0700 Subject: [PATCH] Updated to allow subject name specification for schema registry (#14) Updated to allow subject name specification for schema registry --- README.md | 34 ------ changelog.md | 3 + config.go | 2 + example/producer_avro/go.mod | 12 +- example/producer_avro/go.sum | 5 + example/worker_avro/go.mod | 10 +- example/worker_avro/go.sum | 5 + formatter.go | 2 +- schemareg.go | 19 ++- test/schema_registry_test.go | 222 +++++++++++++++++++++++++++++++++++ 10 files changed, 267 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index 6c9b9f4..b2a2901 100644 --- a/README.md +++ b/README.md @@ -282,40 +282,6 @@ This can be used in ProducerTopicConfig/ConsumerTopicConfig just like the others schema registry communication that's required) -#### Producers - -##### Raw Handling - -For a producer, this would involve using the `kafka.Writer.WriteRaw()` method which takes in a byte array directly. - -##### Native Support - -Producers will include the schemaID in messages written to kafka (without any further verification). - -#### Consumers - -##### Raw Handling - -For a consumer, this would involve accessing the value byte array through the `zkafka.Message.Value()` method. - -```go -type Processor struct{} -func (p Processor) Process(_ context.Context, msg *zkafka.Message) error { - e := MyEvent{} - // The Decode method uses the configured formatter and the configured schema registry ID. - //err := msg.Decode(&e) - // For schema registry, however, it might be better to bypass the configurable formatters, and deserialize the data by accessing the byte array directly - // The below function is a hypothetical function that inspects the data in in the kafka message's value and communicates with schema registry for verification - myUnmarshallFunction(msg.Value(), &e) - ... -} -``` - -##### Native Support -Consumers will verify that the message they're consuming has the schemaID specified in the configuration -(if it's specified). Be careful here, as backwards compatible schema evolutions would be treated as an error condition -as the new schemaID wouldn't match what's in the configuration. - ### Consumer/Producer Configuration See for description of configuration options and their defaults: diff --git a/changelog.md b/changelog.md index 9727a99..2f0f53c 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning. +## 1.2.0 (Sep 23, 2024) +1. Update to allow subject name specification (not just TopicNameStrategy, which default ) + ## 1.1.0 (Sep 23, 2024) 1. Added support for schema registry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry` diff --git a/config.go b/config.go index 6d12b5d..9e53bfc 100644 --- a/config.go +++ b/config.go @@ -221,6 +221,8 @@ type SchemaRegistryConfig struct { Serialization SerializationConfig // Deserialization provides additional information used by schema registry formatters during deserialization (data read) Deserialization DeserializationConfig + // SubjectName allows the specification of the SubjectName. If not specified defaults to [topic name strategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy) + SubjectName string } type SerializationConfig struct { diff --git a/example/producer_avro/go.mod b/example/producer_avro/go.mod index 818a939..f6b2a24 100644 --- a/example/producer_avro/go.mod +++ b/example/producer_avro/go.mod @@ -4,14 +4,14 @@ go 1.23.1 replace github.com/zillow/zkafka v1.0.0 => ../.. +require github.com/zillow/zkafka v1.0.0 require ( - github.com/zillow/zkafka v1.0.0 github.com/actgardner/gogen-avro/v10 v10.2.1 // indirect - github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 // indirect + github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/hamba/avro/v2 v2.20.1 // indirect + github.com/hamba/avro/v2 v2.24.0 // indirect github.com/heetch/avro v0.4.5 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -19,8 +19,8 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/sony/gobreaker v1.0.0 // indirect github.com/zillow/zfmt v1.0.1 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect - golang.org/x/sync v0.7.0 // indirect + go.opentelemetry.io/otel v1.30.0 // indirect + go.opentelemetry.io/otel/trace v1.30.0 // indirect + golang.org/x/sync v0.8.0 // indirect google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/example/producer_avro/go.sum b/example/producer_avro/go.sum index e8d3c72..dafa10c 100644 --- a/example/producer_avro/go.sum +++ b/example/producer_avro/go.sum @@ -76,6 +76,7 @@ github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc= github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo= github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y= +github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0= github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro= github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes= @@ -188,6 +189,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E= github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig= +github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -399,6 +401,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM= @@ -421,6 +424,7 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6 go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= @@ -435,6 +439,7 @@ golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q= diff --git a/example/worker_avro/go.mod b/example/worker_avro/go.mod index a5646ae..605f918 100644 --- a/example/worker_avro/go.mod +++ b/example/worker_avro/go.mod @@ -15,9 +15,9 @@ require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/bufbuild/protocompile v0.8.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect - github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 // indirect + github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/hamba/avro/v2 v2.20.1 // indirect + github.com/hamba/avro/v2 v2.24.0 // indirect github.com/invopop/jsonschema v0.12.0 // indirect github.com/jhump/protoreflect v1.15.6 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -29,9 +29,9 @@ require ( github.com/sony/gobreaker v1.0.0 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/zillow/zfmt v1.0.1 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect - golang.org/x/sync v0.7.0 // indirect + go.opentelemetry.io/otel v1.30.0 // indirect + go.opentelemetry.io/otel/trace v1.30.0 // indirect + golang.org/x/sync v0.8.0 // indirect google.golang.org/genproto v0.0.0-20240325203815-454cdb8f5daa // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/example/worker_avro/go.sum b/example/worker_avro/go.sum index ef03f36..ddd3399 100644 --- a/example/worker_avro/go.sum +++ b/example/worker_avro/go.sum @@ -83,6 +83,7 @@ github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc= github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo= github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y= +github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0= github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro= github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes= @@ -195,6 +196,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E= github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig= +github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -414,6 +416,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM= @@ -436,6 +439,7 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6 go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= @@ -450,6 +454,7 @@ golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q= diff --git a/formatter.go b/formatter.go index a8b55d2..ea2b8fb 100644 --- a/formatter.go +++ b/formatter.go @@ -119,7 +119,7 @@ func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { } func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error { - err := f.afmt.Deserialize(req.topic, req.data, &req.target) + err := f.afmt.Deserialize(req.topic, req.data, req.target) if err != nil { return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err) } diff --git a/schemareg.go b/schemareg.go index dbca923..a537134 100644 --- a/schemareg.go +++ b/schemareg.go @@ -34,6 +34,7 @@ func (c *schemaRegistryFactory) createAvro(srConfig SchemaRegistryConfig) (avroF if err != nil { return avroFmt{}, fmt.Errorf("failed to create deserializer: %w", err) } + deser.SubjectNameStrategy = subjectNameStrategy(srConfig) serConfig := avrov2.NewSerializerConfig() serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas @@ -43,6 +44,8 @@ func (c *schemaRegistryFactory) createAvro(srConfig SchemaRegistryConfig) (avroF if err != nil { return avroFmt{}, fmt.Errorf("failed to create serializer: %w", err) } + ser.SubjectNameStrategy = subjectNameStrategy(srConfig) + return avroFmt{ ser: ser, deser: deser, @@ -61,6 +64,8 @@ func (c *schemaRegistryFactory) createProto(srConfig SchemaRegistryConfig) (prot return protoFmt{}, fmt.Errorf("failed to create deserializer: %w", err) } + deser.SubjectNameStrategy = subjectNameStrategy(srConfig) + serConfig := protobuf.NewSerializerConfig() serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas serConfig.NormalizeSchemas = true @@ -69,6 +74,7 @@ func (c *schemaRegistryFactory) createProto(srConfig SchemaRegistryConfig) (prot if err != nil { return protoFmt{}, fmt.Errorf("failed to create serializer: %w", err) } + ser.SubjectNameStrategy = subjectNameStrategy(srConfig) return protoFmt{ ser: ser, deser: deser, @@ -87,6 +93,7 @@ func (c *schemaRegistryFactory) createJson(srConfig SchemaRegistryConfig) (jsonF if err != nil { return jsonFmt{}, fmt.Errorf("failed to create deserializer: %w", err) } + deser.SubjectNameStrategy = subjectNameStrategy(srConfig) serConfig := jsonschema.NewSerializerConfig() serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas @@ -96,11 +103,12 @@ func (c *schemaRegistryFactory) createJson(srConfig SchemaRegistryConfig) (jsonF if err != nil { return jsonFmt{}, fmt.Errorf("failed to create serializer: %w", err) } + ser.SubjectNameStrategy = subjectNameStrategy(srConfig) + return jsonFmt{ ser: ser, deser: deser, }, nil - } func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) (schemaregistry.Client, error) { @@ -122,6 +130,15 @@ func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) ( return client, nil } +func subjectNameStrategy(cfg SchemaRegistryConfig) serde.SubjectNameStrategyFunc { + return func(topic string, serdeType serde.Type, schema schemaregistry.SchemaInfo) (string, error) { + if cfg.SubjectName != "" { + return cfg.SubjectName, nil + } + return serde.TopicNameStrategy(topic, serdeType, schema) + } +} + type avroFmt struct { ser *avrov2.Serializer deser *avrov2.Deserializer diff --git a/test/schema_registry_test.go b/test/schema_registry_test.go index 3d4e3ca..c032f30 100644 --- a/test/schema_registry_test.go +++ b/test/schema_registry_test.go @@ -8,11 +8,14 @@ import ( "os" "testing" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/zillow/zfmt" "github.com/zillow/zkafka" "github.com/zillow/zkafka/test/evolution/avro1" + "github.com/zillow/zkafka/test/evolution/json1" + "github.com/zillow/zkafka/test/evolution/proto1" ) //go:embed evolution/schema_1.avsc @@ -173,6 +176,225 @@ func Test_SchemaNotRegistered_ResultsInWorkerDecodeError(t *testing.T) { require.ErrorContains(t, gotErr, "Subject Not Found") } +func Test_SchemaRegistry_Avro_SubjectNameSpecification(t *testing.T) { + checkShouldSkipTest(t, enableKafkaBrokerTest) + + ctx := context.Background() + topic := "integration-test-topic-2" + uuid.NewString() + bootstrapServer := getBootstrap() + + createTopic(t, bootstrapServer, topic, 1) + t.Logf("Created topic: %s", topic) + + groupID := uuid.NewString() + + client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{bootstrapServer}}, zkafka.LoggerOption(stdLogger{})) + defer func() { require.NoError(t, client.Close()) }() + + subjName := uuid.NewString() + t.Log("Created writer with auto registered schemas") + writer1, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ + ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.AvroSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "mock://", + SubjectName: subjName, + Serialization: zkafka.SerializationConfig{ + AutoRegisterSchemas: true, + Schema: dummyEventSchema1, + }, + }, + }) + require.NoError(t, err) + + evt1 := avro1.DummyEvent{ + IntField: int(rand.Int31()), + DoubleField: rand.Float64(), + StringField: uuid.NewString(), + BoolField: true, + BytesField: []byte(uuid.NewString()), + } + // write msg1, and msg2 + _, err = writer1.Write(ctx, evt1) + require.NoError(t, err) + + consumerTopicConfig := zkafka.ConsumerTopicConfig{ + ClientID: fmt.Sprintf("reader-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.AvroSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "mock://", + SubjectName: subjName, + }, + GroupID: groupID, + AdditionalProps: map[string]any{ + "auto.offset.reset": "earliest", + }, + } + reader, err := client.Reader(ctx, consumerTopicConfig) + require.NoError(t, err) + + t.Log("Begin reading messages") + results, err := readMessages(reader, 1) + require.NoError(t, err) + + msg1 := <-results + t.Log("Close reader") + + require.NoError(t, reader.Close()) + + receivedEvt1 := avro1.DummyEvent{} + require.NoError(t, msg1.Decode(&receivedEvt1)) + assertEqual(t, evt1, receivedEvt1) +} + +func Test_SchemaRegistry_Proto_SubjectNameSpecification(t *testing.T) { + checkShouldSkipTest(t, enableKafkaBrokerTest) + + ctx := context.Background() + topic := "integration-test-topic-2" + uuid.NewString() + bootstrapServer := getBootstrap() + + createTopic(t, bootstrapServer, topic, 1) + t.Logf("Created topic: %s", topic) + + groupID := uuid.NewString() + + client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{bootstrapServer}}, zkafka.LoggerOption(stdLogger{})) + defer func() { require.NoError(t, client.Close()) }() + + subjName := uuid.NewString() + t.Log("Created writer with auto registered schemas") + writer1, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ + ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.ProtoSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "mock://", + SubjectName: subjName, + Serialization: zkafka.SerializationConfig{ + AutoRegisterSchemas: true, + Schema: dummyEventSchema1, + }, + }, + }) + require.NoError(t, err) + + evt1 := &proto1.DummyEvent{ + IntField: rand.Int63(), + DoubleField: rand.Float32(), + StringField: uuid.NewString(), + BoolField: true, + BytesField: []byte(uuid.NewString()), + } + + _, err = writer1.Write(ctx, evt1) + require.NoError(t, err) + + consumerTopicConfig := zkafka.ConsumerTopicConfig{ + ClientID: fmt.Sprintf("reader-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.ProtoSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "mock://", + SubjectName: subjName, + }, + GroupID: groupID, + AdditionalProps: map[string]any{ + "auto.offset.reset": "earliest", + }, + } + reader, err := client.Reader(ctx, consumerTopicConfig) + require.NoError(t, err) + + t.Log("Begin reading messages") + results, err := readMessages(reader, 1) + require.NoError(t, err) + + msg1 := <-results + t.Log("Close reader") + + require.NoError(t, reader.Close()) + + receivedEvt1 := &proto1.DummyEvent{} + require.NoError(t, msg1.Decode(receivedEvt1)) + assertEqual(t, evt1, receivedEvt1, cmpopts.IgnoreUnexported(proto1.DummyEvent{})) +} + +func Test_SchemaRegistry_Json_SubjectNameSpecification(t *testing.T) { + checkShouldSkipTest(t, enableKafkaBrokerTest) + + ctx := context.Background() + topic := "integration-test-topic-2" + uuid.NewString() + bootstrapServer := getBootstrap() + + createTopic(t, bootstrapServer, topic, 1) + t.Logf("Created topic: %s", topic) + + groupID := uuid.NewString() + + client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{bootstrapServer}}, zkafka.LoggerOption(stdLogger{})) + defer func() { require.NoError(t, client.Close()) }() + + subjName := uuid.NewString() + t.Log("Created writer with auto registered schemas") + writer1, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ + ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.JSONSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "mock://", + SubjectName: subjName, + Serialization: zkafka.SerializationConfig{ + AutoRegisterSchemas: true, + Schema: dummyEventSchema1, + }, + }, + }) + require.NoError(t, err) + + evt1 := &json1.DummyEvent{ + IntField: rand.Int63(), + DoubleField: rand.Float32(), + StringField: uuid.NewString(), + BoolField: true, + BytesField: []byte(uuid.NewString()), + } + + _, err = writer1.Write(ctx, evt1) + require.NoError(t, err) + + consumerTopicConfig := zkafka.ConsumerTopicConfig{ + ClientID: fmt.Sprintf("reader-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.JSONSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "mock://", + SubjectName: subjName, + }, + GroupID: groupID, + AdditionalProps: map[string]any{ + "auto.offset.reset": "earliest", + }, + } + reader, err := client.Reader(ctx, consumerTopicConfig) + require.NoError(t, err) + + t.Log("Begin reading messages") + results, err := readMessages(reader, 1) + require.NoError(t, err) + + msg1 := <-results + t.Log("Close reader") + + require.NoError(t, reader.Close()) + + receivedEvt1 := &json1.DummyEvent{} + require.NoError(t, msg1.Decode(&receivedEvt1)) + assertEqual(t, evt1, receivedEvt1) +} + func checkShouldSkipTest(t *testing.T, flags ...string) { t.Helper() for _, flag := range flags {