Skip to content

Commit

Permalink
Merge pull request #169 from practo/max-buffer-fix
Browse files Browse the repository at this point in the history
Max buffer should be equal to the concurrency set
  • Loading branch information
alok87 authored Mar 19, 2021
2 parents 6071ad3 + fe26f4a commit 0f16d43
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 3 deletions.
2 changes: 1 addition & 1 deletion redshiftsink/pkg/redshiftbatcher/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (b *batchProcessor) Process(
msgBufs := [][]*serializer.Message{}

klog.V(2).Infof(
"%s: buffchan:%v msgs",
"%s: processChan:%v",
b.topic,
len(processChan),
)
Expand Down
2 changes: 1 addition & 1 deletion redshiftsink/pkg/redshiftbatcher/batcher_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (h *batcherHandler) ConsumeClaim(
)

var lastSchemaId *int
processChan := make(chan []*serializer.Message, 1000)
processChan := make(chan []*serializer.Message, h.maxConcurrency)
errChan := make(chan error)
processor := newBatchProcessor(
h.consumerGroupID,
Expand Down
2 changes: 1 addition & 1 deletion redshiftsink/pkg/serializer/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (b *MessageAsyncBatch) Flush() {
b.processChan <- b.msgBuf
b.msgBuf = make([]*Message, 0, b.maxSize)
klog.V(4).Infof(
"%s: flushed:%d, buffchan:%v msgs",
"%s: flushed:%d, processChan:%v",
b.topic,
size,
len(b.processChan),
Expand Down

0 comments on commit 0f16d43

Please sign in to comment.