diff --git a/cmd/cloudwatch-ingestion/main.go b/cmd/cloudwatch-ingestion/main.go index 94a44db..78798e5 100644 --- a/cmd/cloudwatch-ingestion/main.go +++ b/cmd/cloudwatch-ingestion/main.go @@ -10,7 +10,6 @@ import ( "net/http" "os" "strings" - "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -104,8 +103,10 @@ func fetchLogEvents(svc *cloudwatchlogs.Client, logGroupName, logStreamName stri } for _, event := range result.Events { + seconds := float64(*event.Timestamp / 1000) + microseconds := float64(*event.Timestamp%1000) * 1000 messages = append(messages, map[string]any{ - "date": float64(*event.Timestamp * int64(time.Millisecond)), + "date": seconds + (microseconds / 1e6), "log": *event.Message, "log-group": logGroupName, "log-stream": logStreamWithoutRandom, diff --git a/pkg/observability/buffer.go b/pkg/observability/buffer.go index e6eeb39..3cc1fa0 100644 --- a/pkg/observability/buffer.go +++ b/pkg/observability/buffer.go @@ -18,6 +18,7 @@ func (o *Observability) WriteBufferToStorage(n int64) error { defer o.ActiveBufferWriters.Done() o.WriteLock.Lock() defer o.WriteLock.Unlock() + logging.DebugLog(fmt.Errorf("writing buffer to file. Buffer has: %d bytes", n)) // copy first to temporary buffer (storage might have latency) tempBuf := bytes.NewBuffer(make([]byte, 0, n)) _, err := io.CopyN(tempBuf, o.Buffer, n) @@ -28,7 +29,6 @@ func (o *Observability) WriteBufferToStorage(n int64) error { o.LastFlushed = time.Now() for _, bufferPosAndPrefix := range mergeBufferPosAndPrefix(prefix) { - now := time.Now() filename := bufferPosAndPrefix.prefix + "/data-" + strconv.FormatInt(now.Unix(), 10) + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10) err = ensurePath(o.Storage, filename) @@ -79,7 +79,7 @@ func (o *Observability) Ingest(data io.ReadCloser) error { if len(msgs) == 0 { return nil // no messages to ingest } - _, err = o.Buffer.Write(encodeMessage(msgs), floatToDate(msgs[0].Date).Format(DATE_PREFIX)) + _, err = o.Buffer.Write(encodeMessage(msgs), FloatToDate(msgs[0].Date).Format(DATE_PREFIX)) if err != nil { return fmt.Errorf("write error: %s", err) } diff --git a/pkg/observability/handlers.go b/pkg/observability/handlers.go index db14a4b..884272f 100644 --- a/pkg/observability/handlers.go +++ b/pkg/observability/handlers.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strconv" + "strings" "time" ) @@ -70,8 +71,16 @@ func (o *Observability) logsHandler(w http.ResponseWriter, r *http.Request) { pos = i } } - search := r.FormValue("search") - out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset, search) + displayTags := strings.Split(r.FormValue("display-tags"), ",") + filterTagsSplit := strings.Split(r.FormValue("filter-tags"), ",") + filterTags := []KeyValue{} + for _, tag := range filterTagsSplit { + kv := strings.Split(tag, "=") + if len(kv) == 2 { + filterTags = append(filterTags, KeyValue{Key: kv[0], Value: kv[1]}) + } + } + out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset, r.FormValue("search"), displayTags, filterTags) if err != nil { w.WriteHeader(http.StatusBadRequest) fmt.Printf("get logs error: %s", err) diff --git a/pkg/observability/helpers.go b/pkg/observability/helpers.go index 93b1b8c..2c98ee3 100644 --- a/pkg/observability/helpers.go +++ b/pkg/observability/helpers.go @@ -16,7 +16,7 @@ func (o *Observability) returnError(w http.ResponseWriter, err error, statusCode w.Write([]byte(`{"error": "` + strings.Replace(err.Error(), `"`, `\"`, -1) + `"}`)) } -func floatToDate(datetime float64) time.Time { +func FloatToDate(datetime float64) time.Time { datetimeInt := int64(datetime) decimals := datetime - float64(datetimeInt) nsecs := int64(math.Round(decimals * 1_000_000)) // precision to match golang's time.Time diff --git a/pkg/observability/helpers_test.go b/pkg/observability/helpers_test.go index 3722c2f..195156d 100644 --- a/pkg/observability/helpers_test.go +++ b/pkg/observability/helpers_test.go @@ -8,7 +8,7 @@ import ( func TestFloatToDate2Way(t *testing.T) { now := time.Now() float := DateToFloat(now) - date := floatToDate(float) + date := FloatToDate(float) if date.Format(TIMESTAMP_FORMAT) != now.Format(TIMESTAMP_FORMAT) { t.Fatalf("got: %s, expected: %s", date.Format(TIMESTAMP_FORMAT), now.Format(TIMESTAMP_FORMAT)) } diff --git a/pkg/observability/logs.go b/pkg/observability/logs.go index 96fb1d1..772770b 100644 --- a/pkg/observability/logs.go +++ b/pkg/observability/logs.go @@ -8,12 +8,11 @@ import ( "time" ) -func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLines, offset int, search string) (LogEntryResponse, error) { +func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLines, offset int, search string, displayTags []string, filterTags []KeyValue) (LogEntryResponse, error) { logEntryResponse := LogEntryResponse{ - Enabled: true, - Environments: []string{"dev", "qa", "prod"}, - LogEntries: []LogEntry{}, - Keys: KeyValueInt{}, + Enabled: true, + LogEntries: []LogEntry{}, + Tags: KeyValueInt{}, } keys := make(map[KeyValue]int) @@ -58,16 +57,37 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLi logMessage := decodeMessage(scanner.Bytes()) logline, ok := logMessage.Data["log"] if ok { - timestamp := floatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute) + timestamp := FloatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute) if search == "" || strings.Contains(logline, search) { - logEntry := LogEntry{ - Timestamp: timestamp.Format(TIMESTAMP_FORMAT), - Data: logline, + tags := []KeyValue{} + for _, tag := range displayTags { + if tagValue, ok := logMessage.Data[tag]; ok { + tags = append(tags, KeyValue{Key: tag, Value: tagValue}) + } + } + filterMessage := true + if len(filterTags) == 0 { + filterMessage = false + } else { + for _, filter := range filterTags { + if tagValue, ok := logMessage.Data[filter.Key]; ok { + if tagValue == filter.Value { + filterMessage = false + } + } + } } - logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry) - for k, v := range logMessage.Data { - if k != "log" { - keys[KeyValue{Key: k, Value: v}] += 1 + if !filterMessage { + logEntry := LogEntry{ + Timestamp: timestamp.Format(TIMESTAMP_FORMAT), + Data: logline, + Tags: tags, + } + logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry) + for k, v := range logMessage.Data { + if k != "log" { + keys[KeyValue{Key: k, Value: v}] += 1 + } } } } @@ -84,13 +104,13 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLi } for k, v := range keys { - logEntryResponse.Keys = append(logEntryResponse.Keys, KeyValueTotal{ + logEntryResponse.Tags = append(logEntryResponse.Tags, KeyValueTotal{ Key: k.Key, Value: k.Value, Total: v, }) } - sort.Sort(logEntryResponse.Keys) + sort.Sort(logEntryResponse.Tags) return logEntryResponse, nil } diff --git a/pkg/observability/logs_test.go b/pkg/observability/logs_test.go index 19dda89..e5e29ff 100644 --- a/pkg/observability/logs_test.go +++ b/pkg/observability/logs_test.go @@ -61,8 +61,8 @@ func TestGetLogs(t *testing.T) { if len(logEntryResponse.LogEntries) != totalMessagesToGenerate { t.Fatalf("didn't get the same log entries as messaged we generated: got: %d, expected: %d", len(logEntryResponse.LogEntries), totalMessagesToGenerate) } - if logEntryResponse.LogEntries[0].Timestamp != floatToDate(timestamp).Format(TIMESTAMP_FORMAT) { - t.Fatalf("unexpected timestamp: %s vs %s", logEntryResponse.LogEntries[0].Timestamp, floatToDate(timestamp).Format(TIMESTAMP_FORMAT)) + if logEntryResponse.LogEntries[0].Timestamp != FloatToDate(timestamp).Format(TIMESTAMP_FORMAT) { + t.Fatalf("unexpected timestamp: %s vs %s", logEntryResponse.LogEntries[0].Timestamp, FloatToDate(timestamp).Format(TIMESTAMP_FORMAT)) } } @@ -70,7 +70,7 @@ func TestFloatToDate(t *testing.T) { for i := 0; i < 10; i++ { now := time.Now() floatDate := float64(now.Unix()) + float64(now.Nanosecond())/1e9 - floatToDate := floatToDate(floatDate) + floatToDate := FloatToDate(floatDate) if now.Unix() != floatToDate.Unix() { t.Fatalf("times are not equal. Got: %v, expected: %v", floatToDate, now) } @@ -82,7 +82,7 @@ func TestFloatToDate(t *testing.T) { func TestKeyValue(t *testing.T) { logEntryResponse := LogEntryResponse{ - Keys: KeyValueInt{ + Tags: KeyValueInt{ {Key: "k", Value: "v", Total: 4}, }, } diff --git a/pkg/observability/types.go b/pkg/observability/types.go index 218a2cb..b718bac 100644 --- a/pkg/observability/types.go +++ b/pkg/observability/types.go @@ -41,16 +41,16 @@ type BufferPosAndPrefix struct { } type LogEntryResponse struct { - Enabled bool `json:"enabled"` - LogEntries []LogEntry `json:"logEntries"` - Environments []string `json:"environments"` - Keys KeyValueInt `json:"keys"` - NextPos int64 `json:"nextPos"` + Enabled bool `json:"enabled"` + LogEntries []LogEntry `json:"logEntries"` + Tags KeyValueInt `json:"tags"` + NextPos int64 `json:"nextPos"` } type LogEntry struct { - Timestamp string `json:"timestamp"` - Data string `json:"data"` + Timestamp string `json:"timestamp"` + Data string `json:"data"` + Tags []KeyValue `json:"tags"` } type KeyValueInt []KeyValueTotal @@ -61,8 +61,8 @@ type KeyValueTotal struct { Total int } type KeyValue struct { - Key string - Value string + Key string `json:"key"` + Value string `json:"value"` } func (kv KeyValueInt) MarshalJSON() ([]byte, error) { diff --git a/pkg/rest/login/auth_test.go b/pkg/rest/login/auth_test.go index 017c440..34e61be 100644 --- a/pkg/rest/login/auth_test.go +++ b/pkg/rest/login/auth_test.go @@ -118,6 +118,6 @@ func TestAuthenticateMFAWithToken(t *testing.T) { t.Fatalf("authentication error: %s", err) } if !loginResp.Authenticated { - t.Fatalf("expected not to be authenticated") + t.Fatalf("expected to be authenticated") } } diff --git a/webapp/src/Routes/Logs/Logs.tsx b/webapp/src/Routes/Logs/Logs.tsx index 3e3c777..e463381 100644 --- a/webapp/src/Routes/Logs/Logs.tsx +++ b/webapp/src/Routes/Logs/Logs.tsx @@ -1,4 +1,4 @@ -import { Card, Container, Text, Table, Title, Button, Grid, Select, Popover, Group, TextInput, rem, ActionIcon, Checkbox, Highlight} from "@mantine/core"; +import { Card, Container, Text, Table, Title, Button, Grid, Popover, Group, TextInput, rem, ActionIcon, Checkbox, Highlight, MultiSelect} from "@mantine/core"; import { AppSettings } from "../../Constants/Constants"; import { useInfiniteQuery } from "@tanstack/react-query"; import { useAuthContext } from "../../Auth/Auth"; @@ -13,13 +13,14 @@ type LogsDataResponse = { logEntries: LogEntry[]; environments: string[]; nextPos: number; - keys: Keys[]; + tags: Tags[]; } type LogEntry = { data: string; timestamp: string; + tags: Tag[]; } -type Keys = { +type Tags = { key: string; value: string; total: number; @@ -41,16 +42,15 @@ export function Logs() { const timezoneOffset = new Date().getTimezoneOffset() * -1 const [currentQueryParameters] = useSearchParams(); const dateParam = currentQueryParameters.get("date") - const environmentParam = currentQueryParameters.get("environment") const [tags, setTags] = useState([]) const [search, setSearch] = useState("") const [searchParam, setSearchParam] = useState("") + const [columns, setColumns] = useState([]) const [logsDate, setLogsDate] = useState(dateParam === null ? new Date() : new Date(dateParam)); - const [environment, setEnvironment] = useState(environmentParam === null ? "all" : environmentParam) const { isPending, fetchNextPage, hasNextPage, error, data } = useInfiniteQuery({ - queryKey: ['logs', environment, logsDate, tags, searchParam], + queryKey: ['logs', logsDate, tags, columns, searchParam], queryFn: async ({ pageParam }) => - fetch(AppSettings.url + '/observability/logs?environment='+(environment === undefined || environment === "" ? "all" : environment)+'&fromDate='+(logsDate == undefined ? getDate(new Date()) : getDate(logsDate)) + '&endDate='+(logsDate == undefined ? getDate(new Date()) : getDate(logsDate)) + "&pos="+pageParam+"&offset="+timezoneOffset+"&tags="+encodeURIComponent(tags.map(t => t.key + "=" + t.value).join(","))+"&search="+encodeURIComponent(searchParam), { + fetch(AppSettings.url + '/observability/logs?display-tags='+encodeURIComponent(columns.join(","))+'&fromDate='+(logsDate == undefined ? getDate(new Date()) : getDate(logsDate)) + '&endDate='+(logsDate == undefined ? getDate(new Date()) : getDate(logsDate)) + "&pos="+pageParam+"&offset="+timezoneOffset+"&filter-tags="+encodeURIComponent(tags.map(t => t.key + "=" + t.value).join(","))+"&search="+encodeURIComponent(searchParam), { headers: { "Content-Type": "application/json", "Authorization": "Bearer " + authInfo.token @@ -84,40 +84,17 @@ export function Logs() { }; }, [fetchNextPage]) - if(isPending) return "Loading..." - if(error) return 'A backend error has occurred: ' + error.message - if(data.pages.length === 0 || !data.pages[0].enabled) { // show disabled page if not enabled - return ( - - - Logs - - - - { !data.pages[0].enabled ? - "Logs are not enabled." - : - null - } - - - - - - - - - ) - } + if(error) return 'A backend error has occurred: ' + error.message - const rows = data.pages.map((group, groupIndex) => ( + const rows = isPending ? [] : data.pages.map((group, groupIndex) => ( {group.logEntries.map((row, i) => ( {row.timestamp} + {columns.map(function(column){ + return {row.tags.filter((tag) => tag.key === column).map((tag => { return tag.value }))}; + })} {searchParam === "" ? row.data : {row.data}} ))} @@ -153,47 +130,57 @@ export function Logs() { /> -