Cleanup documentation #7

Merged · 10 commits · Jul 24, 2024
9 changes: 0 additions & 9 deletions .golangci.yml
@@ -16,15 +16,6 @@ linters-settings:
- default
gosimple:
go: '1.17'
govet:
check-shadowing: true
settings:
printf:
funcs:
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Debug
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Info
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Warn
- (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Error
depguard:
rules:
Main:
29 changes: 20 additions & 9 deletions Makefile
@@ -2,24 +2,35 @@
MODULE_DIRS = .


.PHONY: setup-test
setup-test:
docker compose -p $$RANDOM -f ./example/compose.yaml up -d

.PHONY: test-local
test-local: setup-test cover
# Sets up a Kafka broker using docker compose
.PHONY: setup
setup:
docker compose -f ./example/compose.yaml up -d

# Assumes setup has been executed. Runs go test with coverage
.PHONY: cover
cover:
export GO_TAGS=--tags=integration; ./coverage.sh --tags=integration
export GO_TAGS=--tags=integration; ./coverage.sh

# Runs setup and executes tests with coverage.
.PHONY: test-local
test-local: setup cover

.PHONY: example-producer
example-producer:
go run example/producer/producer.go
go run example/producer/main.go

.PHONY: example-worker
example-worker:
go run example/worker/worker.go
go run example/worker/main.go

.PHONY: example-deadletter-worker
example-deadletter-worker:
go run example/worker-deadletter/main.go

.PHONY: example-delay-worker
example-delay-worker:
go run example/worker-delay/main.go

.PHONY: lint
lint: golangci-lint
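Taken together, `make test-local` is the one-shot path: it runs `setup` to bring up the broker via docker compose and then `cover` to execute the tests, while the `example-*` targets run the individual producer and worker binaries against that broker.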
383 changes: 304 additions & 79 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions coverage.sh
@@ -1,6 +1,7 @@
#!/usr/bin/env bash
set -x

# Allows Go test args to be passed in (specifically added to control whether or not to pass `--tags=integration`).
go_tags=$GO_TAGS
go_tags="${go_tags:---tags=unit}"

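For example, `GO_TAGS=--tags=integration ./coverage.sh` runs the integration-tagged tests (this is what the `cover` target does), while invoking `./coverage.sh` with GO_TAGS unset falls back to the default `--tags=unit`.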
75 changes: 0 additions & 75 deletions example/consumer/consumer.go

This file was deleted.

8 changes: 4 additions & 4 deletions example/producer/producer.go → example/producer/main.go
@@ -6,26 +6,26 @@ import (
"math/rand"
"time"

"github.com/google/uuid"
"github.com/zillow/zfmt"
"github.com/zillow/zkafka"
)

func main() {
ctx := context.Background()
writer, err := zkafka.NewClient(zkafka.Config{
BootstrapServers: []string{"localhost:9092"},
BootstrapServers: []string{"localhost:29092"},
}).Writer(ctx, zkafka.ProducerTopicConfig{
ClientID: "example",
Topic: "two-multi-partition",
Topic: "zkafka-example-topic",
Formatter: zfmt.JSONFmt,
})
randomNames := []string{"stewy", "lydia", "asif", "mike", "justin"}
if err != nil {
log.Panic(err)
}
for {
event := DummyEvent{
Name: uuid.NewString(),
Name: randomNames[rand.Intn(len(randomNames))],
Age: rand.Intn(100),
}

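Note: the DummyEvent type referenced above is defined elsewhere in example/producer/main.go and isn't part of this hunk. A minimal sketch consistent with the fields set above would be:

```go
// Sketch only: inferred from the usage above; the actual definition
// lives outside this diff hunk.
type DummyEvent struct {
	Name string
	Age  int
}
```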
106 changes: 106 additions & 0 deletions example/worker-deadletter/main.go
@@ -0,0 +1,106 @@
package main

import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/zillow/zkafka"
)

// Demonstrates reading from a topic via the zkafka.Work struct, which is typically more convenient than using the consumer directly
func main() {
ctx := context.Background()
client := zkafka.NewClient(zkafka.Config{
BootstrapServers: []string{"localhost:29092"},
},
zkafka.LoggerOption(stdLogger{}),
)
// It's important to close the client after consumption to gracefully leave the consumer group
// (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance)
defer client.Close()

readTimeoutMillis := 10000
topicConfig := zkafka.ConsumerTopicConfig{
// ClientID is used for caching inside zkafka and for observability within streamz dashboards, but it's not an important
// part of consumer group semantics. A typical convention is to use the name of the service executing the kafka worker
ClientID: "service-name",
// GroupID is the consumer group. If multiple instances of the same consumer group read messages from the same
// topic, the topic's partitions are split among the instances. The broker remembers
// which offset has been committed for a consumer group, so work can be picked up where it left off
// across releases
GroupID: "zkafka/example/example-consumer",
Topic: "zkafka-example-topic",
// Controls how long ReadMessage() waits within work before returning a nil message. The default is 1s, but it is increased in this example
// to reduce the number of debug logs emitted when a nil message is returned
ReadTimeoutMillis: &readTimeoutMillis,
// When DeadLetterTopicConfig is specified, a dead letter topic will be configured and written to
// when a processing error occurs.
DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
Topic: "zkafka-example-deadletter-topic",
},
AdditionalProps: map[string]any{
// only important the first time a consumer group connects.
"auto.offset.reset": "earliest",
},
}
// Optionally set up a channel to signal when worker shutdown should occur.
// A nil channel is also acceptable, but this example demonstrates how to make use of the signal.
// The channel should be closed, rather than simply written to, to properly broadcast to the separate worker threads.
stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM)
shutdown := make(chan struct{})

go func() {
<-stopCh
close(shutdown)
}()

wf := zkafka.NewWorkFactory(client)
// Register a processor which is executed per message.
work := wf.Create(topicConfig, &Processor{})
if err := work.Run(ctx, shutdown); err != nil {
log.Panic(err)
}
}

type Processor struct{}

func (p Processor) Process(_ context.Context, msg *zkafka.Message) error {
// Processing errors result in the message being written to the configured dead letter topic (DLT).
// Any error object works, but finer-grained control can be achieved by returning a
// `zkafka.ProcessError`. In this example, we control the behavior of the circuit breaker; the DLT write
// could also optionally be skipped (this example doesn't opt to do that).
//
// Because debug logging is on, the producer log (for when a message is written to the DLT) will show in stdout
return zkafka.ProcessError{
Err: errors.New("processing failed"),
DisableCircuitBreak: true,
DisableDLTWrite: false,
}
}

type stdLogger struct {
}

func (l stdLogger) Debugw(_ context.Context, msg string, keysAndValues ...interface{}) {
log.Printf("Debugw-"+msg, keysAndValues...)
}

func (l stdLogger) Infow(_ context.Context, msg string, keysAndValues ...interface{}) {
log.Printf("Infow-"+msg, keysAndValues...)
}

func (l stdLogger) Errorw(_ context.Context, msg string, keysAndValues ...interface{}) {
log.Printf("Errorw-"+msg, keysAndValues...)
}

func (l stdLogger) Warnw(_ context.Context, msg string, keysAndValues ...interface{}) {
prefix := fmt.Sprintf("Warnw-%s-"+msg, time.Now().Format(time.RFC3339Nano))
log.Printf(prefix, keysAndValues...)
}
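As a follow-on sketch (not part of this PR), messages that land on the dead letter topic can be drained by a second worker built from the same zkafka pieces shown above. The group ID and processor below are illustrative assumptions:

```go
package main

import (
	"context"
	"log"

	"github.com/zillow/zkafka"
)

// Hypothetical companion worker that reads from the dead letter topic
// populated by the example above.
func main() {
	ctx := context.Background()
	client := zkafka.NewClient(zkafka.Config{
		BootstrapServers: []string{"localhost:29092"},
	})
	defer client.Close()

	topicConfig := zkafka.ConsumerTopicConfig{
		ClientID: "service-name",
		// Illustrative group ID; any stable name works.
		GroupID: "zkafka/example/deadletter-consumer",
		// The DLT written to by the worker above.
		Topic: "zkafka-example-deadletter-topic",
	}

	wf := zkafka.NewWorkFactory(client)
	work := wf.Create(topicConfig, &dltProcessor{})
	// A nil shutdown channel is acceptable, per the notes in the example above.
	if err := work.Run(ctx, nil); err != nil {
		log.Panic(err)
	}
}

type dltProcessor struct{}

// Process only logs the failed message's coordinates; a real handler might
// inspect, alert on, or re-drive the message.
func (p dltProcessor) Process(_ context.Context, msg *zkafka.Message) error {
	log.Printf("DLT message: offset %d, partition %d", msg.Offset, msg.Partition)
	return nil
}
```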
68 changes: 68 additions & 0 deletions example/worker-delay/main.go
@@ -0,0 +1,68 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/zillow/zkafka"
)

// Demonstrates reading from a topic via the zkafka.Work struct, which is typically more convenient than using the consumer directly
func main() {
ctx := context.Background()
client := zkafka.NewClient(zkafka.Config{
BootstrapServers: []string{"localhost:29092"},
},
// Optionally add a logger, which implements zkafka.Logger, to see detailed information about message processing
//zkafka.LoggerOption(),
)
// It's important to close the client after consumption to gracefully leave the consumer group
// (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance)
defer client.Close()

processDelayMillis := 10 * 1000
topicConfig := zkafka.ConsumerTopicConfig{
// ClientID is used for caching inside zkafka and for observability within streamz dashboards, but it's not an important
// part of consumer group semantics. A typical convention is to use the name of the service executing the kafka worker
ClientID: "service-name",
// GroupID is the consumer group. If multiple instances of the same consumer group read messages from the same
// topic, the topic's partitions are split among the instances. The broker remembers
// which offset has been committed for a consumer group, so work can be picked up where it left off
// across releases
GroupID: "zkafka/example/example-consumer",
Topic: "zkafka-example-topic",
// This value instructs the kafka worker to inspect the message timestamp and not call the processor callback until
// at least the process delay duration has passed
ProcessDelayMillis: &processDelayMillis,
}
// Optionally set up a channel to signal when worker shutdown should occur.
// A nil channel is also acceptable, but this example demonstrates how to make use of the signal.
// The channel should be closed, rather than simply written to, to properly broadcast to the separate worker threads.
stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM)
shutdown := make(chan struct{})

go func() {
<-stopCh
close(shutdown)
}()

wf := zkafka.NewWorkFactory(client)
// Register a processor which is executed per message.
// Speedup is used to create multiple processor goroutines. Order is still maintained with this setup by way of `virtual partitions`
work := wf.Create(topicConfig, &Processor{}, zkafka.Speedup(5))
if err := work.Run(ctx, shutdown); err != nil {
log.Panic(err)
}
}

type Processor struct{}

func (p Processor) Process(_ context.Context, msg *zkafka.Message) error {
log.Printf(" offset: %d, partition: %d. Time since msg.Timestamp %s", msg.Offset, msg.Partition, time.Since(msg.TimeStamp))
return nil
}
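Because ProcessDelayMillis is set to 10 seconds, each log line above should report a `Time since msg.Timestamp` of at least roughly 10s for freshly produced messages, which is a quick way to confirm the delay is taking effect.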