From ecb476779dbab031a6780a8cab58cebaa640af6c Mon Sep 17 00:00:00 2001 From: mirwaisx <57108408+mirwaisx@users.noreply.github.com> Date: Tue, 11 Jul 2023 15:42:10 +0200 Subject: [PATCH] auto topic creation enabled --- kq/pusher.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kq/pusher.go b/kq/pusher.go index f2c02fe..3956486 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -27,10 +27,13 @@ type ( func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { producer := &kafka.Writer{ - Addr: kafka.TCP(addrs...), - Topic: topic, + Addr: kafka.TCP(addrs...), + Topic: topic, + //todo move the follwoing to config kpusherConfig? Balancer: &kafka.LeastBytes{}, Compression: kafka.Snappy, + //if this is not set, the writer will not create a nonexistent topic + AllowAutoTopicCreation: true, } pusher := &Pusher{ produer: producer, @@ -53,7 +56,7 @@ func (p *Pusher) Close() error { if p.executor != nil { p.executor.Flush() } - + return p.produer.Close() }