Skip to content

Commit

Permalink
Cleanup documentation (#7)
Browse files Browse the repository at this point in the history
1. Remove reference to zlog (zillow internal logging implementation)
2. Added `worker-deadletter` and `worker-delay`
3. Add maked targets for new examples
4. Updated readme.md
5. Updated DeadLetterTopicConfig to honor parent consumer config's
clientid
  • Loading branch information
stewartboyd119 authored Jul 24, 2024
1 parent a16d04c commit 7935b74
Show file tree
Hide file tree
Showing 15 changed files with 541 additions and 229 deletions.
9 changes: 0 additions & 9 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ linters-settings:
- default
gosimple:
go: '1.17'
govet:
check-shadowing: true
settings:
printf:
funcs:
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Debug
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Info
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Warn
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Error
depguard:
rules:
Main:
Expand Down
29 changes: 20 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,35 @@
MODULE_DIRS = .


.PHONY: setup-test
setup-test:
docker compose -p $$RANDOM -f ./example/compose.yaml up -d

.PHONY: test-local
test-local: setup-test cover
# Sets up kafka broker using docker compose
.PHONY: setup
setup:
docker compose -f ./example/compose.yaml up -d

# Assumes setup has been executed. Runs go test with coverage
.PHONY: cover
cover:
export GO_TAGS=--tags=integration; ./coverage.sh --tags=integration
export GO_TAGS=--tags=integration; ./coverage.sh

# Runs setup and executes tests with coverage.
.PHONY: test-local
test-local: setup cover

.PHONY: example-producer
example-producer:
go run example/producer/producer.go
go run example/producer/main.go

.PHONY: example-worker
example-worker:
go run example/worker/worker.go
go run example/worker/main.go

.PHONY: example-deadletter-worker
example-deadletter-worker:
go run example/worker-deadletter/main.go

.PHONY: example-delay-worker
example-delay-worker:
go run example/worker-delay/main.go

.PHONY: lint
lint: golangci-lint
Expand Down
383 changes: 304 additions & 79 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions coverage.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env bash
set -x

# allows for GO test args to be passed in (Specifically added to control whether or not to pass in `--tags=integration`).
go_tags=$GO_TAGS
go_tags="${go_tags:---tags=unit}"

Expand Down
75 changes: 0 additions & 75 deletions example/consumer/consumer.go

This file was deleted.

8 changes: 4 additions & 4 deletions example/producer/producer.go → example/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,26 @@ import (
"math/rand"
"time"

"github.com/google/uuid"
"github.com/zillow/zfmt"
"github.com/zillow/zkafka"
)

func main() {
ctx := context.Background()
writer, err := zkafka.NewClient(zkafka.Config{
BootstrapServers: []string{"localhost:9092"},
BootstrapServers: []string{"localhost:29092"},
}).Writer(ctx, zkafka.ProducerTopicConfig{
ClientID: "example",
Topic: "two-multi-partition",
Topic: "zkafka-example-topic",
Formatter: zfmt.JSONFmt,
})
randomNames := []string{"stewy", "lydia", "asif", "mike", "justin"}
if err != nil {
log.Panic(err)
}
for {
event := DummyEvent{
Name: uuid.NewString(),
Name: randomNames[rand.Intn(len(randomNames))],
Age: rand.Intn(100),
}

Expand Down
106 changes: 106 additions & 0 deletions example/worker-deadletter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/zillow/zkafka"
)

// Demonstrates reading from a topic via the zkafka.Work struct which is more convenient, typically, than using the consumer directly
func main() {
ctx := context.Background()
client := zkafka.NewClient(zkafka.Config{
BootstrapServers: []string{"localhost:29092"},
},
zkafka.LoggerOption(stdLogger{}),
)
// It's important to close the client after consumption to gracefully leave the consumer group
// (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance)
defer client.Close()

readTimeoutMillis := 10000
topicConfig := zkafka.ConsumerTopicConfig{
// ClientID is used for caching inside zkafka, and observability within streamz dashboards. But it's not an important
// part of consumer group semantics. A typical convention is to use the service name executing the kafka worker
ClientID: "service-name",
// GroupID is the consumer group. If multiple instances of the same consumer group read messages for the same
// topic the topic's partitions will be split between the collection. The broker remembers
// what offset has been committed for a consumer group, and therefore work can be picked up where it was left off
// across releases
GroupID: "zkafka/example/example-consumer",
Topic: "zkafka-example-topic",
// Controls how long ReadMessage() wait in work before returning a nil message. The default is 1s, but is increased in this example
// to reduce the number of debug logs which come when a nil message is returned
ReadTimeoutMillis: &readTimeoutMillis,
// When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to
// when a processing error occurs.
DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
Topic: "zkafka-example-deadletter-topic",
},
AdditionalProps: map[string]any{
// only important the first time a consumer group connects.
"auto.offset.reset": "earliest",
},
}
// optionally set up a channel to signal when worker shutdown should occur.
// A nil channel is also acceptable, but this example demonstrates how to make utility of the signal.
// The channel should be closed, instead of simply written to, to properly broadcast to the separate worker threads.
stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM)
shutdown := make(chan struct{})

go func() {
<-stopCh
close(shutdown)
}()

wf := zkafka.NewWorkFactory(client)
// Register a processor which is executed per message.
work := wf.Create(topicConfig, &Processor{})
if err := work.Run(ctx, shutdown); err != nil {
log.Panic(err)
}
}

type Processor struct{}

func (p Processor) Process(_ context.Context, msg *zkafka.Message) error {
// Processing errors result in the message being written to the configured dead letter topic (DLT).
// Any error object works, but finer grained controlled cn be accomplished by returning a
// `zkafka.ProcessError`. In this example, we control the behavior of the circuit breaker and can optionally
// skip writing the DLT (this example doesn't opt to do that)
//
// Because debug logging is on, the producer log (for when a message is written to the DLT) will show in std out
return zkafka.ProcessError{
Err: errors.New("processing failed"),
DisableCircuitBreak: true,
DisableDLTWrite: false,
}
}

type stdLogger struct {
}

func (l stdLogger) Debugw(_ context.Context, msg string, keysAndValues ...interface{}) {
log.Printf("Debugw-"+msg, keysAndValues...)
}

func (l stdLogger) Infow(_ context.Context, msg string, keysAndValues ...interface{}) {
log.Printf("Infow-"+msg, keysAndValues...)
}

func (l stdLogger) Errorw(_ context.Context, msg string, keysAndValues ...interface{}) {
log.Printf("Errorw-"+msg, keysAndValues...)
}

func (l stdLogger) Warnw(_ context.Context, msg string, keysAndValues ...interface{}) {
prefix := fmt.Sprintf("Warnw-%s-"+msg, time.Now().Format(time.RFC3339Nano))
log.Printf(prefix, keysAndValues...)
}
68 changes: 68 additions & 0 deletions example/worker-delay/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/zillow/zkafka"
)

// Demonstrates reading from a topic via the zkafka.Work struct which is more convenient, typically, than using the consumer directly
func main() {
ctx := context.Background()
client := zkafka.NewClient(zkafka.Config{
BootstrapServers: []string{"localhost:29092"},
},
// optionally add a logger, which implements zkafka.Logger, to see detailed information about message processsing
//zkafka.LoggerOption(),
)
// It's important to close the client after consumption to gracefully leave the consumer group
// (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance)
defer client.Close()

processDelayMillis := 10 * 1000
topicConfig := zkafka.ConsumerTopicConfig{
// ClientID is used for caching inside zkafka, and observability within streamz dashboards. But it's not an important
// part of consumer group semantics. A typical convention is to use the service name executing the kafka worker
ClientID: "service-name",
// GroupID is the consumer group. If multiple instances of the same consumer group read messages for the same
// topic the topic's partitions will be split between the collection. The broker remembers
// what offset has been committed for a consumer group, and therefore work can be picked up where it was left off
// across releases
GroupID: "zkafka/example/example-consumer",
Topic: "zkafka-example-topic",
// This value instructs the kafka worker to inspect the message timestamp, and not call the processor call back until
// at least the process delay duration has passed
ProcessDelayMillis: &processDelayMillis,
}
// optionally set up a channel to signal when worker shutdown should occur.
// A nil channel is also acceptable, but this example demonstrates how to make utility of the signal.
// The channel should be closed, instead of simply written to, to properly broadcast to the separate worker threads.
stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM)
shutdown := make(chan struct{})

go func() {
<-stopCh
close(shutdown)
}()

wf := zkafka.NewWorkFactory(client)
// Register a processor which is executed per message.
// Speedup is used to create multiple processor goroutines. Order is still maintained with this setup by way of `virtual partitions`
work := wf.Create(topicConfig, &Processor{}, zkafka.Speedup(5))
if err := work.Run(ctx, shutdown); err != nil {
log.Panic(err)
}
}

type Processor struct{}

func (p Processor) Process(_ context.Context, msg *zkafka.Message) error {
log.Printf(" offset: %d, partition: %d. Time since msg.Timestamp %s", msg.Offset, msg.Partition, time.Since(msg.TimeStamp))
return nil
}
Loading

0 comments on commit 7935b74

Please sign in to comment.