diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index 2993330..dc34ced 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -3,17 +3,68 @@ package observability import ( "fmt" "io" + "strconv" "time" + + "github.com/in4it/wireguard-server/pkg/logging" ) -func (o *Observability) WriteBufferToStorage() error { - file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T02T15:04:05")) +func (o *Observability) WriteBufferToStorage(n int64) error { + o.BufferMu.Lock() + defer o.BufferMu.Unlock() + file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T15:04:05") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)) if err != nil { return fmt.Errorf("open file for writing error: %s", err) } - _, err = io.Copy(file, &o.Buffer) + _, err = io.CopyN(file, &o.Buffer, n) if err != nil { return fmt.Errorf("file write error: %s", err) } + o.LastFlushed = time.Now() return file.Close() } + +func (o *Observability) monitorBuffer() { + for { + time.Sleep(FLUSH_TIME_MAX_MINUTES * time.Minute) + if time.Since(o.LastFlushed) >= (FLUSH_TIME_MAX_MINUTES * time.Minute) { + if o.FlushOverflow.CompareAndSwap(false, true) { + err := o.WriteBufferToStorage(int64(o.Buffer.Len())) + o.FlushOverflow.Swap(true) + if err != nil { + logging.ErrorLog(fmt.Errorf("write log buffer to storage error: %s", err)) + continue + } + } + o.LastFlushed = time.Now() + } + } +} + +func (o *Observability) Ingest(data io.ReadCloser) error { + defer data.Close() + msgs, err := Decode(data) + if err != nil { + return fmt.Errorf("decode error: %s", err) + } + _, err = o.Buffer.Write(encodeMessage(msgs)) + if err != nil { + return fmt.Errorf("write error: %s", err) + + } + fmt.Printf("Buffer size: %d\n", o.Buffer.Len()) + if o.Buffer.Len() >= MAX_BUFFER_SIZE { + if o.FlushOverflow.CompareAndSwap(false, true) { + go func() { // write to storage + if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + logging.ErrorLog(fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)) + } + } + o.FlushOverflow.Swap(false) + }() + } + } + return nil +} diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go new file mode 100644 index 0000000..1b84004 --- /dev/null +++ b/pkg/observability/buffer_test.go @@ -0,0 +1,76 @@ +package observability + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "strconv" + "testing" + "time" + + memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory" +) + +func TestIngestion(t *testing.T) { + t.Skip() // working on this test + storage := &memorystorage.MockMemoryStorage{} + o := &Observability{ + Storage: storage, + } + payload := IncomingData{ + { + "date": 1720613813.197045, + "log": "this is string: ", + }, + } + + for i := 0; i < MAX_BUFFER_SIZE; i++ { + payload[0]["log"] = "this is string: " + strconv.Itoa(i) + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal error: %s", err) + } + data := io.NopCloser(bytes.NewReader(payloadBytes)) + err = o.Ingest(data) + if err != nil { + t.Fatalf("ingest error: %s", err) + } + } + + // flush remaining data + time.Sleep(1 * time.Second) + if o.Buffer.Len() >= MAX_BUFFER_SIZE { + if o.FlushOverflow.CompareAndSwap(false, true) { + if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } + } + o.FlushOverflow.Swap(false) + } + } + + dirlist, err := storage.ReadDir("") + if err != nil { + t.Fatalf("read dir error: %s", err) + } + + totalMessages := 0 + for _, file := range dirlist { + messages, err := storage.ReadFile(file) + if err != nil { + t.Fatalf("read file error: %s", err) + } + decodedMessages := decodeMessage(messages) + for _, message := range decodedMessages { + fmt.Printf("decoded message: %s\n", message.Data["log"]) + } + totalMessages += len(decodedMessages) + } + fmt.Printf("totalmessages: %d", totalMessages) + if len(dirlist) != 3 { + t.Fatalf("expected 3 files in directory, got %d", len(dirlist)) + } +} diff --git a/pkg/observability/constants.go b/pkg/observability/constants.go new file mode 100644 index 0000000..1fcbb16 --- /dev/null +++ b/pkg/observability/constants.go @@ -0,0 +1,4 @@ +package observability + +const MAX_BUFFER_SIZE = 100 +const FLUSH_TIME_MAX_MINUTES = 5 diff --git a/pkg/observability/handlers.go b/pkg/observability/handlers.go index 2e7b20d..b557bde 100644 --- a/pkg/observability/handlers.go +++ b/pkg/observability/handlers.go @@ -14,17 +14,10 @@ func (o *Observability) ingestionHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusBadRequest) return } - msgs, err := Decode(r.Body) - if err != nil { + if err := o.Ingest(r.Body); err != nil { w.WriteHeader(http.StatusBadRequest) fmt.Printf("error: %s", err) return } - _, err = o.Buffer.Write(encodeMessage(msgs)) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - fmt.Printf("cannot store message: %s", err) - return - } w.WriteHeader(http.StatusOK) } diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index a46c64e..880a883 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -35,7 +35,7 @@ func TestIngestionHandler(t *testing.T) { t.Fatalf("expected status code OK. Got: %d", res.StatusCode) } - err = o.WriteBufferToStorage() + err = o.WriteBufferToStorage(int64(o.Buffer.Len())) if err != nil { t.Fatalf("write buffer to storage error: %s", err) } diff --git a/pkg/observability/new.go b/pkg/observability/new.go index 184b68b..ae7dee6 100644 --- a/pkg/observability/new.go +++ b/pkg/observability/new.go @@ -3,7 +3,13 @@ package observability import "net/http" func New() *Observability { - return &Observability{} + o := &Observability{} + go o.monitorBuffer() + return o +} +func NewWithoutMonitor() *Observability { + o := &Observability{} + return o } type Iface interface { diff --git a/pkg/observability/types.go b/pkg/observability/types.go index 2b83935..be85ef7 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -2,6 +2,9 @@ package observability import ( "bytes" + "sync" + "sync/atomic" + "time" "github.com/in4it/wireguard-server/pkg/storage" ) @@ -14,6 +17,10 @@ type FluentBitMessage struct { } type Observability struct { - Storage storage.Iface - Buffer bytes.Buffer + Storage storage.Iface + Buffer bytes.Buffer + LastFlushed time.Time + BufferMu sync.Mutex + FlushOverflow atomic.Bool + FlushOverflowSequence atomic.Uint64 }