diff --git a/.gitignore b/.gitignore index 6b401df..986e774 100644 --- a/.gitignore +++ b/.gitignore @@ -25,7 +25,6 @@ go.work.sum .idea/ .run/ zk-multiple-kafka-multiple/ -*.out *.res *.lsif *.prof diff --git a/changelog.md b/changelog.md index e1d69eb..7b62247 100644 --- a/changelog.md +++ b/changelog.md @@ -6,7 +6,7 @@ This project adheres to Semantic Versioning. ## 1.1.0 (Sep 22, 2024) -1. Added support for schemaregistry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry` +1. Added support for schema registry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry` 2. Added lifecycle function `LifecyclePostReadImmediate` 3. Added `workFactory.CreateWithFunc` which is a convenience work factory method for creating work using a callback instead of an interface (can reduce boilerplate) in some scenarios. 4. During the creation of readers/writers an error is now returned if bootstrap servers is empty diff --git a/example/worker_avro/main.go b/example/worker_avro/main.go index 8547625..66c185e 100644 --- a/example/worker_avro/main.go +++ b/example/worker_avro/main.go @@ -13,9 +13,6 @@ import ( "github.com/zillow/zkafka" ) -//go:embed dummy_event.avsc -var dummyEventSchema string - // Demonstrates reading from a topic via the zkafka.Work struct which is more convenient, typically, than using the consumer directly func main() { ctx := context.Background() diff --git a/formatter.go b/formatter.go index ce77959..a8b55d2 100644 --- a/formatter.go +++ b/formatter.go @@ -82,12 +82,12 @@ func (f zfmtShim) unmarshal(req unmarshReq) error { type errFormatter struct{} // marshall returns error with reminder -func (f errFormatter) marshall(req marshReq) ([]byte, error) { +func (f errFormatter) marshall(_ marshReq) ([]byte, error) { return nil, errMissingFormatter } // unmarshal returns error with reminder -func (f errFormatter) unmarshal(req unmarshReq) error { +func (f errFormatter) unmarshal(_ unmarshReq) error { return errMissingFormatter } diff --git a/heap_test.go b/heap_test.go index 2149110..36d7313 100644 --- a/heap_test.go +++ b/heap_test.go @@ -106,7 +106,7 @@ func Test_offsetHeap_SeekPop_DoesntImpactHeapOrdering(t *testing.T) { } got := heap.Pop() want := offsets[i] - require.Equal(t, want, got, "Expect pop to still pop minumums even after seek pops") + require.Equal(t, want, got, "Expect pop to still pop minimums even after seek pops") i++ } } diff --git a/lifecycle.go b/lifecycle.go index 86156e2..44bd69e 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -57,7 +57,7 @@ type LifecycleHooks struct { // Called by work after reading a message (guaranteed non nil), offers the ability to customize the context object (resulting context object passed to work processor) PostRead func(ctx context.Context, meta LifecyclePostReadMeta) (context.Context, error) - // Called by work immediatedly after an attempt to read a message. Msg might be nil, if there was an error + // Called by work immediately after an attempt to read a message. Msg might be nil, if there was an error // or no available messages. PostReadImmediate func(ctx context.Context, meta LifecyclePostReadImmediateMeta) diff --git a/test/integration_test.go b/test/integration_test.go index c591de9..40552f4 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -918,6 +918,7 @@ func Test_KafkaClientsCanWriteToTheirDeadLetterTopic(t *testing.T) { Topic: topic, Formatter: zfmt.JSONFmt, }) + require.NoError(t, err) consumerTopicConfig := zkafka.ConsumerTopicConfig{ ClientID: fmt.Sprintf("worker-%s-%s", t.Name(), uuid.NewString()), diff --git a/testhelper.go b/testhelper.go index c2a2040..a65e6b4 100644 --- a/testhelper.go +++ b/testhelper.go @@ -100,11 +100,11 @@ type FakeClient struct { W Writer } -func (f FakeClient) Reader(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error) { +func (f FakeClient) Reader(_ context.Context, _ ConsumerTopicConfig, _ ...ReaderOption) (Reader, error) { return f.R, nil } -func (f FakeClient) Writer(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error) { +func (f FakeClient) Writer(_ context.Context, _ ProducerTopicConfig, _ ...WriterOption) (Writer, error) { return f.W, nil } diff --git a/work.go b/work.go index ded5107..cf08523 100644 --- a/work.go +++ b/work.go @@ -172,7 +172,7 @@ func (w *Work) execProcessors(ctx context.Context, shutdown <-chan struct{}) { // initiateProcessors creates a buffered channel for each virtual partition, of size poolSize. That way // a particular virtual partition never blocks because of its own capacity issue (and instead the goroutinepool is used -// to limit indefinte growth of processing goroutines). +// to limit indefinite growth of processing goroutines). func (w *Work) initiateProcessors(_ context.Context) { poolSize := w.getPoolSize() w.virtualPartitions = make([]chan workUnit, poolSize)