diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index 07b63de..791165a 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -14,7 +14,7 @@ func (o *Observability) WriteBufferToStorage(n int64) error { o.ActiveBufferWriters.Add(1) defer o.ActiveBufferWriters.Done() // copy first to temporary buffer (storage might have latency) - tempBuf := bytes.NewBuffer(make([]byte, n)) + tempBuf := bytes.NewBuffer(make([]byte, 0, n)) _, err := io.CopyN(tempBuf, o.Buffer, n) o.LastFlushed = time.Now() if err != nil && err != io.EOF { diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index 880a883..006cb5c 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -12,9 +12,8 @@ import ( func TestIngestionHandler(t *testing.T) { storage := &memorystorage.MockMemoryStorage{} - o := &Observability{ - Storage: storage, - } + o := NewWithoutMonitor(20) + o.Storage = storage payload := IncomingData{ { "date": 1720613813.197045, @@ -35,10 +34,17 @@ func TestIngestionHandler(t *testing.T) { t.Fatalf("expected status code OK. Got: %d", res.StatusCode) } - err = o.WriteBufferToStorage(int64(o.Buffer.Len())) - if err != nil { - t.Fatalf("write buffer to storage error: %s", err) + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } } + dirlist, err := storage.ReadDir("") if err != nil { t.Fatalf("read dir error: %s", err) @@ -52,7 +58,7 @@ func TestIngestionHandler(t *testing.T) { } decodedMessages := decodeMessage(messages) if decodedMessages[0].Date != 1720613813.197045 { - t.Fatalf("unexpected date") + t.Fatalf("unexpected date. Got %f, expected: %f", decodedMessages[0].Date, 1720613813.197045) } if decodedMessages[0].Data["log"] != "this is a string" { t.Fatalf("unexpected log data")