Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to the new Telemetry API #29

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package: build
cd bin && zip -r extension.zip extensions

publish: package
aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co'
aws lambda publish-layer-version --layer-name axiom-development-lambda-extension-go --region eu-west-1 --zip-file "fileb://bin/extension.zip" --compatible-architectures ${GOARCH} --description 'axiom lambda extension to push lambda logs to https://axiom.co'

arch:
echo ${GOARCH}
Expand Down
4 changes: 2 additions & 2 deletions extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ const (
extensionIdentifierHeader = "Lambda-Extension-Identifier"
)

func New(LogsAPI string) *Client {
func New(telemetryAPI string) *Client {
return &Client{
baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", LogsAPI),
baseURL: fmt.Sprintf("http://%s/2020-01-01/extension", telemetryAPI),
httpClient: &http.Client{},
}
}
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

"github.com/axiomhq/axiom-lambda-extension/extension"
"github.com/axiomhq/axiom-lambda-extension/flusher"
"github.com/axiomhq/axiom-lambda-extension/logsapi"
"github.com/axiomhq/axiom-lambda-extension/server"
"github.com/axiomhq/axiom-lambda-extension/telemetryapi"
)

var (
Expand Down Expand Up @@ -92,22 +92,22 @@ func Run() error {
}

// LOGS API SUBSCRIPTION
logsClient := logsapi.New(runtimeAPI)
telemetryClient := telemetryapi.New(runtimeAPI)

destination := logsapi.Destination{
destination := telemetryapi.Destination{
Protocol: "HTTP",
URI: logsapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)),
URI: telemetryapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s/", logsPort)),
HttpMethod: "POST",
Encoding: "JSON",
}

bufferingCfg := logsapi.BufferingCfg{
bufferingCfg := telemetryapi.BufferingCfg{
MaxItems: uint32(defaultMaxItems),
MaxBytes: uint32(defaultMaxBytes),
TimeoutMS: uint32(defaultTimeoutMS),
}

_, err = logsClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID)
_, err = telemetryClient.Subscribe(ctx, []string{"function", "platform"}, bufferingCfg, destination, extensionClient.ExtensionID)
if err != nil {
return err
}
Expand Down
60 changes: 23 additions & 37 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"io"
"net/http"
"regexp"
"strings"

"os"
"strconv"
Expand Down Expand Up @@ -38,8 +36,6 @@ var (
axiomMetaInfo = map[string]string{}
)

var logLineRgx, _ = regexp.Compile(`^([0-9.:TZ-]{20,})\s+([0-9a-f-]{36})\s+(ERROR|INFO|WARN|DEBUG|TRACE)\s+(?s:(.*))`)

func init() {
logger, _ = zap.NewProduction()

Expand Down Expand Up @@ -82,16 +78,37 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
}

notifyRuntimeDone := false
requestID := ""

for _, e := range events {
e["message"] = ""
// if reocrd key exists, extract the requestId and message from it
if rec, ok := e["record"]; ok {
if record, ok := rec.(map[string]any); ok {
// capture requestId and set it if it exists
if reqID, ok := record["requestId"]; ok {
requestID = reqID.(string)
}
if e["type"] == "function" {
// set message
e["message"] = record["message"].(string)
}
}
}

// attach the lambda information to the event
e["lambda"] = lambdaMetaInfo
e["axiom"] = axiomMetaInfo
// replace the time field with axiom's _time
e["_time"], e["time"] = e["time"], nil

if e["type"] == "function" {
extractEventMessage(e)
// If we didn't find a message in record field, move the record to message
// and assign requestId
if e["type"] == "function" && e["message"] == "" {
e["message"] = e["record"]
e["record"] = map[string]string{
"requestId": requestID,
}
}

// decide if the handler should notify the extension that the runtime is done
Expand All @@ -115,34 +132,3 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
}
}
}

// extractEventMessage extracts the message from the record field and puts it in the message field
// it detects if the record is a json string or a text log line that confirms to AWS log line formatting.
func extractEventMessage(e map[string]any) {
e["message"] = e["record"]
if recordStr, ok := e["record"].(string); ok && len(recordStr) > 0 {
recordStr = strings.Trim(recordStr, "\n")
// parse the record
// first check if the record is a json object, if not parse it as a text log line
if recordStr[0] == '{' && recordStr[len(recordStr)-1] == '}' {
var record map[string]any
err := json.Unmarshal([]byte(recordStr), &record)
if err != nil {
logger.Error("Error unmarshalling record:", zap.Error(err))
// do not return, we want to continue processing the event
} else {
if level, ok := record["level"].(string); ok {
record["level"] = strings.ToLower(level)
}
e["level"] = record["level"]
e["record"] = record
}
} else {
matches := logLineRgx.FindStringSubmatch(recordStr)
if len(matches) == 5 {
e["level"] = strings.ToLower(matches[3])
e["record"] = map[string]any{"requestId": matches[2], "message": matches[4], "timestamp": matches[1], "level": e["level"]}
}
}
}
}
98 changes: 0 additions & 98 deletions server/server_test.go

This file was deleted.

12 changes: 6 additions & 6 deletions logsapi/logs.go → telemetryapi/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package logsapi
package telemetryapi

import (
"bytes"
Expand Down Expand Up @@ -68,19 +68,19 @@ type SubscribeResponse struct {

const (
lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier"
SchemaVersion20210318 = "2021-03-18"
SchemaVersionLatest = SchemaVersion20210318
SchemaVersion20221213 = "2022-12-13"
SchemaVersionLatest = SchemaVersion20221213
)

func New(LogsAPI string) *Client {
func New(runtimeAPI string) *Client {
return &Client{
baseURL: fmt.Sprintf("http://%s/2020-08-15", LogsAPI),
baseURL: fmt.Sprintf("http://%s/2022-07-01", runtimeAPI),
httpClient: &http.Client{},
}
}

func (lc *Client) Subscribe(ctx context.Context, types []string, bufferingCfg BufferingCfg, destination Destination, extensionID string) (*SubscribeResponse, error) {
subscribeEndpoint := lc.baseURL + "/logs"
subscribeEndpoint := lc.baseURL + "/telemetry"

subReq, err := json.Marshal(
&SubscribeRequest{
Expand Down
Loading