Skip to content

Commit

Permalink
Updated to allow subject name specification for schema registry
Browse files Browse the repository at this point in the history
  • Loading branch information
stewartboyd119 committed Sep 23, 2024
1 parent 0e0f899 commit 50e5db4
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 13 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 1.2.0 (Sep 23, 2024)
1. Update to allow subject name specification (not just TopicNameStrategy, which default )

## 1.1.0 (Sep 23, 2024)

1. Added support for schema registry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry`
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ type SchemaRegistryConfig struct {
Serialization SerializationConfig
// Deserialization provides additional information used by schema registry formatters during deserialization (data read)
Deserialization DeserializationConfig
// SubjectName allows the specification of the SubjectName. If not specified defaults to [topic name strategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy)
SubjectName string
}

type SerializationConfig struct {
Expand Down
12 changes: 6 additions & 6 deletions example/producer_avro/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ go 1.23.1

replace github.com/zillow/zkafka v1.0.0 => ../..

require github.com/zillow/zkafka v1.0.0

require (
github.com/zillow/zkafka v1.0.0
github.com/actgardner/gogen-avro/v10 v10.2.1 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hamba/avro/v2 v2.20.1 // indirect
github.com/hamba/avro/v2 v2.24.0 // indirect
github.com/heetch/avro v0.4.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
github.com/zillow/zfmt v1.0.1 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/sync v0.7.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
5 changes: 5 additions & 0 deletions example/producer_avro/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX
github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0=
github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro=
github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes=
Expand Down Expand Up @@ -188,6 +189,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E=
github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig=
github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -399,6 +401,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM=
Expand All @@ -421,6 +424,7 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
Expand All @@ -435,6 +439,7 @@ golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
Expand Down
10 changes: 5 additions & 5 deletions example/worker_avro/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ require (
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/bufbuild/protocompile v0.8.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 // indirect
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/hamba/avro/v2 v2.20.1 // indirect
github.com/hamba/avro/v2 v2.24.0 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/jhump/protoreflect v1.15.6 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -29,9 +29,9 @@ require (
github.com/sony/gobreaker v1.0.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/zillow/zfmt v1.0.1 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/sync v0.7.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20240325203815-454cdb8f5daa // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
5 changes: 5 additions & 0 deletions example/worker_avro/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX
github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0=
github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro=
github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes=
Expand Down Expand Up @@ -195,6 +196,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E=
github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig=
github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -414,6 +416,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM=
Expand All @@ -436,6 +439,7 @@ go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
Expand All @@ -450,6 +454,7 @@ golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
Expand Down
2 changes: 1 addition & 1 deletion formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) {
}

func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error {
err := f.afmt.Deserialize(req.topic, req.data, &req.target)
err := f.afmt.Deserialize(req.topic, req.data, req.target)
if err != nil {
return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err)
}
Expand Down
19 changes: 18 additions & 1 deletion schemareg.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (c *schemaRegistryFactory) createAvro(srConfig SchemaRegistryConfig) (avroF
if err != nil {
return avroFmt{}, fmt.Errorf("failed to create deserializer: %w", err)
}
deser.SubjectNameStrategy = subjectNameStrategy(srConfig)

serConfig := avrov2.NewSerializerConfig()
serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas
Expand All @@ -43,6 +44,8 @@ func (c *schemaRegistryFactory) createAvro(srConfig SchemaRegistryConfig) (avroF
if err != nil {
return avroFmt{}, fmt.Errorf("failed to create serializer: %w", err)
}
ser.SubjectNameStrategy = subjectNameStrategy(srConfig)

return avroFmt{
ser: ser,
deser: deser,
Expand All @@ -61,6 +64,8 @@ func (c *schemaRegistryFactory) createProto(srConfig SchemaRegistryConfig) (prot
return protoFmt{}, fmt.Errorf("failed to create deserializer: %w", err)
}

deser.SubjectNameStrategy = subjectNameStrategy(srConfig)

serConfig := protobuf.NewSerializerConfig()
serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas
serConfig.NormalizeSchemas = true
Expand All @@ -69,6 +74,7 @@ func (c *schemaRegistryFactory) createProto(srConfig SchemaRegistryConfig) (prot
if err != nil {
return protoFmt{}, fmt.Errorf("failed to create serializer: %w", err)
}
ser.SubjectNameStrategy = subjectNameStrategy(srConfig)
return protoFmt{
ser: ser,
deser: deser,
Expand All @@ -87,6 +93,7 @@ func (c *schemaRegistryFactory) createJson(srConfig SchemaRegistryConfig) (jsonF
if err != nil {
return jsonFmt{}, fmt.Errorf("failed to create deserializer: %w", err)
}
deser.SubjectNameStrategy = subjectNameStrategy(srConfig)

serConfig := jsonschema.NewSerializerConfig()
serConfig.AutoRegisterSchemas = srConfig.Serialization.AutoRegisterSchemas
Expand All @@ -96,11 +103,12 @@ func (c *schemaRegistryFactory) createJson(srConfig SchemaRegistryConfig) (jsonF
if err != nil {
return jsonFmt{}, fmt.Errorf("failed to create serializer: %w", err)
}
ser.SubjectNameStrategy = subjectNameStrategy(srConfig)

return jsonFmt{
ser: ser,
deser: deser,
}, nil

}

func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) (schemaregistry.Client, error) {
Expand All @@ -122,6 +130,15 @@ func (c *schemaRegistryFactory) getSchemaClient(srConfig SchemaRegistryConfig) (
return client, nil
}

func subjectNameStrategy(cfg SchemaRegistryConfig) serde.SubjectNameStrategyFunc {
return func(topic string, serdeType serde.Type, schema schemaregistry.SchemaInfo) (string, error) {
if cfg.SubjectName != "" {
return cfg.SubjectName, nil
}
return serde.TopicNameStrategy(topic, serdeType, schema)
}
}

type avroFmt struct {
ser *avrov2.Serializer
deser *avrov2.Deserializer
Expand Down
Loading

0 comments on commit 50e5db4

Please sign in to comment.