Skip to content

Commit

Permalink
Stop writer closing
Browse files Browse the repository at this point in the history
  • Loading branch information
ale8k committed Dec 1, 2021
1 parent 9413178 commit eb82b66
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
3 changes: 2 additions & 1 deletion internal/question_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type QuestionProcessor struct {
KafkaWriter kafka.Writer
KafkaWriter *kafka.Writer
Tags string
CollectEvery time.Duration
}
Expand Down Expand Up @@ -95,6 +95,7 @@ func (qp *QuestionProcessor) ProcessAskedQuestions(done chan struct{}) {

// Wrapper to process
processQuestionsToKafka := func() {

data, _ := runQuery()
parsedMessages := make(chan kafka.Message, len(data))
// TODO: Make batching size configurable alex
Expand Down
3 changes: 2 additions & 1 deletion internal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var Server *http.Server
func processorWrapper(topicname string, tags string) {
done := make(chan struct{})
phpQp := &QuestionProcessor{
KafkaWriter: kafka.Writer{
// Seems to close otherwise...?
KafkaWriter: &kafka.Writer{
Addr: kafka.TCP("kafka:9092"),
Topic: topicname,
Balancer: &kafka.LeastBytes{},
Expand Down

0 comments on commit eb82b66

Please sign in to comment.