Skip to content

Commit

Permalink
Lumberjack: add parallel flow processing (#77)
Browse files Browse the repository at this point in the history
* lumberjack: added support for optical decimal divider for options batchsize and queuesize

* lumberjack: implemented multiple backend goroutines (count config parameter)

---------

Co-authored-by: Heiko Reese <[email protected]>
  • Loading branch information
hreese and Heiko Reese authored Aug 3, 2023
1 parent c5af4ff commit 02e9b61
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 67 deletions.
13 changes: 13 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,19 @@ no encryption), `tls://` (TLS encryption) or `tlsnoverify://` (TLS encryption wi
certificate verification). The schema is followed by the hostname or IP address, a colon `:`,
and a port number. IPv6 addresses must be surrounded by square brackets.

A goroutine is spawned for every lumberjack server. Each goroutine only uses one CPU core to
process and send flows. This may not be enough when the ingress flow rate is high and/or a high compression
level is used. The number of goroutines per backend can by set explicitly with the `?count=x` URL
parameter. For example:

```yaml
config:
server: tls://host1:5043/?count=4, tls://host2:5043/?compression=9&count=16
```

will use four parallel goroutines for `host1` and sixteen parallel goroutines for `host2`. Use `&count=…` instead of
`?count=…` when `count` is not the first parameter (standard URI convention).

Transport compression is disabled by default. Use `compression` to set the compression level
for all hosts. Compression levels can vary between 0 (no compression) and 9 (maximum compression).
To set per-host transport compression adding `?compression=<level>` to the server URI.
Expand Down
157 changes: 90 additions & 67 deletions segments/output/lumberjack/lumberjack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/bwNetFlow/flowpipeline/segments"
"log"
"net/url"
"runtime"
"strconv"
"strings"
"sync"
Expand All @@ -25,6 +26,7 @@ type ServerOptions struct {
UseTLS bool
VerifyCertificate bool
CompressionLevel int
Parallism int
}

type Lumberjack struct {
Expand Down Expand Up @@ -113,18 +115,37 @@ func (segment *Lumberjack) New(config map[string]string) segments.Segment {
}
}

// parse count url argument
var numRoutines = 1
numRoutinesString := urlQueryParams.Get("count")
if numRoutinesString == "" {
numRoutines = 1
} else {
numRoutines, err = strconv.Atoi(numRoutinesString)
switch {
case err != nil:
log.Fatalf("[error] Lumberjack: Failed to parse count %s for host %s: %s", numRoutinesString, serverURL.Host, err)
case numRoutines < 1:
log.Printf("[warning] Lumberjack: count is smaller than 1, setting to 1")
numRoutines = 1
case numRoutines > runtime.NumCPU():
log.Printf("[warning] Lumberjack: count is larger than runtime.NumCPU (%d). This will most likely hurt performance.", runtime.NumCPU())
}
}

segment.Servers[serverURL.Host] = ServerOptions{
UseTLS: useTLS,
VerifyCertificate: verifyTLS,
CompressionLevel: compressionLevel,
Parallism: numRoutines,
}
}
}

// parse batchSize option
segment.BatchSize = defaultBatchSize
if config["batchsize"] != "" {
segment.BatchSize, err = strconv.Atoi(config["batchsize"])
segment.BatchSize, err = strconv.Atoi(strings.ReplaceAll(config["batchsize"], "_", ""))
if err != nil {
log.Fatalf("[error] Lumberjack: Failed to parse batchsize config option: %s", err)
}
Expand Down Expand Up @@ -183,7 +204,7 @@ func (segment *Lumberjack) New(config map[string]string) segments.Segment {

// create buffered channel
if config["queuesize"] != "" {
buflen, err = strconv.Atoi(config["queuesize"])
buflen, err = strconv.Atoi(strings.ReplaceAll(config["queuesize"], "_", ""))
if err != nil {
log.Fatalf("[error] Lumberjack: Failed to parse queuesize config option: %s", err)
}
Expand Down Expand Up @@ -224,80 +245,82 @@ func (segment *Lumberjack) Run(wg *sync.WaitGroup) {
for server, options := range segment.Servers {
writerWG.Add(1)
options := options
go func(server string) {
// connect to lumberjack server
client := NewResilientClient(server, options, segment.ReconnectWait)
defer client.Close()
log.Printf("[info] Lumberjack: Connected to %s (TLS: %v, VerifyTLS: %v, Compression: %d)", server, options.UseTLS, options.VerifyCertificate, options.CompressionLevel)

flowInterface := make([]interface{}, segment.BatchSize)
idx := 0

// see https://stackoverflow.com/questions/66037676/go-reset-a-timer-newtimer-within-select-loop for timer mechanics
timer := time.NewTimer(segment.BatchTimeout)
timer.Stop()
defer timer.Stop()
var timerSet bool

for {
select {
case flow, isOpen := <-segment.LumberjackOut:
// exit on closed channel
if !isOpen {
// send local buffer
count, err := client.SendNoRetry(flowInterface[:idx])
if err != nil {
log.Printf("[error] Lumberjack: Failed to send final flow batch upon exit to %s: %s", server, err)
} else {
segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent final batch (%d)", server, count)
for i := 0; i < options.Parallism; i++ {
go func(server string, numServer int) {
// connect to lumberjack server
client := NewResilientClient(server, options, segment.ReconnectWait)
defer client.Close()
log.Printf("[info] Lumberjack: Connected to %s (TLS: %v, VerifyTLS: %v, Compression: %d, number %d/%d)", server, options.UseTLS, options.VerifyCertificate, options.CompressionLevel, numServer+1, options.Parallism)

flowInterface := make([]interface{}, segment.BatchSize)
idx := 0

// see https://stackoverflow.com/questions/66037676/go-reset-a-timer-newtimer-within-select-loop for timer mechanics
timer := time.NewTimer(segment.BatchTimeout)
timer.Stop()
defer timer.Stop()
var timerSet bool

for {
select {
case flow, isOpen := <-segment.LumberjackOut:
// exit on closed channel
if !isOpen {
// send local buffer
count, err := client.SendNoRetry(flowInterface[:idx])
if err != nil {
log.Printf("[error] Lumberjack: Failed to send final flow batch upon exit to %s: %s", server, err)
} else {
segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent final batch (%d)", server, count)
}
wg.Done()
return
}
wg.Done()
return
}

// append flow to batch
flowInterface[idx] = flow
idx++

// send batch if full
if idx == segment.BatchSize {
// We got an event, and timer was already set.
// We need to stop the timer and drain the channel if needed,
// so that we can safely reset it later.
if timerSet {
if !timer.Stop() {
<-timer.C
// append flow to batch
flowInterface[idx] = flow
idx++

// send batch if full
if idx == segment.BatchSize {
// We got an event, and timer was already set.
// We need to stop the timer and drain the channel if needed,
// so that we can safely reset it later.
if timerSet {
if !timer.Stop() {
<-timer.C
}
timerSet = false
}
timerSet = false
}

client.Send(flowInterface)
segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent full batch (%d)", server, segment.BatchSize)
client.Send(flowInterface)
segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent full batch (%d)", server, segment.BatchSize)

// reset idx
idx = 0
// reset idx
idx = 0

// If timer was not set, or it was stopped before, it's safe to reset it.
if !timerSet {
timerSet = true
timer.Reset(segment.BatchTimeout)
// If timer was not set, or it was stopped before, it's safe to reset it.
if !timerSet {
timerSet = true
timer.Reset(segment.BatchTimeout)
}
}
case <-timer.C:
// timer expired, send batch
if idx > 0 {
segment.BatchDebugPrintf("[debug] Lumberjack: %s Sending incomplete batch (%d/%d)", server, idx, segment.BatchSize)
client.Send(flowInterface[:idx])
idx = 0
} else {
segment.BatchDebugPrintf("[debug] Lumberjack: %s Timer expired with empty batch", server)
}
}
case <-timer.C:
// timer expired, send batch
if idx > 0 {
segment.BatchDebugPrintf("[debug] Lumberjack: %s Sending incomplete batch (%d/%d)", server, idx, segment.BatchSize)
client.Send(flowInterface[:idx])
idx = 0
} else {
segment.BatchDebugPrintf("[debug] Lumberjack: %s Timer expired with empty batch", server)
}

timer.Reset(segment.BatchTimeout)
timerSet = true
timer.Reset(segment.BatchTimeout)
timerSet = true
}
}
}
}(server)
}(server, i)
}
}

// forward flows to lumberjack servers and to the next segment
Expand Down

0 comments on commit 02e9b61

Please sign in to comment.