Skip to content

Commit

Permalink
buffer atomic operations
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene committed Sep 13, 2024
1 parent 25c1690 commit 33c4c4f
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 15 deletions.
57 changes: 54 additions & 3 deletions pkg/observability/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,68 @@ package observability
import (
"fmt"
"io"
"strconv"
"time"

"github.com/in4it/wireguard-server/pkg/logging"
)

func (o *Observability) WriteBufferToStorage() error {
file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T02T15:04:05"))
func (o *Observability) WriteBufferToStorage(n int64) error {
o.BufferMu.Lock()
defer o.BufferMu.Unlock()
file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T15:04:05") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10))
if err != nil {
return fmt.Errorf("open file for writing error: %s", err)
}
_, err = io.Copy(file, &o.Buffer)
_, err = io.CopyN(file, &o.Buffer, n)
if err != nil {
return fmt.Errorf("file write error: %s", err)
}
o.LastFlushed = time.Now()
return file.Close()
}

func (o *Observability) monitorBuffer() {
for {
time.Sleep(FLUSH_TIME_MAX_MINUTES * time.Minute)
if time.Since(o.LastFlushed) >= (FLUSH_TIME_MAX_MINUTES * time.Minute) {
if o.FlushOverflow.CompareAndSwap(false, true) {
err := o.WriteBufferToStorage(int64(o.Buffer.Len()))
o.FlushOverflow.Swap(true)
if err != nil {
logging.ErrorLog(fmt.Errorf("write log buffer to storage error: %s", err))
continue
}
}
o.LastFlushed = time.Now()
}
}
}

func (o *Observability) Ingest(data io.ReadCloser) error {
defer data.Close()
msgs, err := Decode(data)
if err != nil {
return fmt.Errorf("decode error: %s", err)
}
_, err = o.Buffer.Write(encodeMessage(msgs))
if err != nil {
return fmt.Errorf("write error: %s", err)

}
fmt.Printf("Buffer size: %d\n", o.Buffer.Len())
if o.Buffer.Len() >= MAX_BUFFER_SIZE {
if o.FlushOverflow.CompareAndSwap(false, true) {
go func() { // write to storage
if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
logging.ErrorLog(fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err))
}
}
o.FlushOverflow.Swap(false)
}()
}
}
return nil
}
76 changes: 76 additions & 0 deletions pkg/observability/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package observability

import (
"bytes"
"encoding/json"
"fmt"
"io"
"strconv"
"testing"
"time"

memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory"
)

func TestIngestion(t *testing.T) {
t.Skip() // working on this test
storage := &memorystorage.MockMemoryStorage{}
o := &Observability{
Storage: storage,
}
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string: ",
},
}

for i := 0; i < MAX_BUFFER_SIZE; i++ {
payload[0]["log"] = "this is string: " + strconv.Itoa(i)
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal error: %s", err)
}
data := io.NopCloser(bytes.NewReader(payloadBytes))
err = o.Ingest(data)
if err != nil {
t.Fatalf("ingest error: %s", err)
}
}

// flush remaining data
time.Sleep(1 * time.Second)
if o.Buffer.Len() >= MAX_BUFFER_SIZE {
if o.FlushOverflow.CompareAndSwap(false, true) {
if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
o.FlushOverflow.Swap(false)
}
}

dirlist, err := storage.ReadDir("")
if err != nil {
t.Fatalf("read dir error: %s", err)
}

totalMessages := 0
for _, file := range dirlist {
messages, err := storage.ReadFile(file)
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessage(messages)
for _, message := range decodedMessages {
fmt.Printf("decoded message: %s\n", message.Data["log"])
}
totalMessages += len(decodedMessages)
}
fmt.Printf("totalmessages: %d", totalMessages)
if len(dirlist) != 3 {
t.Fatalf("expected 3 files in directory, got %d", len(dirlist))
}
}
4 changes: 4 additions & 0 deletions pkg/observability/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package observability

const MAX_BUFFER_SIZE = 100
const FLUSH_TIME_MAX_MINUTES = 5
9 changes: 1 addition & 8 deletions pkg/observability/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,10 @@ func (o *Observability) ingestionHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusBadRequest)
return
}
msgs, err := Decode(r.Body)
if err != nil {
if err := o.Ingest(r.Body); err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("error: %s", err)
return
}
_, err = o.Buffer.Write(encodeMessage(msgs))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("cannot store message: %s", err)
return
}
w.WriteHeader(http.StatusOK)
}
2 changes: 1 addition & 1 deletion pkg/observability/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestIngestionHandler(t *testing.T) {
t.Fatalf("expected status code OK. Got: %d", res.StatusCode)
}

err = o.WriteBufferToStorage()
err = o.WriteBufferToStorage(int64(o.Buffer.Len()))
if err != nil {
t.Fatalf("write buffer to storage error: %s", err)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/observability/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package observability
import "net/http"

func New() *Observability {
return &Observability{}
o := &Observability{}
go o.monitorBuffer()
return o
}
func NewWithoutMonitor() *Observability {
o := &Observability{}
return o
}

type Iface interface {
Expand Down
11 changes: 9 additions & 2 deletions pkg/observability/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package observability

import (
"bytes"
"sync"
"sync/atomic"
"time"

"github.com/in4it/wireguard-server/pkg/storage"
)
Expand All @@ -14,6 +17,10 @@ type FluentBitMessage struct {
}

type Observability struct {
Storage storage.Iface
Buffer bytes.Buffer
Storage storage.Iface
Buffer bytes.Buffer
LastFlushed time.Time
BufferMu sync.Mutex
FlushOverflow atomic.Bool
FlushOverflowSequence atomic.Uint64
}

0 comments on commit 33c4c4f

Please sign in to comment.