diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index 4aa0bab..db338f5 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -4,10 +4,13 @@ import ( "bytes" "fmt" "io" + "path" "strconv" + "strings" "time" "github.com/in4it/wireguard-server/pkg/logging" + "github.com/in4it/wireguard-server/pkg/storage" ) func (o *Observability) WriteBufferToStorage(n int64) error { @@ -21,7 +24,12 @@ func (o *Observability) WriteBufferToStorage(n int64) error { return fmt.Errorf("write error from buffer to temporary buffer: %s", err) } now := time.Now() - file, err := o.Storage.OpenFileForWriting(now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)) + filename := now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10) + err = ensurePath(o.Storage, filename) + if err != nil { + return fmt.Errorf("ensure path error: %s", err) + } + file, err := o.Storage.OpenFileForWriting(filename) if err != nil { return fmt.Errorf("open file for writing error: %s", err) } @@ -29,6 +37,7 @@ func (o *Observability) WriteBufferToStorage(n int64) error { if err != nil { return fmt.Errorf("file write error: %s", err) } + logging.DebugLog(fmt.Errorf("wrote file: %s", filename)) return file.Close() } @@ -91,3 +100,17 @@ func (c *ConcurrentRWBuffer) Len() int { func (c *ConcurrentRWBuffer) Cap() int { return c.buffer.Cap() } + +func ensurePath(storage storage.Iface, filename string) error { + base := path.Dir(filename) + baseSplit := strings.Split(base, "/") + fullPath := "" + for _, v := range baseSplit { + fullPath = path.Join(fullPath, v) + err := storage.EnsurePath(fullPath) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go index 0f545d6..f515dc1 100644 --- a/pkg/observability/buffer_test.go +++ b/pkg/observability/buffer_test.go @@ -74,8 +74,7 @@ func TestIngestionMoreMessages(t *testing.T) { t.Skip() // we can skip this for general unit testing totalMessagesToGenerate := 10000000 // 10,000,000 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(MAX_BUFFER_SIZE) - o.Storage = storage + o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) payload := IncomingData{ { "date": 1720613813.197045, @@ -134,8 +133,7 @@ func TestIngestionMoreMessages(t *testing.T) { func BenchmarkIngest10000000(b *testing.B) { totalMessagesToGenerate := 10000000 // 10,000,000 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(MAX_BUFFER_SIZE) - o.Storage = storage + o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) payload := IncomingData{ { "date": 1720613813.197045, @@ -170,7 +168,7 @@ func BenchmarkIngest10000000(b *testing.B) { func BenchmarkIngest100000000(b *testing.B) { totalMessagesToGenerate := 10000000 // 10,000,000 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(MAX_BUFFER_SIZE) + o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) o.Storage = storage payload := IncomingData{ { @@ -202,3 +200,11 @@ func BenchmarkIngest100000000(b *testing.B) { } } } + +func TestEnsurePath(t *testing.T) { + storage := &memorystorage.MockMemoryStorage{} + err := ensurePath(storage, "a/b/c/filename.txt") + if err != nil { + t.Fatalf("error: %s", err) + } +} diff --git a/pkg/observability/constants.go b/pkg/observability/constants.go index 2d63170..1696cbd 100644 --- a/pkg/observability/constants.go +++ b/pkg/observability/constants.go @@ -1,6 +1,6 @@ package observability const MAX_BUFFER_SIZE = 1024 * 1024 // 1 MB -const FLUSH_TIME_MAX_MINUTES = 5 +const FLUSH_TIME_MAX_MINUTES = 1 // should have 5 as default at release const TIMESTAMP_FORMAT = "2006-01-02T15:04:05" diff --git a/pkg/observability/logs.go b/pkg/observability/logs.go index 66436bb..e63cf23 100644 --- a/pkg/observability/logs.go +++ b/pkg/observability/logs.go @@ -8,10 +8,17 @@ import ( ) func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, maxLogLines int) (LogEntryResponse, error) { - logEntryResponse := LogEntryResponse{} + logEntryResponse := LogEntryResponse{ + Enabled: true, + Environments: []string{"dev", "qa", "prod"}, + } logFiles := []string{} + if maxLogLines == 0 { + maxLogLines = 100 + } + for d := fromDate; d.Before(endDate) || d.Equal(endDate); d = d.AddDate(0, 0, 1) { fileList, err := o.Storage.ReadDir(d.Format("2006/01/02")) if err != nil { diff --git a/pkg/observability/new.go b/pkg/observability/new.go index e375212..7bf9f0b 100644 --- a/pkg/observability/new.go +++ b/pkg/observability/new.go @@ -6,8 +6,8 @@ import ( "github.com/in4it/wireguard-server/pkg/storage" ) -func New(storage storage.Iface) *Observability { - o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) +func New(defaultStorage storage.Iface) *Observability { + o := NewWithoutMonitor(defaultStorage, MAX_BUFFER_SIZE) go o.monitorBuffer() return o } diff --git a/pkg/observability/types.go b/pkg/observability/types.go index 099dff8..ba83131 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -32,7 +32,9 @@ type ConcurrentRWBuffer struct { } type LogEntryResponse struct { - LogEntries []LogEntry `json:"logEntries"` + Enabled bool `json:"enabled"` + LogEntries []LogEntry `json:"logEntries"` + Environments []string `json:"environments"` } type LogEntry struct {