Skip to content

Commit

Permalink
make buffer thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene committed Sep 13, 2024
1 parent 33c4c4f commit 8e02424
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 37 deletions.
39 changes: 31 additions & 8 deletions pkg/observability/buffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package observability

import (
"bytes"
"fmt"
"io"
"strconv"
Expand All @@ -10,17 +11,24 @@ import (
)

func (o *Observability) WriteBufferToStorage(n int64) error {
o.BufferMu.Lock()
defer o.BufferMu.Unlock()
o.ActiveBufferWriters.Add(1)
defer o.ActiveBufferWriters.Done()
// copy first to temporary buffer (storage might have latency)
tempBuf := bytes.NewBuffer(make([]byte, n))
_, err := io.CopyN(tempBuf, o.Buffer, n)
o.LastFlushed = time.Now()
if err != nil && err != io.EOF {
return fmt.Errorf("write error from buffer to temporary buffer: %s", err)
}

file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T15:04:05") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10))
if err != nil {
return fmt.Errorf("open file for writing error: %s", err)
}
_, err = io.CopyN(file, &o.Buffer, n)
_, err = io.Copy(file, tempBuf)
if err != nil {
return fmt.Errorf("file write error: %s", err)
}
o.LastFlushed = time.Now()
return file.Close()
}

Expand Down Expand Up @@ -50,13 +58,11 @@ func (o *Observability) Ingest(data io.ReadCloser) error {
_, err = o.Buffer.Write(encodeMessage(msgs))
if err != nil {
return fmt.Errorf("write error: %s", err)

}
fmt.Printf("Buffer size: %d\n", o.Buffer.Len())
if o.Buffer.Len() >= MAX_BUFFER_SIZE {
if o.Buffer.Len() >= o.MaxBufferSize {
if o.FlushOverflow.CompareAndSwap(false, true) {
go func() { // write to storage
if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE {
if n := o.Buffer.Len(); n >= o.MaxBufferSize {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
logging.ErrorLog(fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err))
Expand All @@ -68,3 +74,20 @@ func (o *Observability) Ingest(data io.ReadCloser) error {
}
return nil
}

func (c *ConcurrentRWBuffer) Write(p []byte) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.buffer.Write(p)
}
func (c *ConcurrentRWBuffer) Read(p []byte) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.buffer.Read(p)
}
func (c *ConcurrentRWBuffer) Len() int {
return c.buffer.Len()
}
func (c *ConcurrentRWBuffer) Cap() int {
return c.buffer.Cap()
}
171 changes: 149 additions & 22 deletions pkg/observability/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,23 @@ import (
"io"
"strconv"
"testing"
"time"

memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory"
)

func TestIngestion(t *testing.T) {
t.Skip() // working on this test
totalMessagesToGenerate := 1000
storage := &memorystorage.MockMemoryStorage{}
o := &Observability{
Storage: storage,
}
o := NewWithoutMonitor(20)
o.Storage = storage
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string: ",
},
}

for i := 0; i < MAX_BUFFER_SIZE; i++ {
for i := 0; i < totalMessagesToGenerate; i++ {
payload[0]["log"] = "this is string: " + strconv.Itoa(i)
payloadBytes, err := json.Marshal(payload)
if err != nil {
Expand All @@ -38,17 +36,14 @@ func TestIngestion(t *testing.T) {
}
}

// flush remaining data
time.Sleep(1 * time.Second)
if o.Buffer.Len() >= MAX_BUFFER_SIZE {
if o.FlushOverflow.CompareAndSwap(false, true) {
if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
o.FlushOverflow.Swap(false)
// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}

Expand All @@ -64,13 +59,145 @@ func TestIngestion(t *testing.T) {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessage(messages)
for _, message := range decodedMessages {
fmt.Printf("decoded message: %s\n", message.Data["log"])
totalMessages += len(decodedMessages)
}
if len(dirlist) == 0 {
t.Fatalf("expected multiple files in directory, got %d", len(dirlist))
}

if totalMessages != totalMessagesToGenerate {
t.Fatalf("Tried to generate total message count of: %d; got: %d", totalMessagesToGenerate, totalMessages)
}
}

func TestIngestionMoreMessages(t *testing.T) {
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
o.Storage = storage
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string: ",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal error: %s", err)
}

for i := 0; i < totalMessagesToGenerate; i++ {
data := io.NopCloser(bytes.NewReader(payloadBytes))
err := o.Ingest(data)
if err != nil {
t.Fatalf("ingest error: %s", err)
}
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}

dirlist, err := storage.ReadDir("")
if err != nil {
t.Fatalf("read dir error: %s", err)
}

totalMessages := 0
for _, file := range dirlist {
messages, err := storage.ReadFile(file)
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessage(messages)
totalMessages += len(decodedMessages)
}
fmt.Printf("totalmessages: %d", totalMessages)
if len(dirlist) != 3 {
t.Fatalf("expected 3 files in directory, got %d", len(dirlist))
if len(dirlist) == 0 {
t.Fatalf("expected multiple files in directory, got %d", len(dirlist))
}

if totalMessages != totalMessagesToGenerate {
t.Fatalf("Tried to generate total message count of: %d; got: %d", totalMessagesToGenerate, totalMessages)
}
fmt.Printf("Buffer size (read+unread): %d in %d files\n", o.Buffer.Cap(), len(dirlist))

}

func BenchmarkIngest10000000(b *testing.B) {
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
o.Storage = storage
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
b.Fatalf("marshal error: %s", err)
}

for i := 0; i < totalMessagesToGenerate; i++ {
data := io.NopCloser(bytes.NewReader(payloadBytes))
err := o.Ingest(data)
if err != nil {
b.Fatalf("ingest error: %s", err)
}
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
b.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
}

func BenchmarkIngest100000000(b *testing.B) {
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
o.Storage = storage
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
b.Fatalf("marshal error: %s", err)
}

for i := 0; i < totalMessagesToGenerate; i++ {
data := io.NopCloser(bytes.NewReader(payloadBytes))
err := o.Ingest(data)
if err != nil {
b.Fatalf("ingest error: %s", err)
}
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
b.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
}
2 changes: 1 addition & 1 deletion pkg/observability/constants.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package observability

const MAX_BUFFER_SIZE = 100
const MAX_BUFFER_SIZE = 1024 * 1024 // 1 MB
const FLUSH_TIME_MAX_MINUTES = 5
13 changes: 9 additions & 4 deletions pkg/observability/new.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package observability

import "net/http"
import (
"net/http"
)

func New() *Observability {
o := &Observability{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
go o.monitorBuffer()
return o
}
func NewWithoutMonitor() *Observability {
o := &Observability{}
func NewWithoutMonitor(maxBufferSize int) *Observability {
o := &Observability{
Buffer: &ConcurrentRWBuffer{},
MaxBufferSize: maxBufferSize,
}
return o
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/observability/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ type FluentBitMessage struct {

type Observability struct {
Storage storage.Iface
Buffer bytes.Buffer
Buffer *ConcurrentRWBuffer
LastFlushed time.Time
BufferMu sync.Mutex
FlushOverflow atomic.Bool
FlushOverflowSequence atomic.Uint64
ActiveBufferWriters sync.WaitGroup
MaxBufferSize int
}

type ConcurrentRWBuffer struct {
buffer bytes.Buffer
mu sync.Mutex
}

0 comments on commit 8e02424

Please sign in to comment.