Skip to content

Commit

Permalink
Updated to allow subject name specification for schema registry (#14)
Browse files Browse the repository at this point in the history
Updated to allow subject name specification for schema registry
  • Loading branch information
stewartboyd119 authored Sep 23, 2024
1 parent 0e0f899 commit 3efce44
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 47 deletions.
34 changes: 0 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,40 +282,6 @@ This can be used in ProducerTopicConfig/ConsumerTopicConfig just like the others
schema registry communication that's required)


#### Producers

##### Raw Handling

For a producer, this would involve using the `kafka.Writer.WriteRaw()` method which takes in a byte array directly.

##### Native Support

Producers will include the schemaID in messages written to kafka (without any further verification).

#### Consumers

##### Raw Handling

For a consumer, this would involve accessing the value byte array through the `zkafka.Message.Value()` method.

```go
type Processor struct{}
func (p Processor) Process(_ context.Context, msg *zkafka.Message) error {
e := MyEvent{}
// The Decode method uses the configured formatter and the configured schema registry ID.
//err := msg.Decode(&e)
// For schema registry, however, it might be better to bypass the configurable formatters, and deserialize the data by accessing the byte array directly
// The below function is a hypothetical function that inspects the data in the kafka message's value and communicates with schema registry for verification
myUnmarshallFunction(msg.Value(), &e)
...
}
```

##### Native Support
Consumers will verify that the message they're consuming has the schemaID specified in the configuration
(if it's specified). Be careful here, as backwards compatible schema evolutions would be treated as an error condition
as the new schemaID wouldn't match what's in the configuration.

### Consumer/Producer Configuration

See for description of configuration options and their defaults:
Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 1.2.0 (Sep 23, 2024)
1. Updated to allow subject name specification (not just TopicNameStrategy, which is the default)

## 1.1.0 (Sep 23, 2024)

1. Added support for schema registry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry`
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ type SchemaRegistryConfig struct {
Serialization SerializationConfig
// Deserialization provides additional information used by schema registry formatters during deserialization (data read)
Deserialization DeserializationConfig
// SubjectName overrides the schema registry subject name used by the formatters. If not specified, it defaults to the [topic name strategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy)
SubjectName string
}

type SerializationConfig struct {
Expand Down
12 changes: 6 additions & 6 deletions example/producer_avro/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ go 1.23.1

replace github.com/zillow/zkafka v1.0.0 => ../..

require github.com/zillow/zkafka v1.0.0

require (
github.com/zillow/zkafka v1.0.0
github.com/actgardner/gogen-avro/v10 v10.2.1 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hamba/avro/v2 v2.20.1 // indirect
github.com/hamba/avro/v2 v2.24.0 // indirect
github.com/heetch/avro v0.4.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
github.com/zillow/zfmt v1.0.1 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/sync v0.7.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
5 changes: 5 additions & 0 deletions example/producer_avro/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX
github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0=
github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro=
github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes=
Expand Down Expand Up @@ -188,6 +189,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E=
github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig=
github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -399,6 +401,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM=
Expand All @@ -421,6 +424,7 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
Expand All @@ -435,6 +439,7 @@ golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
Expand Down
10 changes: 5 additions & 5 deletions example/worker_avro/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ require (
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/bufbuild/protocompile v0.8.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/hamba/avro/v2 v2.20.1 // indirect
github.com/hamba/avro/v2 v2.24.0 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/jhump/protoreflect v1.15.6 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -29,9 +29,9 @@ require (
github.com/sony/gobreaker v1.0.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/zillow/zfmt v1.0.1 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/sync v0.7.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20240325203815-454cdb8f5daa // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
5 changes: 5 additions & 0 deletions example/worker_avro/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX
github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0=
github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro=
github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes=
Expand Down Expand Up @@ -195,6 +196,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E=
github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig=
github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -414,6 +416,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM=
Expand All @@ -436,6 +439,7 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
Expand All @@ -450,6 +454,7 @@ golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
Expand Down
2 changes: 1 addition & 1 deletion formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) {
}

func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error {
err := f.afmt.Deserialize(req.topic, req.data, &req.target)
err := f.afmt.Deserialize(req.topic, req.data, req.target)
if err != nil {
return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err)
}
Expand Down
19 changes: 18 additions & 1 deletion schemareg.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (c *schemaRegistryFactory) createAvro(srConfig SchemaRegistryConfig) (avroF
if err != nil {
return avroFmt{}, fmt.Errorf("failed to create deserializer: %w", err)
}
deser.SubjectNameStrategy = subjectNameStrategy(srConfig)

serConfig := avrov2.NewSerializerConfig()
serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas
Expand All @@ -43,6 +44,8 @@ func (c *schemaRegistryFactory) createAvro(srConfig SchemaRegistryConfig) (avroF
if err != nil {
return avroFmt{}, fmt.Errorf("failed to create serializer: %w", err)
}
ser.SubjectNameStrategy = subjectNameStrategy(srConfig)

return avroFmt{
ser: ser,
deser: deser,
Expand All @@ -61,6 +64,8 @@ func (c *schemaRegistryFactory) createProto(srConfig SchemaRegistryConfig) (prot
return protoFmt{}, fmt.Errorf("failed to create deserializer: %w", err)
}

deser.SubjectNameStrategy = subjectNameStrategy(srConfig)

serConfig := protobuf.NewSerializerConfig()
serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas
serConfig.NormalizeSchemas = true
Expand All @@ -69,6 +74,7 @@ func (c *schemaRegistryFactory) createProto(srConfig SchemaRegistryConfig) (prot
if err != nil {
return protoFmt{}, fmt.Errorf("failed to create serializer: %w", err)
}
ser.SubjectNameStrategy = subjectNameStrategy(srConfig)
return protoFmt{
ser: ser,
deser: deser,
Expand All @@ -87,6 +93,7 @@ func (c *schemaRegistryFactory) createJson(srConfig SchemaRegistryConfig) (jsonF
if err != nil {
return jsonFmt{}, fmt.Errorf("failed to create deserializer: %w", err)
}
deser.SubjectNameStrategy = subjectNameStrategy(srConfig)

serConfig := jsonschema.NewSerializerConfig()
serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas
Expand All @@ -96,11 +103,12 @@ func (c *schemaRegistryFactory) createJson(srConfig SchemaRegistryConfig) (jsonF
if err != nil {
return jsonFmt{}, fmt.Errorf("failed to create serializer: %w", err)
}
ser.SubjectNameStrategy = subjectNameStrategy(srConfig)

return jsonFmt{
ser: ser,
deser: deser,
}, nil

}

func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) (schemaregistry.Client, error) {
Expand All @@ -122,6 +130,15 @@ func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) (
return client, nil
}

// subjectNameStrategy returns the subject-name resolver that is assigned to the
// schema registry serializers and deserializers.
//
// When cfg.SubjectName is empty, serde.TopicNameStrategy is used unchanged.
// Otherwise the returned function ignores its arguments and always yields the
// configured subject name with a nil error.
func subjectNameStrategy(cfg SchemaRegistryConfig) serde.SubjectNameStrategyFunc {
	override := cfg.SubjectName
	if override == "" {
		// No explicit subject configured: fall back to the topic name strategy.
		return serde.TopicNameStrategy
	}
	return func(_ string, _ serde.Type, _ schemaregistry.SchemaInfo) (string, error) {
		return override, nil
	}
}

type avroFmt struct {
ser *avrov2.Serializer
deser *avrov2.Deserializer
Expand Down
Loading

0 comments on commit 3efce44

Please sign in to comment.