Improve schema registry support (#12)
# Improve schema registry support

1. Added formatters which properly integrate with schema registry (zfmt attempted this previously, but missed the mark)
2. Updated tests to be controlled with an envvar instead of build tags
3. Updated golangci-lint config to remove deprecated options
4. Updated `make` to add proto generation for the schema registry evolution tests
5. Because protobuf has protections against named type collisions, tests are run with a special envvar to ignore this warning
6. Added `LifecyclePostReadImmediate` for better confirmation of read errors
7. Updated ReaderOption to no longer update the KReader directly, updating an indirect settings object instead
8. Updated the producer clientId to be `clientid + topic`; this helps avoid accidental collisions seen in production with DLT'd consumers (see the sketch after this list)
9. Updated compose.yml used for local testing to stand up schema registry
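
A rough usage sketch of the clientId change in item 8. The `github.com/zillow/zkafka` import path, the `BootstrapServers` field name, and the broker/topic values are assumptions for illustration, not taken from this commit; the point is that the writer cache is now keyed by clientID + topic, so two writers that share a ClientID but target different topics no longer collide.

```go
package main

import (
	"context"

	"github.com/zillow/zkafka" // assumed import path
)

func main() {
	ctx := context.Background()
	// Field name for the broker list is an assumption; point it at your own cluster.
	client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{"localhost:9092"}})
	defer client.Close()

	// Same ClientID, different topics: with the cache key now being clientID + topic,
	// the second call returns a distinct writer instead of silently reusing the one
	// created for "orders".
	ordersWriter, err := client.Writer(ctx, zkafka.ProducerTopicConfig{
		ClientID: "my-service",
		Topic:    "orders",
	})
	if err != nil {
		panic(err)
	}
	dltWriter, err := client.Writer(ctx, zkafka.ProducerTopicConfig{
		ClientID: "my-service",
		Topic:    "orders-dlt",
	})
	if err != nil {
		panic(err)
	}
	_, _ = ordersWriter, dltWriter
}
```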
stewartboyd119 authored Sep 23, 2024
1 parent c0b0f93 commit 0020d7f
Showing 53 changed files with 4,252 additions and 356 deletions.
1 change: 1 addition & 0 deletions .github/workflows/go.yml
@@ -41,6 +41,7 @@ jobs:
- name: Test
env:
KAFKA_BOOTSTRAP_SERVER: ${{ env.kafka_runner_address }}:9092
ENABLE_KAFKA_BROKER_TESTS: true
run: make cover

- name: Upload coverage reports to Codecov
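
The new `ENABLE_KAFKA_BROKER_TESTS` variable above replaces the old `integration` build tag as the switch for broker-dependent tests (commit item 2). A minimal sketch of what such a guard can look like; the helper name and skip message below are assumptions, not the repo's actual code:

```go
package zkafka_test

import (
	"os"
	"testing"
)

// requireBrokerTests is a hypothetical helper showing an envvar-based gate
// that can stand in for an `integration` build tag.
func requireBrokerTests(t *testing.T) {
	t.Helper()
	if os.Getenv("ENABLE_KAFKA_BROKER_TESTS") != "true" {
		t.Skip("set ENABLE_KAFKA_BROKER_TESTS=true to run tests that need a Kafka broker")
	}
}

func TestWriterAgainstRealBroker(t *testing.T) {
	requireBrokerTests(t)
	// ... exercise a real broker, e.g. the one at KAFKA_BOOTSTRAP_SERVER ...
}
```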
1 change: 0 additions & 1 deletion .gitignore
@@ -25,7 +25,6 @@ go.work.sum
.idea/
.run/
zk-multiple-kafka-multiple/
*.out
*.res
*.lsif
*.prof
14 changes: 6 additions & 8 deletions .golangci.yml
@@ -1,11 +1,11 @@
run:
skip-dirs:
- docs
- datadog
- kustomize
skip-files:
- 'wire_gen.go'
tests: false
go: '1.22'
issues:
exclude-files:
- 'wire_gen.go'
exclude-dirs:
- docs
linters-settings:
errcheck:
check-type-assertions: true
@@ -14,8 +14,6 @@ linters-settings:
sections:
- standard
- default
gosimple:
go: '1.17'
depguard:
rules:
Main:
16 changes: 15 additions & 1 deletion Makefile
@@ -10,7 +10,7 @@ setup:
# Assumes setup has been executed. Runs go test with coverage
.PHONY: cover
cover:
export GO_TAGS=--tags=integration; ./coverage.sh
./coverage.sh

# Runs setup and executes tests with coverage.
.PHONY: test-local
@@ -41,3 +41,17 @@ golangci-lint:
(cd $(mod) && \
echo "[lint] golangci-lint: $(mod)" && \
golangci-lint run --path-prefix $(mod) ./...) &&) true

.PHONY: gen
gen: protoc-exists
cd test/evolution; protoc --proto_path=. --go_out=./ ./schema_1.proto
cd test/evolution; protoc --proto_path=. --go_out=./ ./schema_2.proto
go run github.com/heetch/avro/cmd/avrogo@latest -p main -d ./example/producer_avro ./example/producer_avro/dummy_event.avsc
go run github.com/heetch/avro/cmd/avrogo@latest -p main -d ./example/worker_avro ./example/worker_avro/dummy_event.avsc
go run github.com/heetch/avro/cmd/avrogo@latest -p avro1 -d ./test/evolution/avro1 ./test/evolution/schema_1.avsc
go run github.com/heetch/avro/cmd/avrogo@latest -p avro2 -d ./test/evolution/avro2 ./test/evolution/schema_2.avsc

# a forced dependency which fails (and prints) if `protoc` isn't installed
.PHONY: protoc-exists
protoc-exists:
@which protoc > /dev/null || (echo "protoc is not installed. Install via `brew install protobuf`"; exit 1)
16 changes: 10 additions & 6 deletions README.md
@@ -270,13 +270,17 @@ special processing of these messages.

### SchemaRegistry Support:

There is limited support for schema registry in zkafka. A schemaID can be hardcoded via configuration. No
communication is done with schema registry, but some primitive checks can be conducted if a schemaID is specified via
configuration.
zkafka supports schema registry. It extends `zfmt` to enable this by adding three `zfmt.FormatterType` values:
```
AvroSchemaRegistry zfmt.FormatterType = "avro_schema_registry"
ProtoSchemaRegistry zfmt.FormatterType = "proto_schema_registry"
JSONSchemaRegistry zfmt.FormatterType = "json_schema_registry"
```

These can be used in ProducerTopicConfig/ConsumerTopicConfig just like the other formatter types. Examples have been added under
`example/producer_avro` and `example/worker_avro` which demonstrate the additional configuration (mostly what's needed to enable the
required schema registry communication); a minimal sketch is also shown below.
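
A minimal sketch of that configuration. The `SchemaRegistryConfig` field name `URL`, the `BootstrapServers` field name, and the literal addresses/topic/ClientID are assumptions for illustration; see the example directories for working versions.

```go
package main

import (
	"context"
	"log"

	"github.com/zillow/zkafka" // assumed import path
)

func main() {
	ctx := context.Background()
	// BootstrapServers field name and addresses are illustrative assumptions.
	client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{"localhost:9092"}})
	defer client.Close()

	writer, err := client.Writer(ctx, zkafka.ProducerTopicConfig{
		ClientID:  "example-producer",
		Topic:     "zkafka-example-topic",
		Formatter: zkafka.AvroSchemaRegistry, // or ProtoSchemaRegistry / JSONSchemaRegistry
		SchemaRegistry: zkafka.SchemaRegistryConfig{
			URL: "http://localhost:8081", // field name is an assumption
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	// writer is then used exactly as with any other formatter type.
	_ = writer
}
```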

Below is a breakdown of schema registry interactions into two subcategories. One is `Raw Handling`, where the configurable
formatter is bypassed entirely in favor of operating on the value byte arrays directly. The other is `Native Support`, which
attempts to create confluent-compatible serializations without communicating with schema registry directly.

#### Producers

8 changes: 8 additions & 0 deletions changelog.md
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 1.1.0 (Sep 22, 2024)

1. Added support for schema registry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry`, and `json_schema_registry`
2. Added lifecycle function `LifecyclePostReadImmediate`
3. Added `workFactory.CreateWithFunc`, a convenience work factory method for creating work from a callback instead of an interface (which can reduce boilerplate in some scenarios); a sketch follows below
4. During the creation of readers/writers, an error is now returned if the bootstrap servers list is empty
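
A rough sketch of item 3's callback-based work creation. The factory constructor, the `ConsumerTopicConfig` field set, and the processor signature below are assumptions for illustration only, not the documented API:

```go
// Hypothetical sketch; constructor and signatures are assumptions, not the documented API.
func runOrdersWorker(ctx context.Context, client *zkafka.Client) {
	wf := zkafka.NewWorkFactory(client)
	work := wf.CreateWithFunc(
		zkafka.ConsumerTopicConfig{ClientID: "my-service", GroupID: "my-group", Topic: "orders"},
		func(ctx context.Context, msg *zkafka.Message) error {
			// handle the message; returning an error marks this unit of work as failed
			return nil
		},
	)
	_ = work // the work would then be run via the worker's run method
}
```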


## 1.0.2 (Sep 6, 2024)

1. Updated `WithDeadLetterTopic` option to borrow username and password from ConsumerTopicConfig when those values aren't specified on DeadLetterTopicConfig
126 changes: 92 additions & 34 deletions client.go
@@ -36,13 +36,15 @@ type Client struct {
tp trace.TracerProvider
p propagation.TextMapPropagator

// confluent dependencies
srf *schemaRegistryFactory

producerProvider confluentProducerProvider
consumerProvider confluentConsumerProvider
}

// NewClient instantiates a kafka client to get readers and writers
func NewClient(conf Config, opts ...Option) *Client {
srf := newSchemaRegistryFactory()
c := &Client{
conf: conf,
readers: make(map[string]*KReader),
@@ -51,6 +53,7 @@ func NewClient(conf Config, opts ...Option) *Client {

producerProvider: defaultConfluentProducerProvider{}.NewProducer,
consumerProvider: defaultConfluentConsumerProvider{}.NewConsumer,
srf: srf,
}
for _, opt := range opts {
opt(c)
@@ -79,16 +82,26 @@ func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts
return r, nil
}

reader, err := newReader(c.conf, topicConfig, c.consumerProvider, c.logger, c.groupPrefix)
formatter, err := c.getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
})
if err != nil {
return nil, err
}
// copy settings from client first
reader.lifecycle = c.lifecycle

// overwrite options if given
for _, opt := range opts {
opt(reader)
reader, err := newReader(readerArgs{
cfg: c.conf,
cCfg: topicConfig,
consumerProvider: c.consumerProvider,
f: formatter,
l: c.logger,
prefix: c.groupPrefix,
hooks: c.lifecycle,
opts: opts,
})
if err != nil {
return nil, err
}
c.readers[topicConfig.ClientID] = reader
return c.readers[topicConfig.ClientID], nil
@@ -100,8 +113,9 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts
if err != nil {
return nil, err
}
writerKey := getWriterKey(topicConfig)
c.mu.RLock()
w, exist := c.writers[topicConfig.ClientID]
w, exist := c.writers[writerKey]
if exist && !w.isClosed {
c.mu.RUnlock()
return w, nil
@@ -110,39 +124,36 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts

c.mu.Lock()
defer c.mu.Unlock()
w, exist = c.writers[topicConfig.ClientID]
w, exist = c.writers[writerKey]
if exist && !w.isClosed {
return w, nil
}
writer, err := newWriter(c.conf, topicConfig, c.producerProvider)
formatter, err := c.getFormatter(formatterArgs{
formatter: topicConfig.Formatter,
schemaID: topicConfig.SchemaID,
srCfg: topicConfig.SchemaRegistry,
})

if err != nil {
return nil, err
}
// copy settings from client first
writer.logger = c.logger
writer.tracer = getTracer(c.tp)
writer.p = c.p
writer.lifecycle = c.lifecycle

// overwrite options if given
for _, opt := range opts {
opt(writer)
writer, err := newWriter(writerArgs{
cfg: c.conf,
pCfg: topicConfig,
producerProvider: c.producerProvider,
f: formatter,
l: c.logger,
t: getTracer(c.tp),
p: c.p,
hooks: c.lifecycle,
opts: opts,
})
if err != nil {
return nil, err
}
c.writers[topicConfig.ClientID] = writer
return c.writers[topicConfig.ClientID], nil
}

func getFormatter(topicConfig TopicConfig) (zfmt.Formatter, error) {
switch topicConfig.GetFormatter() {
case CustomFmt:
return &noopFormatter{}, nil
default:
f, err := zfmt.GetFormatter(topicConfig.GetFormatter(), topicConfig.GetSchemaID())
if err != nil {
return nil, fmt.Errorf("unsupported formatter %s", topicConfig.GetFormatter())
}
return f, nil
}
c.writers[writerKey] = writer
return c.writers[writerKey], nil
}

// Close terminates all cached readers and writers gracefully.
@@ -165,9 +176,56 @@ func (c *Client) Close() error {
return err
}

func (c *Client) getFormatter(args formatterArgs) (kFormatter, error) {
formatter := args.formatter
schemaID := args.schemaID

switch formatter {
case AvroSchemaRegistry:
scl, err := c.srf.createAvro(args.srCfg)
if err != nil {
return nil, err
}
cf, err := newAvroSchemaRegistryFormatter(scl)
return cf, err
case ProtoSchemaRegistry:
scl, err := c.srf.createProto(args.srCfg)
if err != nil {
return nil, err
}
cf := newProtoSchemaRegistryFormatter(scl)
return cf, nil
case JSONSchemaRegistry:
scl, err := c.srf.createJson(args.srCfg)
if err != nil {
return nil, err
}
cf := newJsonSchemaRegistryFormatter(scl)
return cf, nil
case CustomFmt:
return &errFormatter{}, nil
default:
f, err := zfmt.GetFormatter(formatter, schemaID)
if err != nil {
return nil, fmt.Errorf("unsupported formatter %s", formatter)
}
return zfmtShim{F: f}, nil
}
}

func getTracer(tp trace.TracerProvider) trace.Tracer {
if tp == nil {
return nil
}
return tp.Tracer(instrumentationName, trace.WithInstrumentationVersion("v1.0.0"))
}

func getWriterKey(cfg ProducerTopicConfig) string {
return cfg.ClientID + "-" + cfg.Topic
}

type formatterArgs struct {
formatter zfmt.FormatterType
schemaID int
srCfg SchemaRegistryConfig
}