From 80b69cb64949ff6e664bbc6069367d25006fab81 Mon Sep 17 00:00:00 2001 From: Stewart Boyd Date: Wed, 24 Jul 2024 07:32:04 -0700 Subject: [PATCH] Remove unused fakeMessage helper methods (#8) 1. Remove unused fakeMessage helper methods (leave and deprecate the popular zillow one)1 2. Remove unused benchmark 3. Remove zgkafka specific headers (rely on composition root for enterprise specific headers) --- example/worker/bench/main.go | 66 ---------------- message.go | 115 ++++++++++----------------- message_test.go | 4 +- test/worker_test.go | 145 ++++++++++++++++++++++++++++------- testhelper.go | 109 ++++++++++++++++---------- testhelper_test.go | 21 +++-- writer.go | 2 +- 7 files changed, 246 insertions(+), 216 deletions(-) delete mode 100644 example/worker/bench/main.go diff --git a/example/worker/bench/main.go b/example/worker/bench/main.go deleted file mode 100644 index eae21ba..0000000 --- a/example/worker/bench/main.go +++ /dev/null @@ -1,66 +0,0 @@ -package main - -import ( - "context" - "errors" - "fmt" - "log" - "os" - "runtime/pprof" - "time" - - "github.com/golang/mock/gomock" - "github.com/zillow/zfmt" - "github.com/zillow/zkafka" - zkafka_mocks "github.com/zillow/zkafka/mocks" -) - -func main() { - f, err := os.Create("cpu.prof") - if err != nil { - log.Fatal("could not create CPU profile: ", err) - } - defer f.Close() // error handling omitted for example - if err := pprof.StartCPUProfile(f); err != nil { - log.Fatal("could not start CPU profile: ", err) - } - defer pprof.StopCPUProfile() - - ctrl := gomock.NewController(&testReporter{}) - defer ctrl.Finish() - messageDone := func() { - } - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, messageDone) - r := zkafka_mocks.NewMockReader(ctrl) - r.EXPECT().Read(gomock.Any()).Return(msg, nil).AnyTimes() - - kcp := zkafka_mocks.NewMockClientProvider(ctrl) - kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Return(r, nil).AnyTimes() - - kwf := zkafka.NewWorkFactory(kcp) - w := kwf.Create( - zkafka.ConsumerTopicConfig{Topic: "topicName"}, - &kafkaProcessorError{}, - zkafka.Speedup(10), - zkafka.CircuitBreakAfter(100), - zkafka.CircuitBreakFor(30*time.Second), - zkafka.DisableBusyLoopBreaker(), - ) - ctx, c := context.WithTimeout(context.Background(), 2*time.Minute) - defer c() - if err := w.Run(ctx, nil); err != nil { - log.Panic(err) - } -} - -type kafkaProcessorError struct{} - -func (p *kafkaProcessorError) Process(_ context.Context, _ *zkafka.Message) error { - fmt.Print(".") - return errors.New("an error occurred during processing") -} - -type testReporter struct{} - -func (t *testReporter) Errorf(format string, args ...any) {} -func (t *testReporter) Fatalf(format string, args ...any) {} diff --git a/message.go b/message.go index 6604aa0..a78f638 100644 --- a/message.go +++ b/message.go @@ -3,13 +3,10 @@ package zkafka import ( "context" "errors" - "fmt" - "os" "sync" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/google/uuid" "github.com/zillow/zfmt" ) @@ -32,15 +29,49 @@ type Message struct { doneOnce sync.Once } -// A set of observability headers for ZG Kafka -const ( - obsKeyMessageID = "GUID" - obsKeyEventTime = "eventTime" - obsKeyOriginService = "originService" - obsKeyOriginHost = "originHost" -) +// DoneWithContext is used to alert that message processing has completed. +// This marks the message offset to be committed +func (m *Message) DoneWithContext(ctx context.Context) { + m.doneOnce.Do(func() { + m.doneFunc(ctx) + }) +} + +// Done is used to alert that message processing has completed. +// This marks the message offset to be committed +func (m *Message) Done() { + if m == nil { + return + } + m.doneOnce.Do(func() { + m.doneFunc(context.Background()) + }) +} + +// Decode reads message data and stores it in the value pointed to by v. +func (m *Message) Decode(v any) error { + if m.value == nil { + return errors.New("message is empty") + } + if m.fmt == nil { + // is error is most likely due to user calling KReader/KWriter + // with custom Formatter which can sometimes be nil + return errors.New("formatter is not set") + } + return m.fmt.Unmarshal(m.value, v) +} + +// Value returns a copy of the current value byte array. Useful for debugging +func (m *Message) Value() []byte { + if m == nil || m.value == nil { + return nil + } + out := make([]byte, len(m.value)) + copy(out, m.value) + return out +} -func makeProducerMessageRaw(_ context.Context, serviceName, topic string, key *string, value []byte) kafka.Message { +func makeProducerMessageRaw(_ context.Context, topic string, key *string, value []byte) kafka.Message { kafkaMessage := kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &topic, @@ -52,26 +83,6 @@ func makeProducerMessageRaw(_ context.Context, serviceName, topic string, key *s if key != nil { kafkaMessage.Key = []byte(*key) } - // Observability - kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{ - Key: obsKeyMessageID, - Value: []byte(uuid.New().String()), - }) - kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{ - Key: obsKeyEventTime, - Value: []byte(fmt.Sprintf("%d", time.Now().Unix())), - }) - kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{ - Key: obsKeyOriginService, - Value: []byte(serviceName), - }) - //nolint:errcheck // Its not particularly noteworthy if if host isn't propagated forward. We'll suppress the error - hostname, _ := os.Hostname() - // hn is empty string if there's an error - kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{ - Key: obsKeyOriginHost, - Value: []byte(hostname), - }) return kafkaMessage } @@ -103,48 +114,6 @@ func headers(msg kafka.Message) map[string][]byte { return res } -// DoneWithContext is used to alert that message processing has completed. -// This marks the message offset to be committed -func (m *Message) DoneWithContext(ctx context.Context) { - m.doneOnce.Do(func() { - m.doneFunc(ctx) - }) -} - -// Done is used to alert that message processing has completed. -// This marks the message offset to be committed -func (m *Message) Done() { - if m == nil { - return - } - m.doneOnce.Do(func() { - m.doneFunc(context.Background()) - }) -} - -// Decode reads message data and stores it in the value pointed to by v. -func (m *Message) Decode(v any) error { - if m.value == nil { - return errors.New("message is empty") - } - if m.fmt == nil { - // is error is most likely due to user calling KReader/KWriter - // with custom Formatter which can sometimes be nil - return errors.New("formatter is not set") - } - return m.fmt.Unmarshal(m.value, v) -} - -// Value returns a copy of the current value byte array. Useful for debugging -func (m *Message) Value() []byte { - if m == nil || m.value == nil { - return nil - } - out := make([]byte, len(m.value)) - copy(out, m.value) - return out -} - // Response is a kafka response with the Partition where message was sent to along with its assigned Offset type Response struct { Partition int32 diff --git a/message_test.go b/message_test.go index d8b4372..dd8d1c9 100644 --- a/message_test.go +++ b/message_test.go @@ -41,13 +41,13 @@ func Test_makeProducerMessageRaw(t *testing.T) { Opaque: nil, Headers: nil, }, - hasHeaders: true, + hasHeaders: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer recoverThenFail(t) - got := makeProducerMessageRaw(tt.args.ctx, tt.args.serviceName, tt.args.topic, tt.args.key, tt.args.value) + got := makeProducerMessageRaw(tt.args.ctx, tt.args.topic, tt.args.key, tt.args.value) require.Equal(t, tt.want.TopicPartition, got.TopicPartition) require.Equal(t, tt.want.Key, got.Key) require.Equal(t, tt.want.Key, got.Key) diff --git a/test/worker_test.go b/test/worker_test.go index 7aeabab..59aa29d 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -23,9 +23,8 @@ import ( "github.com/golang/mock/gomock" ) -var ( - topicName = "orange" - NoopOnDone = func() {} +const ( + topicName = "orange" ) func TestWork_Run_FailsWithLogsWhenFailedToGetReader(t *testing.T) { @@ -186,7 +185,10 @@ func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) { l := zkafka.NoopLogger{} - msg := zkafka.GetFakeMessage("1", nil, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + Fmt: &zfmt.JSONFormatter{}, + }) r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) @@ -241,7 +243,10 @@ func TestWork_Run_DoNotSkipCircuitBreak(t *testing.T) { l := zkafka.NoopLogger{} - failureMessage := zkafka.GetFakeMessage("1", nil, &zfmt.JSONFormatter{}, NoopOnDone) + failureMessage := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + Fmt: &zfmt.JSONFormatter{}, + }) r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Return(failureMessage, nil).AnyTimes() @@ -300,7 +305,11 @@ func TestWork_Run_DoSkipCircuitBreak(t *testing.T) { l := zkafka.NoopLogger{} - failureMessage := zkafka.GetFakeMessage("1", nil, &zfmt.JSONFormatter{}, NoopOnDone) + failureMessage := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + Fmt: &zfmt.JSONFormatter{}, + }) + r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Return(failureMessage, nil).AnyTimes() @@ -360,7 +369,10 @@ func TestWork_Run_CircuitBreaksOnProcessPanicInsideProcessorGoRoutine(t *testing l := zkafka.NoopLogger{} - msg := zkafka.GetFakeMessage("1", nil, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + Fmt: &zfmt.JSONFormatter{}, + }) r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) @@ -476,7 +488,11 @@ func TestWork_Run_SpedUpIsFaster(t *testing.T) { mockReader := zkafka_mocks.NewMockReader(ctrl) mockReader.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) (*zkafka.Message, error) { - return zkafka.GetFakeMessage(uuid.NewString(), nil, &zfmt.JSONFormatter{}, NoopOnDone), nil + return zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr(uuid.NewString()), + Fmt: &zfmt.JSONFormatter{}, + }), nil + }).AnyTimes() mockReader.EXPECT().Close().Return(nil).AnyTimes() @@ -555,7 +571,11 @@ func TestKafkaWork_ProcessorReturnsErrorIsLoggedAsWarning(t *testing.T) { l.EXPECT().Debugw(gomock.Any(), "Kafka topic message received", "offset", gomock.Any(), "partition", gomock.Any(), "topic", gomock.Any(), "groupID", gomock.Any()).AnyTimes() l.EXPECT().Debugw(gomock.Any(), gomock.Any()).AnyTimes() - msg := zkafka.GetFakeMessage("key", "val", &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("key"), + ValueData: "val", + Fmt: &zfmt.JSONFormatter{}, + }) mockReader := zkafka_mocks.NewMockReader(ctrl) mockReader.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) mockClientProvider := zkafka_mocks.NewMockClientProvider(ctrl) @@ -601,7 +621,12 @@ func TestKafkaWork_ProcessorTimeoutCausesContextCancellation(t *testing.T) { l := zkafka.NoopLogger{} - msg := zkafka.GetFakeMessage("key", "val", &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("key"), + ValueData: "val", + Fmt: &zfmt.JSONFormatter{}, + }) + mockReader := zkafka_mocks.NewMockReader(ctrl) mockReader.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) @@ -993,7 +1018,12 @@ func TestWork_Run_OnDoneCallbackCalledOnProcessorError(t *testing.T) { l := zkafka.NoopLogger{} - msg := zkafka.GetFakeMessage("key", "val", &zfmt.StringFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: "val", + Fmt: &zfmt.StringFormatter{}, + }) + r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).AnyTimes().Return(msg, nil) @@ -1049,7 +1079,12 @@ func TestWork_Run_WritesMetrics(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - msg := zkafka.GetFakeMessage("key", "val", &zfmt.StringFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("key"), + ValueData: "val", + Fmt: &zfmt.StringFormatter{}, + }) + msg.Topic = topicName r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).MinTimes(1).Return(msg, nil) @@ -1104,7 +1139,7 @@ func TestWork_LifecycleHooksCalledForEachItem_Reader(t *testing.T) { l := zkafka.NoopLogger{} numMsgs := 5 - msgs := zkafka.GetFakeMessages(topicName, numMsgs, struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msgs := getFakeMessages(topicName, numMsgs, struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}) r := zkafka_mocks.NewMockReader(ctrl) gomock.InOrder( @@ -1171,7 +1206,7 @@ func TestWork_LifecycleHooksPostReadCanUpdateContext(t *testing.T) { l := zkafka.NoopLogger{} numMsgs := 1 - msgs := zkafka.GetFakeMessages(topicName, numMsgs, "lydia", &zfmt.JSONFormatter{}, NoopOnDone) + msgs := getFakeMessages(topicName, numMsgs, "lydia", &zfmt.JSONFormatter{}) r := zkafka_mocks.NewMockReader(ctrl) gomock.InOrder( @@ -1232,7 +1267,7 @@ func TestWork_LifecycleHooksPostReadErrorDoesntHaltProcessing(t *testing.T) { l := zkafka.NoopLogger{} numMsgs := 1 - msgs := zkafka.GetFakeMessages(topicName, numMsgs, "lydia", &zfmt.JSONFormatter{}, NoopOnDone) + msgs := getFakeMessages(topicName, numMsgs, "lydia", &zfmt.JSONFormatter{}) r := zkafka_mocks.NewMockReader(ctrl) gomock.InOrder( @@ -1289,7 +1324,7 @@ func TestWork_LifecycleHooksCalledForEachItem(t *testing.T) { l := zkafka.NoopLogger{} numMsgs := 5 - msgs := zkafka.GetFakeMessages(topicName, numMsgs, struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msgs := getFakeMessages(topicName, numMsgs, struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}) r := zkafka_mocks.NewMockReader(ctrl) gomock.InOrder( @@ -1387,7 +1422,11 @@ func NewFakeLifecycleHooks(mtx *sync.Mutex, state *FakeLifecycleState) zkafka.Li } func getRandomMessage() *zkafka.Message { - return zkafka.GetFakeMessage(fmt.Sprintf("%d", rand.Intn(5)), nil, &zfmt.JSONFormatter{}, NoopOnDone) + return zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr(fmt.Sprintf("%d", rand.Intn(5))), + DoneFunc: func(ctx context.Context) {}, + Fmt: &zfmt.JSONFormatter{}, + }) } func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen(t *testing.T) { @@ -1397,7 +1436,11 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen ctrl := gomock.NewController(t) defer ctrl.Finish() - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: struct{ name string }{name: "arish"}, + Fmt: &zfmt.JSONFormatter{}, + }) r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Return(msg, nil).AnyTimes() @@ -1458,7 +1501,12 @@ func TestWork_CircuitBreaker_WaitsForCircuitToOpen(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: struct{ name string }{name: "arish"}, + Fmt: &zfmt.JSONFormatter{}, + }) + r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Return(msg, nil).AnyTimes() @@ -1514,7 +1562,11 @@ func TestWork_DontDeadlockWhenCircuitBreakerIsInHalfOpen(t *testing.T) { defer ctrl.Finish() qr := zkafka_mocks.NewMockReader(ctrl) - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "stewy"}, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: struct{ name string }{name: "stewy"}, + Fmt: &zfmt.JSONFormatter{}, + }) gomock.InOrder( qr.EXPECT().Read(gomock.Any()).Times(1).Return(msg, nil), qr.EXPECT().Read(gomock.Any()).AnyTimes().Return(nil, nil), @@ -1581,7 +1633,11 @@ func Test_Bugfix_WorkPoolCanBeRestartedAfterShutdown(t *testing.T) { l := zkafka.NoopLogger{} mockReader := zkafka_mocks.NewMockReader(ctrl) - msg1 := zkafka.GetFakeMessage("abc", "def", &zfmt.StringFormatter{}, NoopOnDone) + msg1 := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("abc"), + ValueData: "def", + Fmt: &zfmt.StringFormatter{}, + }) mockReader.EXPECT().Read(gomock.Any()).Return(msg1, nil).AnyTimes() mockReader.EXPECT().Close().Return(nil).AnyTimes() @@ -1670,7 +1726,11 @@ func Test_MsgOrderingIsMaintainedPerKeyWithAnyNumberOfVirtualPartitions(t *testi keyCount := 3 msgCount := 200 for i := 0; i < msgCount; i++ { - msg1 := zkafka.GetFakeMessage(strconv.Itoa(i%keyCount), strconv.Itoa(i), &zfmt.StringFormatter{}, NoopOnDone) + msg1 := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr(strconv.Itoa(i % keyCount)), + ValueData: strconv.Itoa(i), + Fmt: &zfmt.StringFormatter{}, + }) readerCalls = append(readerCalls, mockReader.EXPECT().Read(gomock.Any()).Return(msg1, nil)) } readerCalls = append(readerCalls, mockReader.EXPECT().Read(gomock.Any()).Return(nil, nil).AnyTimes()) @@ -1758,7 +1818,11 @@ func TestWork_LifecycleHookReaderPanicIsHandledAndMessagingProceeds(t *testing.T qr := zkafka_mocks.NewMockReader(ctrl) numMsgs := 1 sentMsg := false - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: struct{ name string }{name: "arish"}, + Fmt: &zfmt.JSONFormatter{}, + }) qr.EXPECT().Read(gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context) (*zkafka.Message, error) { if !sentMsg { @@ -1834,7 +1898,12 @@ func TestWork_ShutdownCausesRunExit(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: struct{ name string }{name: "arish"}, + Fmt: &zfmt.JSONFormatter{}, + }) + r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Return(msg, nil).AnyTimes() @@ -1879,7 +1948,11 @@ func BenchmarkWork_Run_CircuitBreaker_BusyLoopBreaker(b *testing.B) { ctrl := gomock.NewController(b) defer ctrl.Finish() - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: struct{ name string }{name: "arish"}, + Fmt: &zfmt.JSONFormatter{}, + }) r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Return(msg, nil).AnyTimes() @@ -1919,7 +1992,11 @@ func BenchmarkWork_Run_CircuitBreaker_DisableBusyLoopBreaker(b *testing.B) { ctrl := gomock.NewController(b) defer ctrl.Finish() - msg := zkafka.GetFakeMessage("1", struct{ name string }{name: "arish"}, &zfmt.JSONFormatter{}, NoopOnDone) + msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: ptr("1"), + ValueData: struct{ name string }{name: "arish"}, + Fmt: &zfmt.JSONFormatter{}, + }) r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Return(msg, nil).AnyTimes() @@ -2067,3 +2144,19 @@ func pollWait(f func() bool, opts pollOpts) { time.Sleep(pollPause) } } + +func getFakeMessages(topic string, numMsgs int, value any, formatter zfmt.Formatter) []*zkafka.Message { + msgs := make([]*zkafka.Message, numMsgs) + + for i := 0; i < numMsgs; i++ { + key := fmt.Sprint(i) + msgs[i] = zkafka.GetMsgFromFake(&zkafka.FakeMessage{ + Key: &key, + ValueData: value, + Fmt: formatter, + }) + msgs[i].Topic = topic + } + + return msgs +} diff --git a/testhelper.go b/testhelper.go index 7f4e350..59c57eb 100644 --- a/testhelper.go +++ b/testhelper.go @@ -2,63 +2,90 @@ package zkafka import ( "context" - "fmt" "sync" "time" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/zillow/zfmt" ) +// GetFakeMessage is a helper method for creating a *Message instance. +// +// Deprecated: As of v1.0.0, Prefer `GetMsgFromFake()` func GetFakeMessage(key string, value any, fmt zfmt.Formatter, doneFunc func()) *Message { - wrapperFunc := func(c context.Context) { doneFunc() } - return GetFakeMessageWithContext(key, value, fmt, wrapperFunc) + return getFakeMessage(key, value, fmt, doneFunc) } -func GetFakeMessages(topic string, numMsgs int, value any, formatter zfmt.Formatter, doneFunc func()) []*Message { - msgs := make([]*Message, numMsgs) +func getFakeMessage(key string, value any, fmt zfmt.Formatter, doneFunc func()) *Message { wrapperFunc := func(c context.Context) { doneFunc() } + return GetMsgFromFake(&FakeMessage{ + Key: &key, + ValueData: value, + Fmt: fmt, + DoneFunc: wrapperFunc, + }) +} - for i := 0; i < numMsgs; i++ { - key := fmt.Sprint(i) - msgs[i] = GetFakeMessageWithContext(key, value, formatter, wrapperFunc) - msgs[i].Topic = topic - } - - return msgs +// FakeMessage can be used during testing to construct Message objects. +// The Message object has private fields which might need to be tested +type FakeMessage struct { + Key *string + Value []byte + // ValueData allows the specification of serializable instance and uses the provided formatter + // to create ValueData. Any error during serialization is ignored. + ValueData any + DoneFunc func(ctx context.Context) + Headers map[string][]byte + Offset int64 + Partition int32 + Topic string + GroupID string + TimeStamp time.Time + Fmt zfmt.Formatter } -func GetFakeMessageWithContext(key string, value any, fmt zfmt.Formatter, doneFunc func(context.Context)) *Message { - if b, err := fmt.Marshall(value); err == nil { - return &Message{ - Key: key, - Headers: nil, - value: b, - fmt: fmt, - doneFunc: doneFunc, - doneOnce: sync.Once{}, - TimeStamp: time.Now(), - } +// GetMsgFromFake allows the construction of a Message object (allowing the specification of some private fields). +func GetMsgFromFake(msg *FakeMessage) *Message { + if msg == nil { + return nil } - return &Message{ - Key: key, - doneFunc: doneFunc, - doneOnce: sync.Once{}, - TimeStamp: time.Now(), + key := "" + if msg.Key != nil { + key = *msg.Key } -} - -func GetFakeMsgFromRaw(key *string, value []byte, fmt zfmt.Formatter, doneFunc func(context.Context)) *Message { - k := "" - if key != nil { - k = *key + timeStamp := time.Now() + if !msg.TimeStamp.IsZero() { + timeStamp = msg.TimeStamp + } + doneFunc := func(ctx context.Context) {} + if msg.DoneFunc != nil { + doneFunc = msg.DoneFunc + } + var val []byte + if msg.Value != nil { + val = msg.Value + } + if msg.ValueData != nil { + //nolint:errcheck // To simplify this helper function's api, we'll suppress marshalling errors. + val, _ = msg.Fmt.Marshall(msg.ValueData) } return &Message{ - Key: k, - Headers: nil, - value: value, - fmt: fmt, - doneFunc: doneFunc, - doneOnce: sync.Once{}, - TimeStamp: time.Now(), + Key: key, + isKeyNil: msg.Key == nil, + Headers: msg.Headers, + Offset: msg.Offset, + Partition: msg.Partition, + Topic: msg.Topic, + GroupID: msg.GroupID, + TimeStamp: timeStamp, + value: val, + topicPartition: kafka.TopicPartition{ + Topic: &msg.Topic, + Partition: msg.Partition, + Offset: kafka.Offset(msg.Offset), + }, + fmt: msg.Fmt, + doneFunc: doneFunc, + doneOnce: sync.Once{}, } } diff --git a/testhelper_test.go b/testhelper_test.go index a955511..14d82e9 100644 --- a/testhelper_test.go +++ b/testhelper_test.go @@ -7,10 +7,10 @@ import ( "github.com/zillow/zfmt" ) -func TestGetFakeMessage(t *testing.T) { +func Test_getFakeMessage(t *testing.T) { defer recoverThenFail(t) - msg := GetFakeMessage("key", "value", &zfmt.JSONFormatter{}, nil) + msg := getFakeMessage("key", "value", &zfmt.JSONFormatter{}, nil) expectedMessage := Message{ Key: "key", value: []byte("\"value\""), @@ -19,13 +19,17 @@ func TestGetFakeMessage(t *testing.T) { require.Equal(t, string(expectedMessage.value), string(msg.value), "Expected generated zkafka.Message to use value from arg") } -func TestGetFakeMessageFromRaw(t *testing.T) { +func TestGetFakeMessageFromFake(t *testing.T) { defer recoverThenFail(t) fmtr := &zfmt.JSONFormatter{} val, err := fmtr.Marshall("value") require.NoError(t, err) - msg := GetFakeMsgFromRaw(ptr("key"), val, fmtr, nil) + msg := GetMsgFromFake(&FakeMessage{ + Key: ptr("key"), + Value: val, + Fmt: fmtr, + }) expectedMessage := Message{ Key: "key", value: []byte("\"value\""), @@ -34,10 +38,13 @@ func TestGetFakeMessageFromRaw(t *testing.T) { require.Equal(t, string(expectedMessage.value), string(msg.value), "Expected generated zkafka.Message to use value from arg") } -func TestGetFakeMessage_WhenMarshallError(t *testing.T) { - +func TestMsgFromFake_WhenMarshallError(t *testing.T) { // pass in some invalid object for marshalling - msg := GetFakeMessage("key", make(chan int), &zfmt.JSONFormatter{}, nil) + msg := GetMsgFromFake(&FakeMessage{ + Key: ptr("key"), + ValueData: make(chan int), + Fmt: &zfmt.JSONFormatter{}, + }) expectedMessage := Message{ Key: "key", value: nil, diff --git a/writer.go b/writer.go index 8e33e12..3558f91 100644 --- a/writer.go +++ b/writer.go @@ -99,7 +99,7 @@ func (w *KWriter) WriteKey(ctx context.Context, key string, value any, opts ...W // as the value for the kafka message // It's convenient for forwarding message in dead letter operations. func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error) { - kafkaMessage := makeProducerMessageRaw(ctx, w.topicConfig.ClientID, w.topicConfig.Topic, key, value) + kafkaMessage := makeProducerMessageRaw(ctx, w.topicConfig.Topic, key, value) for _, opt := range opts { opt.apply(&kafkaMessage) }