From 7de0b71a9a056268a936bff28d83c7e06dd40dee Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Thu, 12 Sep 2024 17:03:52 -0500 Subject: [PATCH 01/16] observability --- go.mod | 19 +++++++++++ go.sum | 40 ++++++++++++++++++++++ pkg/observability/buffer.go | 19 +++++++++++ pkg/observability/decoding.go | 51 +++++++++++++++++++++++++++- pkg/observability/decoding_test.go | 46 +++++++++++++++++++++++-- pkg/observability/encoding.go | 24 +++++++++++++ pkg/observability/handlers.go | 11 +++++- pkg/observability/handlers_test.go | 54 ++++++++++++++++++++++++++++++ pkg/observability/new.go | 3 -- pkg/observability/types.go | 17 ++++++++-- pkg/storage/memory/storage.go | 4 ++- pkg/storage/s3/list.go | 25 ++++++++++++++ pkg/storage/s3/new.go | 23 +++++++++++++ pkg/storage/s3/path.go | 42 +++++++++++++++++++++++ pkg/storage/s3/read.go | 17 ++++++++++ pkg/storage/s3/types.go | 11 ++++++ pkg/storage/s3/write.go | 35 +++++++++++++++++++ 17 files changed, 430 insertions(+), 11 deletions(-) create mode 100644 pkg/observability/buffer.go create mode 100644 pkg/observability/encoding.go create mode 100644 pkg/observability/handlers_test.go create mode 100644 pkg/storage/s3/list.go create mode 100644 pkg/storage/s3/new.go create mode 100644 pkg/storage/s3/path.go create mode 100644 pkg/storage/s3/read.go create mode 100644 pkg/storage/s3/types.go create mode 100644 pkg/storage/s3/write.go diff --git a/go.mod b/go.mod index a4cf984..440af15 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,27 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.30.5 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect + github.com/aws/aws-sdk-go-v2/config v1.27.33 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.32 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.7 // indirect + github.com/aws/smithy-go v1.20.4 // indirect github.com/beevik/etree v1.4.0 // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect github.com/josharian/native v1.1.0 // indirect github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect diff --git a/go.sum b/go.sum index e3965de..2664f51 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,39 @@ +github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g= +github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw= +github.com/aws/aws-sdk-go-v2/config v1.27.33 h1:Nof9o/MsmH4oa0s2q9a0k7tMz5x/Yj5k06lDODWz3BU= +github.com/aws/aws-sdk-go-v2/config v1.27.33/go.mod h1:kEqdYzRb8dd8Sy2pOdEbExTTF5v7ozEXX0McgPE7xks= +github.com/aws/aws-sdk-go-v2/credentials v1.17.32 h1:7Cxhp/BnT2RcGy4VisJ9miUPecY+lyE9I8JvcZofn9I= +github.com/aws/aws-sdk-go-v2/credentials v1.17.32/go.mod h1:P5/QMF3/DCHbXGEGkdbilXHsyTBX5D3HSwcrSc9p20I= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 h1:pfQ2sqNpMVK6xz2RbqLEL0GH87JOwSxPV2rzm8Zsb74= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13/go.mod h1:NG7RXPUlqfsCLLFfi0+IpKN4sCB9D9fw/qTaSB+xRoU= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17/go.mod h1:aLJpZlCmjE+V+KtN1q1uyZkfnUWpQGpbsn89XPKyzfU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 h1:Roo69qTpfu8OlJ2Tb7pAYVuF0CpuUMB0IYWwYP/4DZM= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17/go.mod h1:NcWPxQzGM1USQggaTVwz6VpqMZPX1CvDJLDh6jnOCa4= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 h1:FLMkfEiRjhgeDTCjjLoc3URo/TBkgeQbocA78lfkzSI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19/go.mod h1:Vx+GucNSsdhaxs3aZIKfSUjKVGsxN25nX2SRcdhuw08= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 h1:rfprUlsdzgl7ZL2KlXiUAoJnI/VxfHCvDFr2QDFj6u4= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19/go.mod h1:SCWkEdRq8/7EK60NcvvQ6NXKuTcchAD4ROAsC37VEZE= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 h1:u+EfGmksnJc/x5tq3A+OD7LrMbSSR/5TrKLvkdy/fhY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17/go.mod h1:VaMx6302JHax2vHJWgRo+5n9zvbacs3bLU/23DNQrTY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2 h1:Kp6PWAlXwP1UvIflkIP6MFZYBNDCa4mFCGtxrpICVOg= +github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2/go.mod h1:5FmD/Dqq57gP+XwaUnd5WFPipAuzrf0HmupX27Gvjvc= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.7 h1:pIaGg+08llrP7Q5aiz9ICWbY8cqhTkyy+0SHvfzQpTc= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.7/go.mod h1:eEygMHnTKH/3kNp9Jr1n3PdejuSNcgwLe1dWgQtO0VQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7 h1:/Cfdu0XV3mONYKaOt1Gr0k1KvQzkzPyiKUdlWJqy+J4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7/go.mod h1:bCbAxKDqNvkHxRaIMnyVPXPo+OaPRwvmgzMxbz1VKSA= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.7 h1:NKTa1eqZYw8tiHSRGpP0VtTdub/8KNk8sDkNPFaOKDE= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.7/go.mod h1:NXi1dIAGteSaRLqYgarlhP/Ij0cFT+qmCwiJqWh/U5o= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= github.com/beevik/etree v1.4.0 h1:oz1UedHRepuY3p4N5OjE0nK1WLCqtzHf25bxplKOHLs= github.com/beevik/etree v1.4.0/go.mod h1:cyWiXwGoasx60gHvtnEh5x8+uIjUVnjWqBvEnhnqKDA= @@ -15,6 +51,9 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopacket/gopacket v1.3.0 h1:MouZCc+ej0vnqzB0WeiaO/6+tGvb+KU7UczxoQ+X0Yc= github.com/gopacket/gopacket v1.3.0/go.mod h1:WnFrU1Xkf5lWKV38uKNR9+yYtppn+ZYzOyNqMeH4oNE= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= @@ -71,6 +110,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go new file mode 100644 index 0000000..2993330 --- /dev/null +++ b/pkg/observability/buffer.go @@ -0,0 +1,19 @@ +package observability + +import ( + "fmt" + "io" + "time" +) + +func (o *Observability) WriteBufferToStorage() error { + file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T02T15:04:05")) + if err != nil { + return fmt.Errorf("open file for writing error: %s", err) + } + _, err = io.Copy(file, &o.Buffer) + if err != nil { + return fmt.Errorf("file write error: %s", err) + } + return file.Close() +} diff --git a/pkg/observability/decoding.go b/pkg/observability/decoding.go index f166e66..8aaf6db 100644 --- a/pkg/observability/decoding.go +++ b/pkg/observability/decoding.go @@ -1,9 +1,13 @@ package observability import ( + "encoding/binary" "encoding/json" "fmt" "io" + "math" + "reflect" + "strconv" ) func Decode(r io.Reader) ([]FluentBitMessage, error) { @@ -22,11 +26,23 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) { switch m2 := m1[0].(type) { case map[string]interface{}: var fluentBitMessage FluentBitMessage + fluentBitMessage.Data = make(map[string]string) val, ok := m2["date"] if ok { fluentBitMessage.Date = val.(float64) } - fluentBitMessage.Data = m2 + for key, value := range m2 { + switch valueTyped := value.(type) { + case string: + fluentBitMessage.Data[key] = valueTyped + case float64: + fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64) + case []byte: + fluentBitMessage.Data[key] = string(valueTyped) + default: + fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped)) + } + } result = append(result, fluentBitMessage) default: return result, fmt.Errorf("invalid type: no map found in array") @@ -36,3 +52,36 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) { } return result, nil } + +func decodeMessage(msgs []byte) []FluentBitMessage { + res := []FluentBitMessage{} + recordOffset := 0 + for k := 0; k < len(msgs); k++ { + if k > recordOffset+8 && msgs[k] == 0xff && msgs[k-1] == 0xff { + bits := binary.LittleEndian.Uint64(msgs[recordOffset : recordOffset+8]) + msg := FluentBitMessage{ + Date: math.Float64frombits(bits), + Data: map[string]string{}, + } + isKey := false + key := "" + start := 8 + recordOffset + for kk := 8 + recordOffset; kk < k; kk++ { + if msgs[kk] == 0xff { + if isKey { + isKey = false + msg.Data[key] = string(msgs[recordOffset+start+1 : recordOffset+kk]) + start = kk + 1 + } else { + isKey = true + key = string(msgs[recordOffset+start : recordOffset+kk]) + start = kk + } + } + } + res = append(res, msg) + recordOffset = k + 1 + } + } + return res +} diff --git a/pkg/observability/decoding_test.go b/pkg/observability/decoding_test.go index 152ef7e..6d0c108 100644 --- a/pkg/observability/decoding_test.go +++ b/pkg/observability/decoding_test.go @@ -2,11 +2,12 @@ package observability import ( "bytes" + "fmt" "testing" ) func TestDecoding(t *testing.T) { - data := `[{"date":1720613813.197045,"rand_value":5523152494216581654}]` + data := `[{"date":1720613813.197045,"rand_value":"rand"}]` messages, err := Decode(bytes.NewBuffer([]byte(data))) if err != nil { t.Fatalf("error: %s", err) @@ -21,7 +22,46 @@ func TestDecoding(t *testing.T) { if !ok { t.Fatalf("rand_value key not found") } - if val.(float64) != 5523152494216581654 { - t.Fatalf("wrong data returned") + if string(val) != "rand" { + t.Fatalf("wrong data returned: %s", val) + } +} + +func TestDecodeMsg(t *testing.T) { + msgs := []FluentBitMessage{ + { + Date: 1720613813.197045, + Data: map[string]string{ + "mykey": "this is myvalue", + "second key": "this is my second value", + "third key": "this is my third value", + }, + }, + /*{ + Date: 1720613813.197099, + Data: map[string]string{ + "second data set": "my value", + }, + },*/ + } + encoded := encodeMessage(msgs) + decoded := decodeMessage(encoded) + fmt.Printf("decoded: %+v\n", decoded) + + if len(msgs) != len(decoded) { + t.Fatalf("length doesn't match") + } + for k := range decoded { + if msgs[k].Date != decoded[k].Date { + t.Fatalf("date doesn't match") + } + if len(msgs[k].Data) != len(decoded[k].Data) { + t.Fatalf("length of data doesn't match") + } + for kk := range decoded[k].Data { + if msgs[k].Data[kk] != decoded[k].Data[kk] { + t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, msgs[k].Data[kk], decoded[k].Data[kk]) + } + } } } diff --git a/pkg/observability/encoding.go b/pkg/observability/encoding.go new file mode 100644 index 0000000..c805b8f --- /dev/null +++ b/pkg/observability/encoding.go @@ -0,0 +1,24 @@ +package observability + +import ( + "bytes" + "encoding/binary" + "math" +) + +func encodeMessage(msgs []FluentBitMessage) []byte { + out := bytes.Buffer{} + for _, msg := range msgs { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(msg.Date)) + out.Write(buf[:]) + for key, msgData := range msg.Data { + out.Write([]byte(key)) + out.Write([]byte{0xff}) + out.Write([]byte(msgData)) + out.Write([]byte{0xff}) + } + out.Write([]byte{0xff}) + } + return out.Bytes() +} diff --git a/pkg/observability/handlers.go b/pkg/observability/handlers.go index defd71d..2e7b20d 100644 --- a/pkg/observability/handlers.go +++ b/pkg/observability/handlers.go @@ -10,12 +10,21 @@ func (o *Observability) observabilityHandler(w http.ResponseWriter, r *http.Requ } func (o *Observability) ingestionHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusBadRequest) + return + } msgs, err := Decode(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) fmt.Printf("error: %s", err) return } - fmt.Printf("Got msgs: %+v\n", msgs) + _, err = o.Buffer.Write(encodeMessage(msgs)) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Printf("cannot store message: %s", err) + return + } w.WriteHeader(http.StatusOK) } diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go new file mode 100644 index 0000000..719386d --- /dev/null +++ b/pkg/observability/handlers_test.go @@ -0,0 +1,54 @@ +package observability + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory" +) + +func TestIngestionHandler(t *testing.T) { + storage := &memorystorage.MockMemoryStorage{} + o := &Observability{ + Storage: storage, + } + payload := IncomingData{ + { + "Date": 1720613813.197045, + "log": "this is a string", + }, + } + + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal error: %s", err) + } + req := httptest.NewRequest(http.MethodPost, "/api/observability/ingestion/json", bytes.NewReader(payloadBytes)) + w := httptest.NewRecorder() + o.ingestionHandler(w, req) + res := w.Result() + + if res.StatusCode != http.StatusOK { + t.Fatalf("expected status code OK. Got: %d", res.StatusCode) + } + + err = o.WriteBufferToStorage() + if err != nil { + t.Fatalf("write buffer to storage error: %s", err) + } + dirlist, err := storage.ReadDir("") + if err != nil { + t.Fatalf("read dir error: %s", err) + } + for _, filename := range dirlist { + filenameOut, err := storage.ReadFile(filename) + if err != nil { + t.Fatalf("read file error: %s", err) + } + fmt.Printf("filenameOut: %s", filenameOut) + } +} diff --git a/pkg/observability/new.go b/pkg/observability/new.go index bcf3147..184b68b 100644 --- a/pkg/observability/new.go +++ b/pkg/observability/new.go @@ -6,9 +6,6 @@ func New() *Observability { return &Observability{} } -type Observability struct { -} - type Iface interface { GetRouter() *http.ServeMux } diff --git a/pkg/observability/types.go b/pkg/observability/types.go index 3190430..2b83935 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -1,6 +1,19 @@ package observability +import ( + "bytes" + + "github.com/in4it/wireguard-server/pkg/storage" +) + +type IncomingData []map[string]any + type FluentBitMessage struct { - Date float64 `json:"date"` - Data map[string]any `json:"data"` + Date float64 `json:"date"` + Data map[string]string `json:"data"` +} + +type Observability struct { + Storage storage.Iface + Buffer bytes.Buffer } diff --git a/pkg/storage/memory/storage.go b/pkg/storage/memory/storage.go index ef5734e..c18b790 100644 --- a/pkg/storage/memory/storage.go +++ b/pkg/storage/memory/storage.go @@ -97,7 +97,9 @@ func (m *MockMemoryStorage) ReadDir(path string) ([]string, error) { } res := []string{} for k := range m.Data { - if strings.HasPrefix(k, path+"/") { + if path == "" { + res = append(res, strings.ReplaceAll(k, path+"/", "")) + } else if strings.HasPrefix(k, path+"/") { res = append(res, strings.ReplaceAll(k, path+"/", "")) } } diff --git a/pkg/storage/s3/list.go b/pkg/storage/s3/list.go new file mode 100644 index 0000000..5cadfdc --- /dev/null +++ b/pkg/storage/s3/list.go @@ -0,0 +1,25 @@ +package s3storage + +import ( + "context" + "fmt" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func (s *S3Storage) ReadDir(pathname string) ([]string, error) { + objectList, err := s.s3Client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{ + Bucket: aws.String(s.bucketname), + Prefix: aws.String(s.prefix + "/" + strings.TrimLeft(pathname, "/")), + }) + if err != nil { + return []string{}, fmt.Errorf("list object error: %s", err) + } + res := make([]string, len(objectList.Contents)) + for k, object := range objectList.Contents { + res[k] = *object.Key + } + return res, nil +} diff --git a/pkg/storage/s3/new.go b/pkg/storage/s3/new.go new file mode 100644 index 0000000..7e0c7f7 --- /dev/null +++ b/pkg/storage/s3/new.go @@ -0,0 +1,23 @@ +package s3storage + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func New(bucketname, prefix string) (*S3Storage, error) { + sdkConfig, err := config.LoadDefaultConfig(context.TODO()) + if err != nil { + return nil, fmt.Errorf("config load error: %s", err) + } + s3Client := s3.NewFromConfig(sdkConfig) + + return &S3Storage{ + bucketname: bucketname, + prefix: prefix, + s3Client: s3Client, + }, nil +} diff --git a/pkg/storage/s3/path.go b/pkg/storage/s3/path.go new file mode 100644 index 0000000..b8db5ce --- /dev/null +++ b/pkg/storage/s3/path.go @@ -0,0 +1,42 @@ +package s3storage + +import ( + "io/fs" + "strings" +) + +func (l *S3Storage) FileExists(filename string) bool { + return false +} + +func (l *S3Storage) ConfigPath(filename string) string { + return CONFIG_PATH + "/" + strings.TrimLeft(filename, "/") +} + +func (s *S3Storage) GetPath() string { + return s.prefix +} + +func (l *S3Storage) EnsurePath(pathname string) error { + return nil +} + +func (l *S3Storage) EnsureOwnership(filename, login string) error { + return nil +} + +func (l *S3Storage) Remove(name string) error { + return nil +} + +func (l *S3Storage) Rename(oldName, newName string) error { + return nil +} + +func (l *S3Storage) EnsurePermissions(name string, mode fs.FileMode) error { + return nil +} + +func (l *S3Storage) FileInfo(name string) (fs.FileInfo, error) { + return nil, nil +} diff --git a/pkg/storage/s3/read.go b/pkg/storage/s3/read.go new file mode 100644 index 0000000..91b0ffe --- /dev/null +++ b/pkg/storage/s3/read.go @@ -0,0 +1,17 @@ +package s3storage + +import ( + "io" +) + +func (l *S3Storage) ReadFile(name string) ([]byte, error) { + return nil, nil +} + +func (l *S3Storage) OpenFilesFromPos(names []string, pos int64) ([]io.ReadCloser, error) { + return nil, nil +} + +func (l *S3Storage) OpenFile(name string) (io.ReadCloser, error) { + return nil, nil +} diff --git a/pkg/storage/s3/types.go b/pkg/storage/s3/types.go new file mode 100644 index 0000000..3ff10eb --- /dev/null +++ b/pkg/storage/s3/types.go @@ -0,0 +1,11 @@ +package s3storage + +import "github.com/aws/aws-sdk-go-v2/service/s3" + +const CONFIG_PATH = "config" + +type S3Storage struct { + bucketname string + prefix string + s3Client *s3.Client +} diff --git a/pkg/storage/s3/write.go b/pkg/storage/s3/write.go new file mode 100644 index 0000000..815b819 --- /dev/null +++ b/pkg/storage/s3/write.go @@ -0,0 +1,35 @@ +package s3storage + +import ( + "bytes" + "context" + "fmt" + "io" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func (s *S3Storage) WriteFile(name string, data []byte) error { + _, err := s.s3Client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(s.bucketname), + Key: aws.String(name), + Body: bytes.NewReader(data), + }) + if err != nil { + return fmt.Errorf("put object error: %s", err) + } + return nil +} + +func (s *S3Storage) AppendFile(name string, data []byte) error { + return nil +} + +func (s *S3Storage) OpenFileForWriting(name string) (io.WriteCloser, error) { + return nil, nil +} + +func (s *S3Storage) OpenFileForAppending(name string) (io.WriteCloser, error) { + return nil, nil +} From 25c169046e3901818bce57af00b7a6e622fd94e8 Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Thu, 12 Sep 2024 22:06:05 -0500 Subject: [PATCH 02/16] decoding fixes --- pkg/observability/decoding.go | 26 ++++++++++++++------------ pkg/observability/decoding_test.go | 20 ++++++++++++++------ pkg/observability/handlers_test.go | 22 ++++++++++++++-------- 3 files changed, 42 insertions(+), 26 deletions(-) diff --git a/pkg/observability/decoding.go b/pkg/observability/decoding.go index 8aaf6db..5339645 100644 --- a/pkg/observability/decoding.go +++ b/pkg/observability/decoding.go @@ -32,15 +32,17 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) { fluentBitMessage.Date = val.(float64) } for key, value := range m2 { - switch valueTyped := value.(type) { - case string: - fluentBitMessage.Data[key] = valueTyped - case float64: - fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64) - case []byte: - fluentBitMessage.Data[key] = string(valueTyped) - default: - fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped)) + if key != "date" { + switch valueTyped := value.(type) { + case string: + fluentBitMessage.Data[key] = valueTyped + case float64: + fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64) + case []byte: + fluentBitMessage.Data[key] = string(valueTyped) + default: + fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped)) + } } } result = append(result, fluentBitMessage) @@ -66,15 +68,15 @@ func decodeMessage(msgs []byte) []FluentBitMessage { isKey := false key := "" start := 8 + recordOffset - for kk := 8 + recordOffset; kk < k; kk++ { + for kk := start; kk < k; kk++ { if msgs[kk] == 0xff { if isKey { isKey = false - msg.Data[key] = string(msgs[recordOffset+start+1 : recordOffset+kk]) + msg.Data[key] = string(msgs[start+1 : kk]) start = kk + 1 } else { isKey = true - key = string(msgs[recordOffset+start : recordOffset+kk]) + key = string(msgs[start:kk]) start = kk } } diff --git a/pkg/observability/decoding_test.go b/pkg/observability/decoding_test.go index 6d0c108..f506488 100644 --- a/pkg/observability/decoding_test.go +++ b/pkg/observability/decoding_test.go @@ -2,13 +2,22 @@ package observability import ( "bytes" - "fmt" + "encoding/json" "testing" ) func TestDecoding(t *testing.T) { - data := `[{"date":1720613813.197045,"rand_value":"rand"}]` - messages, err := Decode(bytes.NewBuffer([]byte(data))) + payload := IncomingData{ + { + "date": 1720613813.197045, + "rand_value": "rand", + }, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("json marshal error: %s", err) + } + messages, err := Decode(bytes.NewBuffer(payloadBytes)) if err != nil { t.Fatalf("error: %s", err) } @@ -37,16 +46,15 @@ func TestDecodeMsg(t *testing.T) { "third key": "this is my third value", }, }, - /*{ + { Date: 1720613813.197099, Data: map[string]string{ "second data set": "my value", }, - },*/ + }, } encoded := encodeMessage(msgs) decoded := decodeMessage(encoded) - fmt.Printf("decoded: %+v\n", decoded) if len(msgs) != len(decoded) { t.Fatalf("length doesn't match") diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index 719386d..a46c64e 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -3,7 +3,6 @@ package observability import ( "bytes" "encoding/json" - "fmt" "net/http" "net/http/httptest" "testing" @@ -18,7 +17,7 @@ func TestIngestionHandler(t *testing.T) { } payload := IncomingData{ { - "Date": 1720613813.197045, + "date": 1720613813.197045, "log": "this is a string", }, } @@ -44,11 +43,18 @@ func TestIngestionHandler(t *testing.T) { if err != nil { t.Fatalf("read dir error: %s", err) } - for _, filename := range dirlist { - filenameOut, err := storage.ReadFile(filename) - if err != nil { - t.Fatalf("read file error: %s", err) - } - fmt.Printf("filenameOut: %s", filenameOut) + if len(dirlist) == 0 { + t.Fatalf("dir is empty") + } + messages, err := storage.ReadFile(dirlist[0]) + if err != nil { + t.Fatalf("read file error: %s", err) + } + decodedMessages := decodeMessage(messages) + if decodedMessages[0].Date != 1720613813.197045 { + t.Fatalf("unexpected date") + } + if decodedMessages[0].Data["log"] != "this is a string" { + t.Fatalf("unexpected log data") } } From 33c4c4fbf36e1fcd08e88073343be67fc41d7064 Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Fri, 13 Sep 2024 11:30:48 -0500 Subject: [PATCH 03/16] buffer atomic operations --- pkg/observability/buffer.go | 57 ++++++++++++++++++++-- pkg/observability/buffer_test.go | 76 ++++++++++++++++++++++++++++++ pkg/observability/constants.go | 4 ++ pkg/observability/handlers.go | 9 +--- pkg/observability/handlers_test.go | 2 +- pkg/observability/new.go | 8 +++- pkg/observability/types.go | 11 ++++- 7 files changed, 152 insertions(+), 15 deletions(-) create mode 100644 pkg/observability/buffer_test.go create mode 100644 pkg/observability/constants.go diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index 2993330..dc34ced 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -3,17 +3,68 @@ package observability import ( "fmt" "io" + "strconv" "time" + + "github.com/in4it/wireguard-server/pkg/logging" ) -func (o *Observability) WriteBufferToStorage() error { - file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T02T15:04:05")) +func (o *Observability) WriteBufferToStorage(n int64) error { + o.BufferMu.Lock() + defer o.BufferMu.Unlock() + file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T15:04:05") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)) if err != nil { return fmt.Errorf("open file for writing error: %s", err) } - _, err = io.Copy(file, &o.Buffer) + _, err = io.CopyN(file, &o.Buffer, n) if err != nil { return fmt.Errorf("file write error: %s", err) } + o.LastFlushed = time.Now() return file.Close() } + +func (o *Observability) monitorBuffer() { + for { + time.Sleep(FLUSH_TIME_MAX_MINUTES * time.Minute) + if time.Since(o.LastFlushed) >= (FLUSH_TIME_MAX_MINUTES * time.Minute) { + if o.FlushOverflow.CompareAndSwap(false, true) { + err := o.WriteBufferToStorage(int64(o.Buffer.Len())) + o.FlushOverflow.Swap(true) + if err != nil { + logging.ErrorLog(fmt.Errorf("write log buffer to storage error: %s", err)) + continue + } + } + o.LastFlushed = time.Now() + } + } +} + +func (o *Observability) Ingest(data io.ReadCloser) error { + defer data.Close() + msgs, err := Decode(data) + if err != nil { + return fmt.Errorf("decode error: %s", err) + } + _, err = o.Buffer.Write(encodeMessage(msgs)) + if err != nil { + return fmt.Errorf("write error: %s", err) + + } + fmt.Printf("Buffer size: %d\n", o.Buffer.Len()) + if o.Buffer.Len() >= MAX_BUFFER_SIZE { + if o.FlushOverflow.CompareAndSwap(false, true) { + go func() { // write to storage + if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + logging.ErrorLog(fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)) + } + } + o.FlushOverflow.Swap(false) + }() + } + } + return nil +} diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go new file mode 100644 index 0000000..1b84004 --- /dev/null +++ b/pkg/observability/buffer_test.go @@ -0,0 +1,76 @@ +package observability + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "strconv" + "testing" + "time" + + memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory" +) + +func TestIngestion(t *testing.T) { + t.Skip() // working on this test + storage := &memorystorage.MockMemoryStorage{} + o := &Observability{ + Storage: storage, + } + payload := IncomingData{ + { + "date": 1720613813.197045, + "log": "this is string: ", + }, + } + + for i := 0; i < MAX_BUFFER_SIZE; i++ { + payload[0]["log"] = "this is string: " + strconv.Itoa(i) + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal error: %s", err) + } + data := io.NopCloser(bytes.NewReader(payloadBytes)) + err = o.Ingest(data) + if err != nil { + t.Fatalf("ingest error: %s", err) + } + } + + // flush remaining data + time.Sleep(1 * time.Second) + if o.Buffer.Len() >= MAX_BUFFER_SIZE { + if o.FlushOverflow.CompareAndSwap(false, true) { + if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } + } + o.FlushOverflow.Swap(false) + } + } + + dirlist, err := storage.ReadDir("") + if err != nil { + t.Fatalf("read dir error: %s", err) + } + + totalMessages := 0 + for _, file := range dirlist { + messages, err := storage.ReadFile(file) + if err != nil { + t.Fatalf("read file error: %s", err) + } + decodedMessages := decodeMessage(messages) + for _, message := range decodedMessages { + fmt.Printf("decoded message: %s\n", message.Data["log"]) + } + totalMessages += len(decodedMessages) + } + fmt.Printf("totalmessages: %d", totalMessages) + if len(dirlist) != 3 { + t.Fatalf("expected 3 files in directory, got %d", len(dirlist)) + } +} diff --git a/pkg/observability/constants.go b/pkg/observability/constants.go new file mode 100644 index 0000000..1fcbb16 --- /dev/null +++ b/pkg/observability/constants.go @@ -0,0 +1,4 @@ +package observability + +const MAX_BUFFER_SIZE = 100 +const FLUSH_TIME_MAX_MINUTES = 5 diff --git a/pkg/observability/handlers.go b/pkg/observability/handlers.go index 2e7b20d..b557bde 100644 --- a/pkg/observability/handlers.go +++ b/pkg/observability/handlers.go @@ -14,17 +14,10 @@ func (o *Observability) ingestionHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusBadRequest) return } - msgs, err := Decode(r.Body) - if err != nil { + if err := o.Ingest(r.Body); err != nil { w.WriteHeader(http.StatusBadRequest) fmt.Printf("error: %s", err) return } - _, err = o.Buffer.Write(encodeMessage(msgs)) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - fmt.Printf("cannot store message: %s", err) - return - } w.WriteHeader(http.StatusOK) } diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index a46c64e..880a883 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -35,7 +35,7 @@ func TestIngestionHandler(t *testing.T) { t.Fatalf("expected status code OK. Got: %d", res.StatusCode) } - err = o.WriteBufferToStorage() + err = o.WriteBufferToStorage(int64(o.Buffer.Len())) if err != nil { t.Fatalf("write buffer to storage error: %s", err) } diff --git a/pkg/observability/new.go b/pkg/observability/new.go index 184b68b..ae7dee6 100644 --- a/pkg/observability/new.go +++ b/pkg/observability/new.go @@ -3,7 +3,13 @@ package observability import "net/http" func New() *Observability { - return &Observability{} + o := &Observability{} + go o.monitorBuffer() + return o +} +func NewWithoutMonitor() *Observability { + o := &Observability{} + return o } type Iface interface { diff --git a/pkg/observability/types.go b/pkg/observability/types.go index 2b83935..be85ef7 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -2,6 +2,9 @@ package observability import ( "bytes" + "sync" + "sync/atomic" + "time" "github.com/in4it/wireguard-server/pkg/storage" ) @@ -14,6 +17,10 @@ type FluentBitMessage struct { } type Observability struct { - Storage storage.Iface - Buffer bytes.Buffer + Storage storage.Iface + Buffer bytes.Buffer + LastFlushed time.Time + BufferMu sync.Mutex + FlushOverflow atomic.Bool + FlushOverflowSequence atomic.Uint64 } From 8e02424292690c802bef99ce772336d7393064ef Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Fri, 13 Sep 2024 14:10:03 -0500 Subject: [PATCH 04/16] make buffer thread safe --- pkg/observability/buffer.go | 39 +++++-- pkg/observability/buffer_test.go | 171 +++++++++++++++++++++++++++---- pkg/observability/constants.go | 2 +- pkg/observability/new.go | 13 ++- pkg/observability/types.go | 10 +- 5 files changed, 198 insertions(+), 37 deletions(-) diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index dc34ced..07b63de 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -1,6 +1,7 @@ package observability import ( + "bytes" "fmt" "io" "strconv" @@ -10,17 +11,24 @@ import ( ) func (o *Observability) WriteBufferToStorage(n int64) error { - o.BufferMu.Lock() - defer o.BufferMu.Unlock() + o.ActiveBufferWriters.Add(1) + defer o.ActiveBufferWriters.Done() + // copy first to temporary buffer (storage might have latency) + tempBuf := bytes.NewBuffer(make([]byte, n)) + _, err := io.CopyN(tempBuf, o.Buffer, n) + o.LastFlushed = time.Now() + if err != nil && err != io.EOF { + return fmt.Errorf("write error from buffer to temporary buffer: %s", err) + } + file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T15:04:05") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)) if err != nil { return fmt.Errorf("open file for writing error: %s", err) } - _, err = io.CopyN(file, &o.Buffer, n) + _, err = io.Copy(file, tempBuf) if err != nil { return fmt.Errorf("file write error: %s", err) } - o.LastFlushed = time.Now() return file.Close() } @@ -50,13 +58,11 @@ func (o *Observability) Ingest(data io.ReadCloser) error { _, err = o.Buffer.Write(encodeMessage(msgs)) if err != nil { return fmt.Errorf("write error: %s", err) - } - fmt.Printf("Buffer size: %d\n", o.Buffer.Len()) - if o.Buffer.Len() >= MAX_BUFFER_SIZE { + if o.Buffer.Len() >= o.MaxBufferSize { if o.FlushOverflow.CompareAndSwap(false, true) { go func() { // write to storage - if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE { + if n := o.Buffer.Len(); n >= o.MaxBufferSize { err := o.WriteBufferToStorage(int64(n)) if err != nil { logging.ErrorLog(fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)) @@ -68,3 +74,20 @@ func (o *Observability) Ingest(data io.ReadCloser) error { } return nil } + +func (c *ConcurrentRWBuffer) Write(p []byte) (n int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + return c.buffer.Write(p) +} +func (c *ConcurrentRWBuffer) Read(p []byte) (n int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + return c.buffer.Read(p) +} +func (c *ConcurrentRWBuffer) Len() int { + return c.buffer.Len() +} +func (c *ConcurrentRWBuffer) Cap() int { + return c.buffer.Cap() +} diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go index 1b84004..b58a4e4 100644 --- a/pkg/observability/buffer_test.go +++ b/pkg/observability/buffer_test.go @@ -7,17 +7,15 @@ import ( "io" "strconv" "testing" - "time" memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory" ) func TestIngestion(t *testing.T) { - t.Skip() // working on this test + totalMessagesToGenerate := 1000 storage := &memorystorage.MockMemoryStorage{} - o := &Observability{ - Storage: storage, - } + o := NewWithoutMonitor(20) + o.Storage = storage payload := IncomingData{ { "date": 1720613813.197045, @@ -25,7 +23,7 @@ func TestIngestion(t *testing.T) { }, } - for i := 0; i < MAX_BUFFER_SIZE; i++ { + for i := 0; i < totalMessagesToGenerate; i++ { payload[0]["log"] = "this is string: " + strconv.Itoa(i) payloadBytes, err := json.Marshal(payload) if err != nil { @@ -38,17 +36,14 @@ func TestIngestion(t *testing.T) { } } - // flush remaining data - time.Sleep(1 * time.Second) - if o.Buffer.Len() >= MAX_BUFFER_SIZE { - if o.FlushOverflow.CompareAndSwap(false, true) { - if n := o.Buffer.Len(); n >= MAX_BUFFER_SIZE { - err := o.WriteBufferToStorage(int64(n)) - if err != nil { - t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) - } - } - o.FlushOverflow.Swap(false) + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) } } @@ -64,13 +59,145 @@ func TestIngestion(t *testing.T) { t.Fatalf("read file error: %s", err) } decodedMessages := decodeMessage(messages) - for _, message := range decodedMessages { - fmt.Printf("decoded message: %s\n", message.Data["log"]) + totalMessages += len(decodedMessages) + } + if len(dirlist) == 0 { + t.Fatalf("expected multiple files in directory, got %d", len(dirlist)) + } + + if totalMessages != totalMessagesToGenerate { + t.Fatalf("Tried to generate total message count of: %d; got: %d", totalMessagesToGenerate, totalMessages) + } +} + +func TestIngestionMoreMessages(t *testing.T) { + totalMessagesToGenerate := 10000000 // 10,000,000 + storage := &memorystorage.MockMemoryStorage{} + o := NewWithoutMonitor(MAX_BUFFER_SIZE) + o.Storage = storage + payload := IncomingData{ + { + "date": 1720613813.197045, + "log": "this is string: ", + }, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal error: %s", err) + } + + for i := 0; i < totalMessagesToGenerate; i++ { + data := io.NopCloser(bytes.NewReader(payloadBytes)) + err := o.Ingest(data) + if err != nil { + t.Fatalf("ingest error: %s", err) + } + } + + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) } + } + + dirlist, err := storage.ReadDir("") + if err != nil { + t.Fatalf("read dir error: %s", err) + } + + totalMessages := 0 + for _, file := range dirlist { + messages, err := storage.ReadFile(file) + if err != nil { + t.Fatalf("read file error: %s", err) + } + decodedMessages := decodeMessage(messages) totalMessages += len(decodedMessages) } - fmt.Printf("totalmessages: %d", totalMessages) - if len(dirlist) != 3 { - t.Fatalf("expected 3 files in directory, got %d", len(dirlist)) + if len(dirlist) == 0 { + t.Fatalf("expected multiple files in directory, got %d", len(dirlist)) + } + + if totalMessages != totalMessagesToGenerate { + t.Fatalf("Tried to generate total message count of: %d; got: %d", totalMessagesToGenerate, totalMessages) + } + fmt.Printf("Buffer size (read+unread): %d in %d files\n", o.Buffer.Cap(), len(dirlist)) + +} + +func BenchmarkIngest10000000(b *testing.B) { + totalMessagesToGenerate := 10000000 // 10,000,000 + storage := &memorystorage.MockMemoryStorage{} + o := NewWithoutMonitor(MAX_BUFFER_SIZE) + o.Storage = storage + payload := IncomingData{ + { + "date": 1720613813.197045, + "log": "this is string", + }, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + b.Fatalf("marshal error: %s", err) + } + + for i := 0; i < totalMessagesToGenerate; i++ { + data := io.NopCloser(bytes.NewReader(payloadBytes)) + err := o.Ingest(data) + if err != nil { + b.Fatalf("ingest error: %s", err) + } + } + + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + b.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } + } +} + +func BenchmarkIngest100000000(b *testing.B) { + totalMessagesToGenerate := 10000000 // 10,000,000 + storage := &memorystorage.MockMemoryStorage{} + o := NewWithoutMonitor(MAX_BUFFER_SIZE) + o.Storage = storage + payload := IncomingData{ + { + "date": 1720613813.197045, + "log": "this is string", + }, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + b.Fatalf("marshal error: %s", err) + } + + for i := 0; i < totalMessagesToGenerate; i++ { + data := io.NopCloser(bytes.NewReader(payloadBytes)) + err := o.Ingest(data) + if err != nil { + b.Fatalf("ingest error: %s", err) + } + } + + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + b.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } } } diff --git a/pkg/observability/constants.go b/pkg/observability/constants.go index 1fcbb16..0427072 100644 --- a/pkg/observability/constants.go +++ b/pkg/observability/constants.go @@ -1,4 +1,4 @@ package observability -const MAX_BUFFER_SIZE = 100 +const MAX_BUFFER_SIZE = 1024 * 1024 // 1 MB const FLUSH_TIME_MAX_MINUTES = 5 diff --git a/pkg/observability/new.go b/pkg/observability/new.go index ae7dee6..e630747 100644 --- a/pkg/observability/new.go +++ b/pkg/observability/new.go @@ -1,14 +1,19 @@ package observability -import "net/http" +import ( + "net/http" +) func New() *Observability { - o := &Observability{} + o := NewWithoutMonitor(MAX_BUFFER_SIZE) go o.monitorBuffer() return o } -func NewWithoutMonitor() *Observability { - o := &Observability{} +func NewWithoutMonitor(maxBufferSize int) *Observability { + o := &Observability{ + Buffer: &ConcurrentRWBuffer{}, + MaxBufferSize: maxBufferSize, + } return o } diff --git a/pkg/observability/types.go b/pkg/observability/types.go index be85ef7..99a0f42 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -18,9 +18,15 @@ type FluentBitMessage struct { type Observability struct { Storage storage.Iface - Buffer bytes.Buffer + Buffer *ConcurrentRWBuffer LastFlushed time.Time - BufferMu sync.Mutex FlushOverflow atomic.Bool FlushOverflowSequence atomic.Uint64 + ActiveBufferWriters sync.WaitGroup + MaxBufferSize int +} + +type ConcurrentRWBuffer struct { + buffer bytes.Buffer + mu sync.Mutex } From 91888cfe0719750bf4099bdf04e22436d49e8865 Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Fri, 13 Sep 2024 14:23:03 -0500 Subject: [PATCH 05/16] fix test --- pkg/observability/buffer.go | 2 +- pkg/observability/handlers_test.go | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index 07b63de..791165a 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -14,7 +14,7 @@ func (o *Observability) WriteBufferToStorage(n int64) error { o.ActiveBufferWriters.Add(1) defer o.ActiveBufferWriters.Done() // copy first to temporary buffer (storage might have latency) - tempBuf := bytes.NewBuffer(make([]byte, n)) + tempBuf := bytes.NewBuffer(make([]byte, 0, n)) _, err := io.CopyN(tempBuf, o.Buffer, n) o.LastFlushed = time.Now() if err != nil && err != io.EOF { diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index 880a883..006cb5c 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -12,9 +12,8 @@ import ( func TestIngestionHandler(t *testing.T) { storage := &memorystorage.MockMemoryStorage{} - o := &Observability{ - Storage: storage, - } + o := NewWithoutMonitor(20) + o.Storage = storage payload := IncomingData{ { "date": 1720613813.197045, @@ -35,10 +34,17 @@ func TestIngestionHandler(t *testing.T) { t.Fatalf("expected status code OK. Got: %d", res.StatusCode) } - err = o.WriteBufferToStorage(int64(o.Buffer.Len())) - if err != nil { - t.Fatalf("write buffer to storage error: %s", err) + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } } + dirlist, err := storage.ReadDir("") if err != nil { t.Fatalf("read dir error: %s", err) @@ -52,7 +58,7 @@ func TestIngestionHandler(t *testing.T) { } decodedMessages := decodeMessage(messages) if decodedMessages[0].Date != 1720613813.197045 { - t.Fatalf("unexpected date") + t.Fatalf("unexpected date. Got %f, expected: %f", decodedMessages[0].Date, 1720613813.197045) } if decodedMessages[0].Data["log"] != "this is a string" { t.Fatalf("unexpected log data") From e6c5b613b7b46b93c25bf376e68de42a15ed71c2 Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Mon, 16 Sep 2024 17:41:13 -0500 Subject: [PATCH 06/16] logs endpoint to show logs --- pkg/observability/buffer.go | 4 +- pkg/observability/buffer_test.go | 5 +- pkg/observability/decoding.go | 74 +++++++++++++------- pkg/observability/decoding_test.go | 105 ++++++++++++++++++++++++++++- pkg/observability/handlers.go | 62 +++++++++++++++++ pkg/observability/handlers_test.go | 2 +- pkg/observability/helpers.go | 15 +++++ pkg/observability/logs.go | 59 ++++++++++++++++ pkg/observability/logs_test.go | 60 +++++++++++++++++ pkg/observability/router.go | 1 + pkg/observability/types.go | 9 +++ pkg/storage/memory/storage.go | 42 +++++++++++- 12 files changed, 406 insertions(+), 32 deletions(-) create mode 100644 pkg/observability/helpers.go create mode 100644 pkg/observability/logs.go create mode 100644 pkg/observability/logs_test.go diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index 791165a..b415ffb 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -20,8 +20,8 @@ func (o *Observability) WriteBufferToStorage(n int64) error { if err != nil && err != io.EOF { return fmt.Errorf("write error from buffer to temporary buffer: %s", err) } - - file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T15:04:05") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)) + now := time.Now() + file, err := o.Storage.OpenFileForWriting(now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)) if err != nil { return fmt.Errorf("open file for writing error: %s", err) } diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go index b58a4e4..b920545 100644 --- a/pkg/observability/buffer_test.go +++ b/pkg/observability/buffer_test.go @@ -58,7 +58,7 @@ func TestIngestion(t *testing.T) { if err != nil { t.Fatalf("read file error: %s", err) } - decodedMessages := decodeMessage(messages) + decodedMessages := decodeMessages(messages) totalMessages += len(decodedMessages) } if len(dirlist) == 0 { @@ -71,6 +71,7 @@ func TestIngestion(t *testing.T) { } func TestIngestionMoreMessages(t *testing.T) { + t.Skip() // we can skip this for general unit testing totalMessagesToGenerate := 10000000 // 10,000,000 storage := &memorystorage.MockMemoryStorage{} o := NewWithoutMonitor(MAX_BUFFER_SIZE) @@ -116,7 +117,7 @@ func TestIngestionMoreMessages(t *testing.T) { if err != nil { t.Fatalf("read file error: %s", err) } - decodedMessages := decodeMessage(messages) + decodedMessages := decodeMessages(messages) totalMessages += len(decodedMessages) } if len(dirlist) == 0 { diff --git a/pkg/observability/decoding.go b/pkg/observability/decoding.go index 5339645..1bd4bd1 100644 --- a/pkg/observability/decoding.go +++ b/pkg/observability/decoding.go @@ -55,35 +55,63 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) { return result, nil } -func decodeMessage(msgs []byte) []FluentBitMessage { +func decodeMessages(msgs []byte) []FluentBitMessage { res := []FluentBitMessage{} recordOffset := 0 for k := 0; k < len(msgs); k++ { if k > recordOffset+8 && msgs[k] == 0xff && msgs[k-1] == 0xff { - bits := binary.LittleEndian.Uint64(msgs[recordOffset : recordOffset+8]) - msg := FluentBitMessage{ - Date: math.Float64frombits(bits), - Data: map[string]string{}, - } - isKey := false - key := "" - start := 8 + recordOffset - for kk := start; kk < k; kk++ { - if msgs[kk] == 0xff { - if isKey { - isKey = false - msg.Data[key] = string(msgs[start+1 : kk]) - start = kk + 1 - } else { - isKey = true - key = string(msgs[start:kk]) - start = kk - } - } - } - res = append(res, msg) + res = append(res, decodeMessage(msgs[recordOffset:k])) recordOffset = k + 1 } } return res } +func decodeMessage(data []byte) FluentBitMessage { + bits := binary.LittleEndian.Uint64(data[0:8]) + msg := FluentBitMessage{ + Date: math.Float64frombits(bits), + Data: map[string]string{}, + } + isKey := false + key := "" + start := 8 + for kk := start; kk < len(data); kk++ { + if data[kk] == 0xff { + if isKey { + isKey = false + msg.Data[key] = string(data[start+1 : kk]) + start = kk + 1 + } else { + isKey = true + key = string(data[start:kk]) + start = kk + } + } + } + // if last record didn't end with the terminator + if data[len(data)-1] != 0xff { + msg.Data[key] = string(data[start+1:]) + } + return msg +} + +func scanMessage(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + for i := 0; i < len(data); i++ { + if data[i] == 0xff && data[i-1] == 0xff { + return i + 1, data[0 : i-1], nil + } + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + if len(data) > 1 && data[len(data)-1] == 0xff && data[len(data)-2] == 0xff { + return len(data[0 : len(data)-2]), data, nil + } else { + return len(data), data, nil + } + } + // Request more data. + return 0, nil, nil +} diff --git a/pkg/observability/decoding_test.go b/pkg/observability/decoding_test.go index f506488..34148c9 100644 --- a/pkg/observability/decoding_test.go +++ b/pkg/observability/decoding_test.go @@ -36,7 +36,7 @@ func TestDecoding(t *testing.T) { } } -func TestDecodeMsg(t *testing.T) { +func TestDecodeMessages(t *testing.T) { msgs := []FluentBitMessage{ { Date: 1720613813.197045, @@ -54,7 +54,7 @@ func TestDecodeMsg(t *testing.T) { }, } encoded := encodeMessage(msgs) - decoded := decodeMessage(encoded) + decoded := decodeMessages(encoded) if len(msgs) != len(decoded) { t.Fatalf("length doesn't match") @@ -73,3 +73,104 @@ func TestDecodeMsg(t *testing.T) { } } } + +func TestDecodeMessage(t *testing.T) { + msgs := []FluentBitMessage{ + { + Date: 1720613813.197099, + Data: map[string]string{ + "second data set": "my value", + }, + }, + } + encoded := encodeMessage(msgs) + message := decodeMessage(encoded) + + if message.Date != message.Date { + t.Fatalf("date doesn't match") + } + if len(msgs[0].Data) != len(message.Data) { + t.Fatalf("length of data doesn't match") + } + for kk := range message.Data { + if msgs[0].Data[kk] != message.Data[kk] { + t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, message.Data[kk], message.Data[kk]) + } + } +} +func TestDecodeMessageWithoutTerminator(t *testing.T) { + msgs := []FluentBitMessage{ + { + Date: 1720613813.197099, + Data: map[string]string{ + "second data set": "my value", + }, + }, + } + encoded := encodeMessage(msgs) + message := decodeMessage(bytes.TrimSuffix(encoded, []byte{0xff, 0xff})) + + if message.Date != message.Date { + t.Fatalf("date doesn't match") + } + if len(msgs[0].Data) != len(message.Data) { + t.Fatalf("length of data doesn't match: got: '%s', expected '%s'", message.Data, msgs[0].Data) + } + for kk := range message.Data { + if msgs[0].Data[kk] != message.Data[kk] { + t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, message.Data[kk], msgs[0].Data[kk]) + } + } +} + +func TestScanMessage(t *testing.T) { + msgs := []FluentBitMessage{ + { + Date: 1720613813.197045, + Data: map[string]string{ + "mykey": "this is myvalue", + "second key": "this is my second value", + "third key": "this is my third value", + }, + }, + { + Date: 1720613813.197099, + Data: map[string]string{ + "second data set": "my value", + }, + }, + } + encoded := encodeMessage(msgs) + // first record + advance, record1, err := scanMessage(encoded, false) + if err != nil { + t.Fatalf("scan lines error: %s", err) + } + firstRecord := decodeMessages(append(record1, []byte{0xff, 0xff}...)) + if len(firstRecord) == 0 { + t.Fatalf("first record is empty") + } + if firstRecord[0].Data["mykey"] != "this is myvalue" { + t.Fatalf("wrong data returned") + } + // second record + advance2, record2, err := scanMessage(encoded[advance:], false) + if err != nil { + t.Fatalf("scan lines error: %s", err) + } + secondRecord := decodeMessages(append(record2, []byte{0xff, 0xff}...)) + if len(secondRecord) == 0 { + t.Fatalf("first record is empty") + } + if secondRecord[0].Data["second data set"] != "my value" { + t.Fatalf("wrong data returned in second record") + } + // third call + advance3, record3, err := scanMessage(encoded[advance+advance2:], false) + if err != nil { + t.Fatalf("scan lines error: %s", err) + } + if advance3 != 0 { + t.Fatalf("third record should be empty. Got: %+v", record3) + } +} diff --git a/pkg/observability/handlers.go b/pkg/observability/handlers.go index b557bde..a7a44ea 100644 --- a/pkg/observability/handlers.go +++ b/pkg/observability/handlers.go @@ -1,8 +1,11 @@ package observability import ( + "encoding/json" "fmt" "net/http" + "strconv" + "time" ) func (o *Observability) observabilityHandler(w http.ResponseWriter, r *http.Request) { @@ -21,3 +24,62 @@ func (o *Observability) ingestionHandler(w http.ResponseWriter, r *http.Request) } w.WriteHeader(http.StatusOK) } + +func (o *Observability) logsHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusBadRequest) + return + } + if r.FormValue("fromDate") == "" { + o.returnError(w, fmt.Errorf("no from date supplied"), http.StatusBadRequest) + return + } + fromDate, err := time.Parse("2006-01-02", r.FormValue("fromDate")) + if err != nil { + o.returnError(w, fmt.Errorf("invalid date: %s", err), http.StatusBadRequest) + return + } + if r.FormValue("endDate") == "" { + o.returnError(w, fmt.Errorf("no end date supplied"), http.StatusBadRequest) + return + } + endDate, err := time.Parse("2006-01-02", r.FormValue("endDate")) + if err != nil { + o.returnError(w, fmt.Errorf("invalid date: %s", err), http.StatusBadRequest) + return + } + offset := 0 + if r.FormValue("offset") != "" { + i, err := strconv.Atoi(r.PathValue("offset")) + if err == nil { + offset = i + } + } + maxLines := 0 + if r.FormValue("maxLines") != "" { + i, err := strconv.Atoi(r.PathValue("maxLines")) + if err == nil { + maxLines = i + } + } + pos := int64(0) + if r.FormValue("pos") != "" { + i, err := strconv.ParseInt(r.PathValue("pos"), 10, 64) + if err == nil { + pos = i + } + } + out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Printf("get logs error: %s", err) + return + } + outBytes, err := json.Marshal(out) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Printf("marshal error: %s", err) + return + } + w.Write(outBytes) +} diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index 006cb5c..12a1523 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -56,7 +56,7 @@ func TestIngestionHandler(t *testing.T) { if err != nil { t.Fatalf("read file error: %s", err) } - decodedMessages := decodeMessage(messages) + decodedMessages := decodeMessages(messages) if decodedMessages[0].Date != 1720613813.197045 { t.Fatalf("unexpected date. Got %f, expected: %f", decodedMessages[0].Date, 1720613813.197045) } diff --git a/pkg/observability/helpers.go b/pkg/observability/helpers.go new file mode 100644 index 0000000..a54eb01 --- /dev/null +++ b/pkg/observability/helpers.go @@ -0,0 +1,15 @@ +package observability + +import ( + "fmt" + "net/http" + "strings" +) + +func (o *Observability) returnError(w http.ResponseWriter, err error, statusCode int) { + fmt.Println("========= ERROR =========") + fmt.Printf("Error: %s\n", err) + fmt.Println("=========================") + w.WriteHeader(statusCode) + w.Write([]byte(`{"error": "` + strings.Replace(err.Error(), `"`, `\"`, -1) + `"}`)) +} diff --git a/pkg/observability/logs.go b/pkg/observability/logs.go new file mode 100644 index 0000000..cb17fdd --- /dev/null +++ b/pkg/observability/logs.go @@ -0,0 +1,59 @@ +package observability + +import ( + "bufio" + "fmt" + "time" +) + +func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, maxLogLines int) (LogEntryResponse, error) { + logEntryResponse := LogEntryResponse{} + + logFiles := []string{} + + for d := fromDate; d.Before(endDate) || d.Equal(endDate); d = d.AddDate(0, 0, 1) { + fileList, err := o.Storage.ReadDir(d.Format("2006/01/02")) + if err != nil { + return logEntryResponse, fmt.Errorf("can't read log directly: %s", err) + } + for _, filename := range fileList { + logFiles = append(logFiles, d.Format("2006/01/02")+"/"+filename) + } + } + + fileReaders, err := o.Storage.OpenFilesFromPos(logFiles, pos) + if err != nil { + return logEntryResponse, fmt.Errorf("error while reading files: %s", err) + } + for _, fileReader := range fileReaders { + defer fileReader.Close() + } + + for _, logInputData := range fileReaders { // read multiple files + if len(logEntryResponse.LogEntries) >= maxLogLines { + break + } + scanner := bufio.NewScanner(logInputData) + scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = scanMessage(data, atEOF) + pos += int64(advance) + return + }) + for scanner.Scan() && len(logEntryResponse.LogEntries) < maxLogLines { // read multiple lines + // decode, store as logentry + logMessage := decodeMessage(scanner.Bytes()) + val, ok := logMessage.Data["log"] + if ok { + logEntry := LogEntry{ + Data: val, + } + logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry) + } + } + if err := scanner.Err(); err != nil { + return logEntryResponse, fmt.Errorf("log file read (scanner) error: %s", err) + } + } + + return logEntryResponse, nil +} diff --git a/pkg/observability/logs_test.go b/pkg/observability/logs_test.go new file mode 100644 index 0000000..fa7eda0 --- /dev/null +++ b/pkg/observability/logs_test.go @@ -0,0 +1,60 @@ +package observability + +import ( + "bytes" + "encoding/json" + "io" + "strconv" + "testing" + "time" + + memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory" +) + +func TestGetLogs(t *testing.T) { + totalMessagesToGenerate := 100 + storage := &memorystorage.MockMemoryStorage{} + o := NewWithoutMonitor(20) + o.Storage = storage + payload := IncomingData{ + { + "date": 1720613813.197045, + "log": "this is string: ", + }, + } + + for i := 0; i < totalMessagesToGenerate; i++ { + payload[0]["log"] = "this is string: " + strconv.Itoa(i) + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal error: %s", err) + } + data := io.NopCloser(bytes.NewReader(payloadBytes)) + err = o.Ingest(data) + if err != nil { + t.Fatalf("ingest error: %s", err) + } + } + + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } + } + + now := time.Now() + maxLogLines := 100 + + logEntryResponse, err := o.getLogs(now, now, 0, 0, maxLogLines) + if err != nil { + t.Fatalf("get logs error: %s", err) + } + if len(logEntryResponse.LogEntries) != totalMessagesToGenerate { + t.Fatalf("didn't get the same log entries as messaged we generated: got: %d, expected: %d", len(logEntryResponse.LogEntries), totalMessagesToGenerate) + } +} diff --git a/pkg/observability/router.go b/pkg/observability/router.go index cb7442d..7085fca 100644 --- a/pkg/observability/router.go +++ b/pkg/observability/router.go @@ -6,6 +6,7 @@ func (o *Observability) GetRouter() *http.ServeMux { mux := http.NewServeMux() mux.Handle("/api/observability/", http.HandlerFunc(o.observabilityHandler)) mux.Handle("/api/observability/ingestion/json", http.HandlerFunc(o.ingestionHandler)) + mux.Handle("/api/observability/logs", http.HandlerFunc(o.logsHandler)) return mux } diff --git a/pkg/observability/types.go b/pkg/observability/types.go index 99a0f42..099dff8 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -30,3 +30,12 @@ type ConcurrentRWBuffer struct { buffer bytes.Buffer mu sync.Mutex } + +type LogEntryResponse struct { + LogEntries []LogEntry `json:"logEntries"` +} + +type LogEntry struct { + Timestamp string `json:"timestamp"` + Data string `json:"data"` +} diff --git a/pkg/storage/memory/storage.go b/pkg/storage/memory/storage.go index c18b790..eb7a6c1 100644 --- a/pkg/storage/memory/storage.go +++ b/pkg/storage/memory/storage.go @@ -8,6 +8,7 @@ import ( "os" "path" "strings" + "sync" ) type MockReadWriterData []byte @@ -23,12 +24,15 @@ func (m *MockReadWriterData) Write(p []byte) (nn int, err error) { type MockMemoryStorage struct { FileInfoData map[string]*FileInfo Data map[string]*MockReadWriterData + Mu sync.Mutex } func (m *MockMemoryStorage) ConfigPath(filename string) string { return path.Join("config", filename) } func (m *MockMemoryStorage) Rename(oldName, newName string) error { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -41,6 +45,8 @@ func (m *MockMemoryStorage) Rename(oldName, newName string) error { return nil } func (m *MockMemoryStorage) FileExists(name string) bool { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -49,6 +55,8 @@ func (m *MockMemoryStorage) FileExists(name string) bool { } func (m *MockMemoryStorage) ReadFile(name string) ([]byte, error) { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -59,6 +67,8 @@ func (m *MockMemoryStorage) ReadFile(name string) ([]byte, error) { return *val, nil } func (m *MockMemoryStorage) WriteFile(name string, data []byte) error { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -66,6 +76,8 @@ func (m *MockMemoryStorage) WriteFile(name string, data []byte) error { return nil } func (m *MockMemoryStorage) AppendFile(name string, data []byte) error { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -92,13 +104,15 @@ func (m *MockMemoryStorage) EnsureOwnership(filename, login string) error { } func (m *MockMemoryStorage) ReadDir(path string) ([]string, error) { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } res := []string{} for k := range m.Data { if path == "" { - res = append(res, strings.ReplaceAll(k, path+"/", "")) + res = append(res, strings.TrimSuffix(k, "/")) } else if strings.HasPrefix(k, path+"/") { res = append(res, strings.ReplaceAll(k, path+"/", "")) } @@ -107,6 +121,8 @@ func (m *MockMemoryStorage) ReadDir(path string) ([]string, error) { } func (m *MockMemoryStorage) Remove(name string) error { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -119,9 +135,27 @@ func (m *MockMemoryStorage) Remove(name string) error { } func (m *MockMemoryStorage) OpenFilesFromPos(names []string, pos int64) ([]io.ReadCloser, error) { - return nil, fmt.Errorf("not implemented") + m.Mu.Lock() + defer m.Mu.Unlock() + if m.Data == nil { + m.Data = make(map[string]*MockReadWriterData) + } + if pos > 0 { + return nil, fmt.Errorf("pos > 0 not implemented") + } + readClosers := []io.ReadCloser{} + for _, name := range names { + val, ok := m.Data[name] + if !ok { + return nil, fmt.Errorf("file does not exist") + } + readClosers = append(readClosers, io.NopCloser(bytes.NewBuffer(*val))) + } + return readClosers, nil } func (m *MockMemoryStorage) OpenFile(name string) (io.ReadCloser, error) { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -140,6 +174,8 @@ func (m *MockMemoryStorage) OpenFileForWriting(name string) (io.WriteCloser, err return m.Data[name], nil } func (m *MockMemoryStorage) OpenFileForAppending(name string) (io.WriteCloser, error) { + m.Mu.Lock() + defer m.Mu.Unlock() if m.Data == nil { m.Data = make(map[string]*MockReadWriterData) } @@ -155,6 +191,8 @@ func (m *MockMemoryStorage) EnsurePermissions(name string, mode fs.FileMode) err return nil } func (m *MockMemoryStorage) FileInfo(name string) (fs.FileInfo, error) { + m.Mu.Lock() + defer m.Mu.Unlock() val, ok := m.FileInfoData[name] if !ok { return FileInfo{}, fmt.Errorf("couldn't get file info for: %s", name) From fdee5a43a92596fc4eee32130088670263141bed Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Fri, 20 Sep 2024 17:20:13 -0500 Subject: [PATCH 07/16] log UI --- pkg/observability/buffer.go | 2 +- pkg/observability/buffer_test.go | 2 +- pkg/observability/constants.go | 2 + pkg/observability/handlers_test.go | 2 +- pkg/observability/logs.go | 12 +++++- pkg/observability/logs_test.go | 15 ++++++- pkg/observability/new.go | 9 ++-- pkg/rest/context.go | 4 +- pkg/rest/setup.go | 2 +- pkg/rest/types.go | 1 + webapp/src/App.tsx | 37 +++++++++++++++- webapp/src/AppInit/AppInit.tsx | 14 ++++-- webapp/src/NavBar/NavBar.tsx | 69 ++++++++++++++++++++++++------ 13 files changed, 139 insertions(+), 32 deletions(-) diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index b415ffb..4aa0bab 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -35,7 +35,7 @@ func (o *Observability) WriteBufferToStorage(n int64) error { func (o *Observability) monitorBuffer() { for { time.Sleep(FLUSH_TIME_MAX_MINUTES * time.Minute) - if time.Since(o.LastFlushed) >= (FLUSH_TIME_MAX_MINUTES * time.Minute) { + if time.Since(o.LastFlushed) >= (FLUSH_TIME_MAX_MINUTES*time.Minute) && o.Buffer.Len() > 0 { if o.FlushOverflow.CompareAndSwap(false, true) { err := o.WriteBufferToStorage(int64(o.Buffer.Len())) o.FlushOverflow.Swap(true) diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go index b920545..0f545d6 100644 --- a/pkg/observability/buffer_test.go +++ b/pkg/observability/buffer_test.go @@ -14,7 +14,7 @@ import ( func TestIngestion(t *testing.T) { totalMessagesToGenerate := 1000 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(20) + o := NewWithoutMonitor(storage, 20) o.Storage = storage payload := IncomingData{ { diff --git a/pkg/observability/constants.go b/pkg/observability/constants.go index 0427072..2d63170 100644 --- a/pkg/observability/constants.go +++ b/pkg/observability/constants.go @@ -2,3 +2,5 @@ package observability const MAX_BUFFER_SIZE = 1024 * 1024 // 1 MB const FLUSH_TIME_MAX_MINUTES = 5 + +const TIMESTAMP_FORMAT = "2006-01-02T15:04:05" diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index 12a1523..dc212a4 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -12,7 +12,7 @@ import ( func TestIngestionHandler(t *testing.T) { storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(20) + o := NewWithoutMonitor(storage, 20) o.Storage = storage payload := IncomingData{ { diff --git a/pkg/observability/logs.go b/pkg/observability/logs.go index cb17fdd..66436bb 100644 --- a/pkg/observability/logs.go +++ b/pkg/observability/logs.go @@ -3,6 +3,7 @@ package observability import ( "bufio" "fmt" + "math" "time" ) @@ -44,8 +45,10 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, logMessage := decodeMessage(scanner.Bytes()) val, ok := logMessage.Data["log"] if ok { + timestamp := floatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute) logEntry := LogEntry{ - Data: val, + Timestamp: timestamp.Format(TIMESTAMP_FORMAT), + Data: val, } logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry) } @@ -57,3 +60,10 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, return logEntryResponse, nil } + +func floatToDate(datetime float64) time.Time { + datetimeInt := int64(datetime) + decimals := datetime - float64(datetimeInt) + nsecs := int64(math.Round(decimals * 1_000_000)) // precision to match golang's time.Time + return time.Unix(datetimeInt, nsecs*1000) +} diff --git a/pkg/observability/logs_test.go b/pkg/observability/logs_test.go index fa7eda0..e450d59 100644 --- a/pkg/observability/logs_test.go +++ b/pkg/observability/logs_test.go @@ -14,8 +14,7 @@ import ( func TestGetLogs(t *testing.T) { totalMessagesToGenerate := 100 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(20) - o.Storage = storage + o := NewWithoutMonitor(storage, 20) payload := IncomingData{ { "date": 1720613813.197045, @@ -57,4 +56,16 @@ func TestGetLogs(t *testing.T) { if len(logEntryResponse.LogEntries) != totalMessagesToGenerate { t.Fatalf("didn't get the same log entries as messaged we generated: got: %d, expected: %d", len(logEntryResponse.LogEntries), totalMessagesToGenerate) } + if logEntryResponse.LogEntries[0].Timestamp != floatToDate(1720613813.197045).Format(TIMESTAMP_FORMAT) { + t.Fatalf("unexpected timestamp") + } +} + +func TestFloatToDate(t *testing.T) { + now := time.Now() + floatDate := float64(now.Unix()) + float64(now.Nanosecond())/1e9 + floatToDate := floatToDate(floatDate) + if !now.Equal(floatToDate) { + t.Fatalf("times are not equal. Got: %s, expected: %s", floatToDate, now) + } } diff --git a/pkg/observability/new.go b/pkg/observability/new.go index e630747..e375212 100644 --- a/pkg/observability/new.go +++ b/pkg/observability/new.go @@ -2,17 +2,20 @@ package observability import ( "net/http" + + "github.com/in4it/wireguard-server/pkg/storage" ) -func New() *Observability { - o := NewWithoutMonitor(MAX_BUFFER_SIZE) +func New(storage storage.Iface) *Observability { + o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) go o.monitorBuffer() return o } -func NewWithoutMonitor(maxBufferSize int) *Observability { +func NewWithoutMonitor(storage storage.Iface, maxBufferSize int) *Observability { o := &Observability{ Buffer: &ConcurrentRWBuffer{}, MaxBufferSize: maxBufferSize, + Storage: storage, } return o } diff --git a/pkg/rest/context.go b/pkg/rest/context.go index 5343ba3..ba7a2a4 100644 --- a/pkg/rest/context.go +++ b/pkg/rest/context.go @@ -90,10 +90,10 @@ func newContext(storage storage.Iface, serverType string) (*Context, error) { if c.Observability == nil { c.Observability = &Observability{ - Client: observability.New(), + Client: observability.New(storage), } } else { - c.Observability.Client = observability.New() + c.Observability.Client = observability.New(storage) } return c, nil diff --git a/pkg/rest/setup.go b/pkg/rest/setup.go index 7149b0d..719ae8f 100644 --- a/pkg/rest/setup.go +++ b/pkg/rest/setup.go @@ -134,7 +134,7 @@ func (c *Context) contextHandler(w http.ResponseWriter, r *http.Request) { } } - out, err := json.Marshal(ContextSetupResponse{SetupCompleted: c.SetupCompleted, CloudType: c.CloudType}) + out, err := json.Marshal(ContextSetupResponse{SetupCompleted: c.SetupCompleted, CloudType: c.CloudType, ServerType: c.ServerType}) if err != nil { c.returnError(w, err, http.StatusBadRequest) return diff --git a/pkg/rest/types.go b/pkg/rest/types.go index 133b2fc..d3c54c1 100644 --- a/pkg/rest/types.go +++ b/pkg/rest/types.go @@ -70,6 +70,7 @@ type ContextRequest struct { type ContextSetupResponse struct { SetupCompleted bool `json:"setupCompleted"` CloudType string `json:"cloudType"` + ServerType string `json:"serverType"` } type AuthMethodsResponse struct { diff --git a/webapp/src/App.tsx b/webapp/src/App.tsx index ecc98fa..eb2d9ab 100644 --- a/webapp/src/App.tsx +++ b/webapp/src/App.tsx @@ -19,6 +19,7 @@ import { Profile } from "./Routes/Profile/Profile"; import { Upgrade } from "./Routes/Upgrade/Upgrade"; import { GetMoreLicenses } from "./Routes/Licenses/GetMoreLicenses"; import { PacketLogs } from "./Routes/PacketLogs/PacketLogs"; +import { Logs } from "./Routes/Logs/Logs"; const queryClient = new QueryClient() @@ -27,7 +28,7 @@ export default function App() { return - + - + @@ -60,6 +61,38 @@ export default function App() { + + + + + + + + + } /> + } /> + } /> + } /> + } /> + } /> + } /> + } /> + } /> + } /> + } /> + } /> + + + + + ; diff --git a/webapp/src/AppInit/AppInit.tsx b/webapp/src/AppInit/AppInit.tsx index 3c86f40..0372abe 100644 --- a/webapp/src/AppInit/AppInit.tsx +++ b/webapp/src/AppInit/AppInit.tsx @@ -6,11 +6,13 @@ import React, { useState } from 'react'; import { SetupBanner } from './SetupBanner'; import { AppSettings } from '../Constants/Constants'; - type Props = { - children?: React.ReactNode - }; - export const AppInit: React.FC = ({children}) => { +type Props = { + children?: React.ReactNode + serverType: string +}; + + export const AppInit: React.FC = ({children, serverType}) => { const [setupCompleted, setSetupCompleted] = useState(false); const { isPending, error, data } = useQuery({ queryKey: ['context'], @@ -22,6 +24,10 @@ import { AppSettings } from '../Constants/Constants'; if (isPending) return '' if (error) return 'An backend error has occurred: ' + error.message + if (data.serverType !== serverType) { + return '' + } + if(!setupCompleted && data.setupCompleted) { setSetupCompleted(true) } diff --git a/webapp/src/NavBar/NavBar.tsx b/webapp/src/NavBar/NavBar.tsx index d0437fe..2b81c2d 100644 --- a/webapp/src/NavBar/NavBar.tsx +++ b/webapp/src/NavBar/NavBar.tsx @@ -18,25 +18,66 @@ import { NavLink, useLocation } from 'react-router-dom'; import { useAuthContext } from '../Auth/Auth'; import { Version } from './Version'; -export function NavBar() { +type Props = { + serverType: string +}; + + +export function NavBar({serverType}: Props) { const {authInfo} = useAuthContext(); const location = useLocation(); const { pathname } = location; const [active, setActive] = useState(pathname); - const data = authInfo.role === "admin" ? [ - { link: '/', label: 'Status', icon: TbBellRinging }, - { link: '/connection', label: 'VPN Connections', icon: TbPlugConnected }, - { link: '/users', label: 'Users', icon: TbUser }, - { link: '/setup', label: 'VPN Setup', icon: TbSettings }, - { link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection }, - { link: '/packetlogs', label: 'Logging', icon: FaStream }, - { link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook }, - ] : - [ - { link: '/connection', label: 'VPN Connections', icon: TbPlugConnected }, - { link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook }, - ]; + const vpnLinks = { + "admin": [ + { link: '/', label: 'Status', icon: TbBellRinging }, + { link: '/connection', label: 'VPN Connections', icon: TbPlugConnected }, + { link: '/users', label: 'Users', icon: TbUser }, + { link: '/setup', label: 'VPN Setup', icon: TbSettings }, + { link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection }, + { link: '/packetlogs', label: 'Logging', icon: FaStream }, + { link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook }, + ], + "user": [ + { link: '/connection', label: 'VPN Connections', icon: TbPlugConnected }, + { link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook }, + ] + } + const observabilityLinks = { + "admin": [ + { link: '/', label: 'Status', icon: TbBellRinging }, + { link: '/users', label: 'Users', icon: TbUser }, + { link: '/setup', label: 'VPN Setup', icon: TbSettings }, + { link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection }, + { link: '/logs', label: 'Logs', icon: FaStream }, + { link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook }, + ], + "user": [ + { link: '/logs', label: 'Logs', icon: FaStream }, + { link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook }, + ] + } + + const getData = () => { + if(serverType === "vpn") { + if (authInfo.role === "admin" ) { + return vpnLinks.admin + } else { + return vpnLinks.user + } + } + if(serverType === "observability") { + if (authInfo.role === "admin" ) { + return observabilityLinks.admin + } else { + return observabilityLinks.user + } + } + return [] + } + + const data = getData() const links = data.map((item) => ( Date: Fri, 20 Sep 2024 21:07:43 -0500 Subject: [PATCH 08/16] ui work --- pkg/observability/buffer.go | 25 ++++++++++++++++++++++++- pkg/observability/buffer_test.go | 16 +++++++++++----- pkg/observability/constants.go | 2 +- pkg/observability/logs.go | 9 ++++++++- pkg/observability/new.go | 4 ++-- pkg/observability/types.go | 4 +++- 6 files changed, 49 insertions(+), 11 deletions(-) diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index 4aa0bab..db338f5 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -4,10 +4,13 @@ import ( "bytes" "fmt" "io" + "path" "strconv" + "strings" "time" "github.com/in4it/wireguard-server/pkg/logging" + "github.com/in4it/wireguard-server/pkg/storage" ) func (o *Observability) WriteBufferToStorage(n int64) error { @@ -21,7 +24,12 @@ func (o *Observability) WriteBufferToStorage(n int64) error { return fmt.Errorf("write error from buffer to temporary buffer: %s", err) } now := time.Now() - file, err := o.Storage.OpenFileForWriting(now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)) + filename := now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10) + err = ensurePath(o.Storage, filename) + if err != nil { + return fmt.Errorf("ensure path error: %s", err) + } + file, err := o.Storage.OpenFileForWriting(filename) if err != nil { return fmt.Errorf("open file for writing error: %s", err) } @@ -29,6 +37,7 @@ func (o *Observability) WriteBufferToStorage(n int64) error { if err != nil { return fmt.Errorf("file write error: %s", err) } + logging.DebugLog(fmt.Errorf("wrote file: %s", filename)) return file.Close() } @@ -91,3 +100,17 @@ func (c *ConcurrentRWBuffer) Len() int { func (c *ConcurrentRWBuffer) Cap() int { return c.buffer.Cap() } + +func ensurePath(storage storage.Iface, filename string) error { + base := path.Dir(filename) + baseSplit := strings.Split(base, "/") + fullPath := "" + for _, v := range baseSplit { + fullPath = path.Join(fullPath, v) + err := storage.EnsurePath(fullPath) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go index 0f545d6..f515dc1 100644 --- a/pkg/observability/buffer_test.go +++ b/pkg/observability/buffer_test.go @@ -74,8 +74,7 @@ func TestIngestionMoreMessages(t *testing.T) { t.Skip() // we can skip this for general unit testing totalMessagesToGenerate := 10000000 // 10,000,000 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(MAX_BUFFER_SIZE) - o.Storage = storage + o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) payload := IncomingData{ { "date": 1720613813.197045, @@ -134,8 +133,7 @@ func TestIngestionMoreMessages(t *testing.T) { func BenchmarkIngest10000000(b *testing.B) { totalMessagesToGenerate := 10000000 // 10,000,000 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(MAX_BUFFER_SIZE) - o.Storage = storage + o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) payload := IncomingData{ { "date": 1720613813.197045, @@ -170,7 +168,7 @@ func BenchmarkIngest10000000(b *testing.B) { func BenchmarkIngest100000000(b *testing.B) { totalMessagesToGenerate := 10000000 // 10,000,000 storage := &memorystorage.MockMemoryStorage{} - o := NewWithoutMonitor(MAX_BUFFER_SIZE) + o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) o.Storage = storage payload := IncomingData{ { @@ -202,3 +200,11 @@ func BenchmarkIngest100000000(b *testing.B) { } } } + +func TestEnsurePath(t *testing.T) { + storage := &memorystorage.MockMemoryStorage{} + err := ensurePath(storage, "a/b/c/filename.txt") + if err != nil { + t.Fatalf("error: %s", err) + } +} diff --git a/pkg/observability/constants.go b/pkg/observability/constants.go index 2d63170..1696cbd 100644 --- a/pkg/observability/constants.go +++ b/pkg/observability/constants.go @@ -1,6 +1,6 @@ package observability const MAX_BUFFER_SIZE = 1024 * 1024 // 1 MB -const FLUSH_TIME_MAX_MINUTES = 5 +const FLUSH_TIME_MAX_MINUTES = 1 // should have 5 as default at release const TIMESTAMP_FORMAT = "2006-01-02T15:04:05" diff --git a/pkg/observability/logs.go b/pkg/observability/logs.go index 66436bb..e63cf23 100644 --- a/pkg/observability/logs.go +++ b/pkg/observability/logs.go @@ -8,10 +8,17 @@ import ( ) func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, maxLogLines int) (LogEntryResponse, error) { - logEntryResponse := LogEntryResponse{} + logEntryResponse := LogEntryResponse{ + Enabled: true, + Environments: []string{"dev", "qa", "prod"}, + } logFiles := []string{} + if maxLogLines == 0 { + maxLogLines = 100 + } + for d := fromDate; d.Before(endDate) || d.Equal(endDate); d = d.AddDate(0, 0, 1) { fileList, err := o.Storage.ReadDir(d.Format("2006/01/02")) if err != nil { diff --git a/pkg/observability/new.go b/pkg/observability/new.go index e375212..7bf9f0b 100644 --- a/pkg/observability/new.go +++ b/pkg/observability/new.go @@ -6,8 +6,8 @@ import ( "github.com/in4it/wireguard-server/pkg/storage" ) -func New(storage storage.Iface) *Observability { - o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE) +func New(defaultStorage storage.Iface) *Observability { + o := NewWithoutMonitor(defaultStorage, MAX_BUFFER_SIZE) go o.monitorBuffer() return o } diff --git a/pkg/observability/types.go b/pkg/observability/types.go index 099dff8..ba83131 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -32,7 +32,9 @@ type ConcurrentRWBuffer struct { } type LogEntryResponse struct { - LogEntries []LogEntry `json:"logEntries"` + Enabled bool `json:"enabled"` + LogEntries []LogEntry `json:"logEntries"` + Environments []string `json:"environments"` } type LogEntry struct { From d084dd66daad6ad952b6505f4f6cd81315cb5789 Mon Sep 17 00:00:00 2001 From: Edward Viaene Date: Mon, 23 Sep 2024 17:53:58 -0500 Subject: [PATCH 09/16] logging UI --- pkg/observability/buffer.go | 15 ++++++++++ pkg/observability/buffer_test.go | 12 ++------ pkg/observability/decoding.go | 46 ++++++++++++++++-------------- pkg/observability/decoding_test.go | 44 ++++++++++++++++++++++++++++ pkg/observability/handlers.go | 9 +++--- pkg/observability/logs.go | 30 ++++++++++++++----- pkg/observability/logs_test.go | 19 +++++++++++- pkg/observability/types.go | 27 ++++++++++++++++-- webapp/src/NavBar/NavBar.tsx | 6 ++-- 9 files changed, 159 insertions(+), 49 deletions(-) diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index db338f5..b0076e0 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -64,6 +64,7 @@ func (o *Observability) Ingest(data io.ReadCloser) error { if err != nil { return fmt.Errorf("decode error: %s", err) } + logging.DebugLog(fmt.Errorf("messages ingested: %d", len(msgs))) _, err = o.Buffer.Write(encodeMessage(msgs)) if err != nil { return fmt.Errorf("write error: %s", err) @@ -84,6 +85,20 @@ func (o *Observability) Ingest(data io.ReadCloser) error { return nil } +func (o *Observability) Flush() error { + // wait until all data is flushed + o.ActiveBufferWriters.Wait() + + // flush remaining data that hasn't been flushed + if n := o.Buffer.Len(); n >= 0 { + err := o.WriteBufferToStorage(int64(n)) + if err != nil { + return fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) + } + } + return nil +} + func (c *ConcurrentRWBuffer) Write(p []byte) (n int, err error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/observability/buffer_test.go b/pkg/observability/buffer_test.go index f515dc1..ec38325 100644 --- a/pkg/observability/buffer_test.go +++ b/pkg/observability/buffer_test.go @@ -94,15 +94,9 @@ func TestIngestionMoreMessages(t *testing.T) { } } - // wait until all data is flushed - o.ActiveBufferWriters.Wait() - - // flush remaining data that hasn't been flushed - if n := o.Buffer.Len(); n >= 0 { - err := o.WriteBufferToStorage(int64(n)) - if err != nil { - t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err) - } + err = o.Flush() + if err != nil { + t.Fatalf("flush error: %s", err) } dirlist, err := storage.ReadDir("") diff --git a/pkg/observability/decoding.go b/pkg/observability/decoding.go index 1bd4bd1..cbd9062 100644 --- a/pkg/observability/decoding.go +++ b/pkg/observability/decoding.go @@ -23,31 +23,33 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) { if len(m1) == 0 { return result, fmt.Errorf("empty array") } - switch m2 := m1[0].(type) { - case map[string]interface{}: - var fluentBitMessage FluentBitMessage - fluentBitMessage.Data = make(map[string]string) - val, ok := m2["date"] - if ok { - fluentBitMessage.Date = val.(float64) - } - for key, value := range m2 { - if key != "date" { - switch valueTyped := value.(type) { - case string: - fluentBitMessage.Data[key] = valueTyped - case float64: - fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64) - case []byte: - fluentBitMessage.Data[key] = string(valueTyped) - default: - fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped)) + for _, m1Element := range m1 { + switch m2 := m1Element.(type) { + case map[string]interface{}: + var fluentBitMessage FluentBitMessage + fluentBitMessage.Data = make(map[string]string) + val, ok := m2["date"] + if ok { + fluentBitMessage.Date = val.(float64) + } + for key, value := range m2 { + if key != "date" { + switch valueTyped := value.(type) { + case string: + fluentBitMessage.Data[key] = valueTyped + case float64: + fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64) + case []byte: + fluentBitMessage.Data[key] = string(valueTyped) + default: + fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped)) + } } } + result = append(result, fluentBitMessage) + default: + return result, fmt.Errorf("invalid type: no map found in array") } - result = append(result, fluentBitMessage) - default: - return result, fmt.Errorf("invalid type: no map found in array") } default: return result, fmt.Errorf("invalid type: no array found") diff --git a/pkg/observability/decoding_test.go b/pkg/observability/decoding_test.go index 34148c9..82fa736 100644 --- a/pkg/observability/decoding_test.go +++ b/pkg/observability/decoding_test.go @@ -36,6 +36,50 @@ func TestDecoding(t *testing.T) { } } +func TestDecodingMultiMessage(t *testing.T) { + payload := IncomingData{ + { + "date": 1727119152.0, + "container_name": "/fluentbit-nginx-1", + "source": "stdout", + "log": "/docker-entrypoint.sh: /docker-entrypoint.d/ is not empty, will attempt to perform configuration", + "container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186", + }, + { + "date": 1727119152.0, + "source": "stdout", + "log": "/docker-entrypoint.sh: Looking for shell scripts in /docker-entrypoint.d/", + "container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186", + "container_name": "/fluentbit-nginx-1", + }, + { + "date": 1727119152.0, + "container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186", + "container_name": "/fluentbit-nginx-1", + "source": "stdout", + "log": "/docker-entrypoint.sh: Launching /docker-entrypoint.d/10-listen-on-ipv6-by-default.sh", + }, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("json marshal error: %s", err) + } + messages, err := Decode(bytes.NewBuffer(payloadBytes)) + if err != nil { + t.Fatalf("error: %s", err) + } + if len(messages) != len(payload) { + t.Fatalf("incorrect messages returned. Got %d, expected %d", len(messages), len(payload)) + } + val, ok := messages[2].Data["container_id"] + if !ok { + t.Fatalf("container_id key not found") + } + if string(val) != "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186" { + t.Fatalf("wrong data returned: %s", val) + } +} + func TestDecodeMessages(t *testing.T) { msgs := []FluentBitMessage{ { diff --git a/pkg/observability/handlers.go b/pkg/observability/handlers.go index a7a44ea..97575b9 100644 --- a/pkg/observability/handlers.go +++ b/pkg/observability/handlers.go @@ -50,26 +50,27 @@ func (o *Observability) logsHandler(w http.ResponseWriter, r *http.Request) { } offset := 0 if r.FormValue("offset") != "" { - i, err := strconv.Atoi(r.PathValue("offset")) + i, err := strconv.Atoi(r.FormValue("offset")) if err == nil { offset = i } } maxLines := 0 if r.FormValue("maxLines") != "" { - i, err := strconv.Atoi(r.PathValue("maxLines")) + i, err := strconv.Atoi(r.FormValue("maxLines")) if err == nil { maxLines = i } } pos := int64(0) if r.FormValue("pos") != "" { - i, err := strconv.ParseInt(r.PathValue("pos"), 10, 64) + i, err := strconv.ParseInt(r.FormValue("pos"), 10, 64) if err == nil { pos = i } } - out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset) + search := r.FormValue("search") + out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset, search) if err != nil { w.WriteHeader(http.StatusBadRequest) fmt.Printf("get logs error: %s", err) diff --git a/pkg/observability/logs.go b/pkg/observability/logs.go index e63cf23..5f266b9 100644 --- a/pkg/observability/logs.go +++ b/pkg/observability/logs.go @@ -4,13 +4,16 @@ import ( "bufio" "fmt" "math" + "strings" "time" ) -func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, maxLogLines int) (LogEntryResponse, error) { +func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLines, offset int, search string) (LogEntryResponse, error) { logEntryResponse := LogEntryResponse{ Enabled: true, Environments: []string{"dev", "qa", "prod"}, + LogEntries: []LogEntry{}, + Keys: make(map[KeyValue]int), } logFiles := []string{} @@ -22,7 +25,8 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, for d := fromDate; d.Before(endDate) || d.Equal(endDate); d = d.AddDate(0, 0, 1) { fileList, err := o.Storage.ReadDir(d.Format("2006/01/02")) if err != nil { - return logEntryResponse, fmt.Errorf("can't read log directly: %s", err) + logEntryResponse.NextPos = -1 + return logEntryResponse, nil // can't read directory, return empty response } for _, filename := range fileList { logFiles = append(logFiles, d.Format("2006/01/02")+"/"+filename) @@ -50,20 +54,32 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, for scanner.Scan() && len(logEntryResponse.LogEntries) < maxLogLines { // read multiple lines // decode, store as logentry logMessage := decodeMessage(scanner.Bytes()) - val, ok := logMessage.Data["log"] + logline, ok := logMessage.Data["log"] if ok { timestamp := floatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute) - logEntry := LogEntry{ - Timestamp: timestamp.Format(TIMESTAMP_FORMAT), - Data: val, + if search == "" || strings.Contains(logline, search) { + logEntry := LogEntry{ + Timestamp: timestamp.Format(TIMESTAMP_FORMAT), + Data: logline, + } + logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry) + for k, v := range logMessage.Data { + if k != "log" { + logEntryResponse.Keys[KeyValue{Key: k, Value: v}] += 1 + } + } } - logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry) } } if err := scanner.Err(); err != nil { return logEntryResponse, fmt.Errorf("log file read (scanner) error: %s", err) } } + if len(logEntryResponse.LogEntries) < maxLogLines { + logEntryResponse.NextPos = -1 // no more records + } else { + logEntryResponse.NextPos = pos + } return logEntryResponse, nil } diff --git a/pkg/observability/logs_test.go b/pkg/observability/logs_test.go index e450d59..4cec612 100644 --- a/pkg/observability/logs_test.go +++ b/pkg/observability/logs_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "strconv" + "strings" "testing" "time" @@ -48,8 +49,9 @@ func TestGetLogs(t *testing.T) { now := time.Now() maxLogLines := 100 + search := "" - logEntryResponse, err := o.getLogs(now, now, 0, 0, maxLogLines) + logEntryResponse, err := o.getLogs(now, now, 0, 0, maxLogLines, search) if err != nil { t.Fatalf("get logs error: %s", err) } @@ -69,3 +71,18 @@ func TestFloatToDate(t *testing.T) { t.Fatalf("times are not equal. Got: %s, expected: %s", floatToDate, now) } } + +func TestKeyValue(t *testing.T) { + logEntryResponse := LogEntryResponse{ + Keys: map[KeyValue]int{ + {Key: "k", Value: "v"}: 4, + }, + } + out, err := json.Marshal(logEntryResponse) + if err != nil { + t.Fatalf("error: %s", err) + } + if !strings.Contains(string(out), `"keys":[{"key":"k","value":"v","total":4}]`) { + t.Fatalf("wrong output: %s", out) + } +} diff --git a/pkg/observability/types.go b/pkg/observability/types.go index ba83131..af818be 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -2,6 +2,8 @@ package observability import ( "bytes" + "strconv" + "strings" "sync" "sync/atomic" "time" @@ -32,12 +34,31 @@ type ConcurrentRWBuffer struct { } type LogEntryResponse struct { - Enabled bool `json:"enabled"` - LogEntries []LogEntry `json:"logEntries"` - Environments []string `json:"environments"` + Enabled bool `json:"enabled"` + LogEntries []LogEntry `json:"logEntries"` + Environments []string `json:"environments"` + Keys KeyValueInt `json:"keys"` + NextPos int64 `json:"nextPos"` } type LogEntry struct { Timestamp string `json:"timestamp"` Data string `json:"data"` } + +type KeyValueInt map[KeyValue]int + +type KeyValue struct { + Key string + Value string +} + +func (kv KeyValueInt) MarshalJSON() ([]byte, error) { + res := "[" + for k, v := range kv { + res += `{ "key" : "` + k.Key + `", "value": "` + k.Value + `", "total": ` + strconv.Itoa(v) + ` },` + } + res = strings.TrimRight(res, ",") + res += "]" + return []byte(res), nil +} diff --git a/webapp/src/NavBar/NavBar.tsx b/webapp/src/NavBar/NavBar.tsx index 2b81c2d..095f33c 100644 --- a/webapp/src/NavBar/NavBar.tsx +++ b/webapp/src/NavBar/NavBar.tsx @@ -47,10 +47,10 @@ export function NavBar({serverType}: Props) { const observabilityLinks = { "admin": [ { link: '/', label: 'Status', icon: TbBellRinging }, + { link: '/logs', label: 'Logs', icon: FaStream }, { link: '/users', label: 'Users', icon: TbUser }, - { link: '/setup', label: 'VPN Setup', icon: TbSettings }, + { link: '/setup', label: 'Setup', icon: TbSettings }, { link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection }, - { link: '/logs', label: 'Logs', icon: FaStream }, { link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook }, ], "user": [ @@ -99,7 +99,7 @@ export function NavBar({serverType}: Props) {