Skip to content

Latest commit

 

History

History
69 lines (56 loc) · 2.31 KB

README.md

File metadata and controls

69 lines (56 loc) · 2.31 KB

bwNetFlow Go Kafka Connector

This is a opinionated implementation of a common Connector module for all of our official components and optionally for users of our platform that intend to write client applications in Go. It provides an abstraction for plain Sarama and has support for consuming topics as well as producing to multiple topics, all while converting any message according to our protobuf definition for Flow messages (which is based on goflow's definition).

Build Status Go Report Card GoDoc

Example Usage in Consumer-only mode:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/Shopify/sarama"
	kafka "github.com/bwNetFlow/kafkaconnector"
)

var kafkaConn = kafka.Connector{}

func main() {
	fmt.Printf("welcome... let's go!\n")

	// prepare all variables
	broker := "127.0.0.1:9092,[::1]:9092" // TODO: set valid uris
	topic := []string{"flow-messages-anon"}
	consumerGroup := "anon-golang-example"
	kafkaConn.SetAuthAnon() // optionally: change to SetAuthFromEnv() or SetAuth(user string, pass string)

	kafkaConn.EnablePrometheus(":2112") // optionally open up for monitoring

	// ensure a clean exit
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigchan
		fmt.Println("Signal caught, exiting...")
		kafkaConn.Close()
	}()

	// receive flows
	kafkaConn.StartConsumer(broker, topic, consumerGroup, sarama.OffsetNewest)
	var flowCounter, byteCounter uint64
	for flow := range kafkaConn.ConsumerChannel() {
		// process the flow here ...
		flowCounter++
		byteCounter += flow.GetBytes()
		fmt.Printf("\rflows: %d, bytes: %d GB", flowCounter, byteCounter/1024/1024/1024)
	}
}

Example Usage in Consumer/Producer mode:

Check out processor_splitter, it is very simple and consumes a single topic while producing to multiple target topics.