Skip to content

Commit

Permalink
Corrected some typos
Browse files Browse the repository at this point in the history
  • Loading branch information
stewartboyd119 committed Sep 23, 2024
1 parent 7f11505 commit f0923ca
Show file tree
Hide file tree
Showing 9 changed files with 9 additions and 12 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go.work.sum
.idea/
.run/
zk-multiple-kafka-multiple/
*.out
*.res
*.lsif
*.prof
Expand Down
2 changes: 1 addition & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This project adheres to Semantic Versioning.

## 1.1.0 (Sep 22, 2024)

1. Added support for schemaregistry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry`
1. Added support for schema registry (avro, proto, json). Extended `zfmt.FormatterType` types to include `avro_schema_registry`, `proto_schema_registry` and `json_schema_registry`
2. Added lifecycle function `LifecyclePostReadImmediate`
3. Added `workFactory.CreateWithFunc` which is a convenience work factory method for creating work using a callback instead of an interface (can reduce boilerplate) in some scenarios.
4. During the creation of readers/writers an error is now returned if bootstrap servers is empty
Expand Down
3 changes: 0 additions & 3 deletions example/worker_avro/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
"github.com/zillow/zkafka"
)

//go:embed dummy_event.avsc
var dummyEventSchema string

// Demonstrates reading from a topic via the zkafka.Work struct which is more convenient, typically, than using the consumer directly
func main() {
ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ func (f zfmtShim) unmarshal(req unmarshReq) error {
type errFormatter struct{}

// marshall returns error with reminder
func (f errFormatter) marshall(req marshReq) ([]byte, error) {
func (f errFormatter) marshall(_ marshReq) ([]byte, error) {
return nil, errMissingFormatter
}

// unmarshal returns error with reminder
func (f errFormatter) unmarshal(req unmarshReq) error {
func (f errFormatter) unmarshal(_ unmarshReq) error {
return errMissingFormatter
}

Expand Down
2 changes: 1 addition & 1 deletion heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func Test_offsetHeap_SeekPop_DoesntImpactHeapOrdering(t *testing.T) {
}
got := heap.Pop()
want := offsets[i]
require.Equal(t, want, got, "Expect pop to still pop minumums even after seek pops")
require.Equal(t, want, got, "Expect pop to still pop minimums even after seek pops")
i++
}
}
2 changes: 1 addition & 1 deletion lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type LifecycleHooks struct {
// Called by work after reading a message (guaranteed non nil), offers the ability to customize the context object (resulting context object passed to work processor)
PostRead func(ctx context.Context, meta LifecyclePostReadMeta) (context.Context, error)

// Called by work immediatedly after an attempt to read a message. Msg might be nil, if there was an error
// Called by work immediately after an attempt to read a message. Msg might be nil, if there was an error
// or no available messages.
PostReadImmediate func(ctx context.Context, meta LifecyclePostReadImmediateMeta)

Expand Down
1 change: 1 addition & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,7 @@ func Test_KafkaClientsCanWriteToTheirDeadLetterTopic(t *testing.T) {
Topic: topic,
Formatter: zfmt.JSONFmt,
})
require.NoError(t, err)

consumerTopicConfig := zkafka.ConsumerTopicConfig{
ClientID: fmt.Sprintf("worker-%s-%s", t.Name(), uuid.NewString()),
Expand Down
4 changes: 2 additions & 2 deletions testhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ type FakeClient struct {
W Writer
}

func (f FakeClient) Reader(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error) {
func (f FakeClient) Reader(_ context.Context, _ ConsumerTopicConfig, _ ...ReaderOption) (Reader, error) {
return f.R, nil
}

func (f FakeClient) Writer(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error) {
func (f FakeClient) Writer(_ context.Context, _ ProducerTopicConfig, _ ...WriterOption) (Writer, error) {
return f.W, nil
}

Expand Down
2 changes: 1 addition & 1 deletion work.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (w *Work) execProcessors(ctx context.Context, shutdown <-chan struct{}) {

// initiateProcessors creates a buffered channel for each virtual partition, of size poolSize. That way
// a particular virtual partition never blocks because of its own capacity issue (and instead the goroutinepool is used
// to limit indefinte growth of processing goroutines).
// to limit indefinite growth of processing goroutines).
func (w *Work) initiateProcessors(_ context.Context) {
poolSize := w.getPoolSize()
w.virtualPartitions = make([]chan workUnit, poolSize)
Expand Down

0 comments on commit f0923ca

Please sign in to comment.