Skip to content

Commit

Permalink
1. Remove unused fakeMessage helper methods (leave and deprecate the …
Browse files Browse the repository at this point in the history
…popular zillow one)

1. Remove unused benchmark
1. Remove zgkafka specific headers (rely on composition root for enterprise specific headers)
  • Loading branch information
stewartboyd119 committed Jul 24, 2024
1 parent 7935b74 commit 3440ad8
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 213 deletions.
66 changes: 0 additions & 66 deletions example/worker/bench/main.go

This file was deleted.

115 changes: 42 additions & 73 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package zkafka
import (
"context"
"errors"
"fmt"
"os"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/google/uuid"
"github.com/zillow/zfmt"
)

Expand All @@ -32,15 +29,49 @@ type Message struct {
doneOnce sync.Once
}

// A set of observability headers for ZG Kafka
const (
obsKeyMessageID = "GUID"
obsKeyEventTime = "eventTime"
obsKeyOriginService = "originService"
obsKeyOriginHost = "originHost"
)
// DoneWithContext is used to alert that message processing has completed.
// This marks the message offset to be committed
func (m *Message) DoneWithContext(ctx context.Context) {
m.doneOnce.Do(func() {
m.doneFunc(ctx)
})
}

// Done is used to alert that message processing has completed.
// This marks the message offset to be committed
func (m *Message) Done() {
if m == nil {
return
}
m.doneOnce.Do(func() {
m.doneFunc(context.Background())
})
}

// Decode reads message data and stores it in the value pointed to by v.
func (m *Message) Decode(v any) error {
if m.value == nil {
return errors.New("message is empty")
}
if m.fmt == nil {
// is error is most likely due to user calling KReader/KWriter
// with custom Formatter which can sometimes be nil
return errors.New("formatter is not set")
}
return m.fmt.Unmarshal(m.value, v)
}

// Value returns a copy of the current value byte array. Useful for debugging
func (m *Message) Value() []byte {
if m == nil || m.value == nil {
return nil
}
out := make([]byte, len(m.value))
copy(out, m.value)
return out
}

func makeProducerMessageRaw(_ context.Context, serviceName, topic string, key *string, value []byte) kafka.Message {
func makeProducerMessageRaw(_ context.Context, topic string, key *string, value []byte) kafka.Message {
kafkaMessage := kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Expand All @@ -52,26 +83,6 @@ func makeProducerMessageRaw(_ context.Context, serviceName, topic string, key *s
if key != nil {
kafkaMessage.Key = []byte(*key)
}
// Observability
kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{
Key: obsKeyMessageID,
Value: []byte(uuid.New().String()),
})
kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{
Key: obsKeyEventTime,
Value: []byte(fmt.Sprintf("%d", time.Now().Unix())),
})
kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{
Key: obsKeyOriginService,
Value: []byte(serviceName),
})
//nolint:errcheck // Its not particularly noteworthy if if host isn't propagated forward. We'll suppress the error
hostname, _ := os.Hostname()
// hn is empty string if there's an error
kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{
Key: obsKeyOriginHost,
Value: []byte(hostname),
})
return kafkaMessage
}

Expand Down Expand Up @@ -103,48 +114,6 @@ func headers(msg kafka.Message) map[string][]byte {
return res
}

// DoneWithContext is used to alert that message processing has completed.
// This marks the message offset to be committed
func (m *Message) DoneWithContext(ctx context.Context) {
m.doneOnce.Do(func() {
m.doneFunc(ctx)
})
}

// Done is used to alert that message processing has completed.
// This marks the message offset to be committed
func (m *Message) Done() {
if m == nil {
return
}
m.doneOnce.Do(func() {
m.doneFunc(context.Background())
})
}

// Decode reads message data and stores it in the value pointed to by v.
func (m *Message) Decode(v any) error {
if m.value == nil {
return errors.New("message is empty")
}
if m.fmt == nil {
// is error is most likely due to user calling KReader/KWriter
// with custom Formatter which can sometimes be nil
return errors.New("formatter is not set")
}
return m.fmt.Unmarshal(m.value, v)
}

// Value returns a copy of the current value byte array. Useful for debugging
func (m *Message) Value() []byte {
if m == nil || m.value == nil {
return nil
}
out := make([]byte, len(m.value))
copy(out, m.value)
return out
}

// Response is a kafka response with the Partition where message was sent to along with its assigned Offset
type Response struct {
Partition int32
Expand Down
4 changes: 2 additions & 2 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func Test_makeProducerMessageRaw(t *testing.T) {
Opaque: nil,
Headers: nil,
},
hasHeaders: true,
hasHeaders: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer recoverThenFail(t)
got := makeProducerMessageRaw(tt.args.ctx, tt.args.serviceName, tt.args.topic, tt.args.key, tt.args.value)
got := makeProducerMessageRaw(tt.args.ctx, tt.args.topic, tt.args.key, tt.args.value)
require.Equal(t, tt.want.TopicPartition, got.TopicPartition)
require.Equal(t, tt.want.Key, got.Key)
require.Equal(t, tt.want.Key, got.Key)
Expand Down
Loading

0 comments on commit 3440ad8

Please sign in to comment.