fix: fix missing lambda report log events #34

Open
wants to merge 4 commits into
base: main
18 changes: 8 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
# Axiom Lambda Extension

<a href="https://axiom.co">
<picture>
<source media="(prefers-color-scheme: dark) and (min-width: 600px)" srcset="https://axiom.co/assets/github/axiom-github-banner-light-vertical.svg">
<source media="(prefers-color-scheme: light) and (min-width: 600px)" srcset="https://axiom.co/assets/github/axiom-github-banner-dark-vertical.svg">
<source media="(prefers-color-scheme: dark) and (max-width: 599px)" srcset="https://axiom.co/assets/github/axiom-github-banner-light-horizontal.svg">
<img alt="Axiom.co banner" src="https://axiom.co/assets/github/axiom-github-banner-dark-horizontal.svg" align="right">
</picture>
</a>
&nbsp;

Use Axiom Lambda Extension to send logs and platform events of your Lambda function to [Axiom](https://axiom.co/). Axiom detects the extension and provides you with quick filters and a dashboard.

With the Axiom Lambda extension, you can forget about the extra configuration of CloudWatch and subscription filters.

## Usage


```sh
aws lambda update-function-configuration --function-name my-function \
--layers arn:aws:lambda:AWS_REGION:694952825951:layer:axiom-extension-ARCH:VERSION
```

## Documentation

For more information on how to set up and use the Axiom Lambda Extension, see the [Axiom documentation](https://axiom.co/docs/send-data/aws-lambda).
23 changes: 23 additions & 0 deletions decision/0001-no-wait-for-first-invocation.md
@@ -0,0 +1,23 @@
author: Islam Shehata (@dasfmi)
date: 2024-11-14
---

# No wait for first invocation

## Background

The extension had a mechanism that blocked execution until it received the first invocation. This was meant to ensure that customers
receive logs for their first invocation. In some cases, however, the extension never received the first invocation
and timed out.

As these timeouts became more frequent, the invocation counts in Axiom fell significantly below the invocation counts in CloudWatch.


## Decision

- We removed the blocking mechanism so that the extension starts processing logs as soon as it is invoked. Customers now receive incoming logs without delay. Dropping the first-invocation check and its associated channel also simplifies the extension and its workflow.
- We had a function that decided whether to flush based on the time of the last flush. We replaced it with a simple ticker that fires every 1s and flushes the logs if the buffer is not empty, so logs are never held back for long.
- We had two Axiom clients, one that retries and one that doesn't. This complicated the code, so we now keep a single client that retries. We still need a mechanism to prevent infinite retries, such as a circuit breaker.



48 changes: 10 additions & 38 deletions flusher/flusher.go
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"os"
"sync"
"time"

"github.com/axiomhq/axiom-go/axiom"
"github.com/axiomhq/axiom-go/axiom/ingest"
Expand All @@ -24,23 +23,19 @@ const (

// Axiom Config
var (
axiomToken = os.Getenv("AXIOM_TOKEN")
axiomDataset = os.Getenv("AXIOM_DATASET")
batchSize = 1000
flushInterval = 1 * time.Second
logger *zap.Logger
axiomToken = os.Getenv("AXIOM_TOKEN")
axiomDataset = os.Getenv("AXIOM_DATASET")
logger *zap.Logger
)

func init() {
logger, _ = zap.NewProduction()
}

type Axiom struct {
client *axiom.Client
retryClient *axiom.Client
events []axiom.Event
eventsLock sync.Mutex
lastFlushTime time.Time
client *axiom.Client
events []axiom.Event
eventsLock sync.Mutex
}

func New() (*Axiom, error) {
Expand All @@ -53,33 +48,20 @@ func New() (*Axiom, error) {
axiom.SetUserAgent(fmt.Sprintf("axiom-lambda-extension/%s", version.Get())),
}

retryClient, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

opts = append(opts, axiom.SetNoRetry())
client, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

f := &Axiom{
client: client,
retryClient: retryClient,
events: make([]axiom.Event, 0),
client: client,
events: make([]axiom.Event, 0),
}

return f, nil
}

func (f *Axiom) ShouldFlush() bool {
f.eventsLock.Lock()
defer f.eventsLock.Unlock()

return len(f.events) > batchSize || f.lastFlushTime.IsZero() || time.Since(f.lastFlushTime) > flushInterval
}

func (f *Axiom) Queue(event axiom.Event) {
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand All @@ -94,32 +76,22 @@ func (f *Axiom) QueueEvents(events []axiom.Event) {
f.events = append(f.events, events...)
}

func (f *Axiom) Flush(opt RetryOpt) {
func (f *Axiom) Flush() {
f.eventsLock.Lock()
var batch []axiom.Event
// create a copy of the batch, clear the original
batch, f.events = f.events, []axiom.Event{}
f.eventsLock.Unlock()

f.lastFlushTime = time.Now()
if len(batch) == 0 {
return
}

var res *ingest.Status
var err error
if opt == Retry {
res, err = f.retryClient.IngestEvents(context.Background(), axiomDataset, batch)
} else {
res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch)
}
res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch)

if err != nil {
if opt == Retry {
logger.Error("Failed to ingest events", zap.Error(err))
} else {
logger.Error("Failed to ingest events (will try again with next event)", zap.Error(err))
}
// allow this batch to be retried again, put them back
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand Down
38 changes: 13 additions & 25 deletions main.go
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/peterbourgon/ff/v2/ffcli"
"go.uber.org/zap"
Expand All @@ -19,11 +20,9 @@ import (
)

var (
runtimeAPI = os.Getenv("AWS_LAMBDA_RUNTIME_API")
crashOnAPIErr = os.Getenv("PANIC_ON_API_ERR")
extensionName = filepath.Base(os.Args[0])
isFirstInvocation = true
runtimeDone = make(chan struct{})
runtimeAPI = os.Getenv("AWS_LAMBDA_RUNTIME_API")
crashOnAPIErr = os.Getenv("PANIC_ON_API_ERR")
extensionName = filepath.Base(os.Args[0])

// API Port
logsPort = "8080"
Expand Down Expand Up @@ -62,6 +61,9 @@ func Run() error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer stop()

ticker := time.NewTicker(1 * time.Second) // Flush every second
defer ticker.Stop()

axiom, err := flusher.New()
if err != nil {
// We don't want to exit with an error, so that the extension doesn't crash and take the main function down with it.
Expand All @@ -73,7 +75,7 @@ func Run() error {
}
}

httpServer := server.New(logsPort, axiom, runtimeDone)
httpServer := server.New(logsPort, axiom)
go httpServer.Run(ctx)

var extensionClient *extension.Client
Expand Down Expand Up @@ -115,7 +117,7 @@ func Run() error {
// Make sure we flush with retry on exit
defer func() {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush(flusher.Retry)
client.Flush()
})
}()

Expand All @@ -124,31 +126,17 @@ func Run() error {
case <-ctx.Done():
logger.Info("Context done", zap.Error(ctx.Err()))
return nil
case <-ticker.C:
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
axiom.Flush()
})
default:
res, err := extensionClient.NextEvent(ctx, extensionName)
if err != nil {
logger.Error("Next event failed:", zap.Error(err))
return err
}

// On every event received, check if we should flush
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
if client.ShouldFlush() {
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
}
})

// Wait for the first invocation to finish (receive platform.runtimeDone log), then flush
if isFirstInvocation {
<-runtimeDone
isFirstInvocation = false
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
})
}

if res.EventType == "SHUTDOWN" {
_ = httpServer.Shutdown()
return nil
Expand Down
25 changes: 5 additions & 20 deletions server/server.go
Expand Up @@ -21,8 +21,7 @@ import (
)

var (
logger *zap.Logger
firstInvocationDone = false
logger *zap.Logger
)

// lambda environment variables
Expand Down Expand Up @@ -52,8 +51,8 @@ func init() {
}
}

func New(port string, axiom *flusher.Axiom, runtimeDone chan struct{}) *axiomHttp.Server {
s, err := axiomHttp.NewServer(fmt.Sprintf(":%s", port), httpHandler(axiom, runtimeDone))
func New(port string, axiom *flusher.Axiom) *axiomHttp.Server {
s, err := axiomHttp.NewServer(fmt.Sprintf(":%s", port), httpHandler(axiom))
if err != nil {
logger.Error("Error creating server", zap.Error(err))
return nil
Expand All @@ -62,7 +61,7 @@ func New(port string, axiom *flusher.Axiom, runtimeDone chan struct{}) *axiomHtt
return s
}

func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc {
func httpHandler(ax *flusher.Axiom) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -77,12 +76,11 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
return
}

notifyRuntimeDone := false
requestID := ""

for _, e := range events {
e["message"] = ""
// if reocrd key exists, extract the requestId and message from it
// if record key exists, extract the requestId and message from it
if rec, ok := e["record"]; ok {
if record, ok := rec.(map[string]any); ok {
// capture requestId and set it if it exists
Expand Down Expand Up @@ -110,25 +108,12 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
"requestId": requestID,
}
}

// decide if the handler should notify the extension that the runtime is done
if e["type"] == "platform.runtimeDone" && !firstInvocationDone {
notifyRuntimeDone = true
}
}

// queue all the events at once to prevent locking and unlocking the mutex
// on each event
flusher.SafelyUseAxiomClient(ax, func(client *flusher.Axiom) {
client.QueueEvents(events)
})

// inform the extension that platform.runtimeDone event has been received
if notifyRuntimeDone {
runtimeDone <- struct{}{}
firstInvocationDone = true
// close the channel since it will not be longer used
close(runtimeDone)
}
}
}
2 changes: 1 addition & 1 deletion version/version.go
@@ -1,7 +1,7 @@
package version

// manually set constant version
const version string = "v11"
const version string = "v12"

// Get returns the Go module version of the axiom-go module.
func Get() string {
Expand Down