Skip to content

Commit

Permalink
Merge pull request #1 from se0wtf/master
Browse files Browse the repository at this point in the history
Fix Timeout duration & proper use of timer.Reset()
  • Loading branch information
se0wtf authored May 1, 2020
2 parents 246bb24 + 37b1142 commit 0af2f8b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
27 changes: 15 additions & 12 deletions kafka-sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,22 +327,19 @@ func (s *Syncer) IndexTopic(kafka sarama.Client, index diff.Index) (msgCount uin
}
}

// report kafka errors
go func() {
// FIXME fail on first error
for m := range pc.Errors() {
if Debug {
log.Printf("-> got kafka error %+v", m)
}
}
}()

timer := time.NewTimer(kafka.Config().Consumer.MaxProcessingTime)
// as written in sarama doc :
// Note that, since the Messages channel is buffered, the actual grace time is
// (MaxProcessingTime * ChannelBufferSize).
// real timeout is 100ms * 256 = around 2.5s, 100ms is not enough
timeout := kafka.Config().Consumer.MaxProcessingTime * time.Duration(kafka.Config().ChannelBufferSize)
timer := time.NewTimer(timeout)
defer timer.Stop()
msgCount = 0
consume:
for {
select {
case err := <-pc.Errors():
log.Printf("-> got kafka error %+v", err)
case m := <-pc.Messages():
{
hw := pc.HighWaterMarkOffset()
Expand Down Expand Up @@ -377,7 +374,13 @@ consume:
consumer.Close()
return msgCount, errors.New("timed out while waiting for kafka message")
}
timer.Reset(kafka.Config().Consumer.MaxProcessingTime)

// as written in the timer.Reset() doc :
// Reset should be invoked only on stopped or expired timers with drained channels.
if !timer.Stop() {
<- timer.C
}
timer.Reset(timeout)
}

pc.Close()
Expand Down
3 changes: 0 additions & 3 deletions modd.conf

This file was deleted.

0 comments on commit 0af2f8b

Please sign in to comment.