Skip to content

Commit

Permalink
fix: do not wait for firstInvocation report
Browse files Browse the repository at this point in the history
  • Loading branch information
dasfmi committed Nov 14, 2024
1 parent c5e636a commit ba6d7e6
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 82 deletions.
23 changes: 23 additions & 0 deletions decision/0001-no-wait-for-first-invocation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
author: Islam Shehata (@dasfmi)
date: 2024-11-14
---

# No wait for first invocation

## Background

We had a mechanism in the extension that blocked execution until we received the first invocation. This was done to ensure that customers
received logs for their first invocation. However, it caused issues in some cases where the extension was not able to receive the first invocation
and timed out.

As this occurred more often, the invocation counts in Axiom became significantly lower than the invocation counts in CloudWatch.


## Decision

- We decided to remove the blocking mechanism and allow the extension to start processing logs as soon as it is invoked. This ensures that customers receive any incoming logs without delay. Removing the first-invocation check and the channel associated with it also simplifies both the extension and its workflow.
- We had a function that checked whether we should flush based on the time of the last flush. I replaced this function with a simple ticker: it ticks every 1s and flushes the logs if the buffer is not empty. This ensures that logs are not held back for a long time.
- We had two Axiom clients, one that retries and one that doesn't. I think this complicates things, and we should have only one client that retries. We still need a mechanism to prevent infinite retries, though, such as a circuit breaker.



48 changes: 10 additions & 38 deletions flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"os"
"sync"
"time"

"github.com/axiomhq/axiom-go/axiom"
"github.com/axiomhq/axiom-go/axiom/ingest"
Expand All @@ -24,23 +23,19 @@ const (

// Axiom Config
var (
axiomToken = os.Getenv("AXIOM_TOKEN")
axiomDataset = os.Getenv("AXIOM_DATASET")
batchSize = 1000
flushInterval = 1 * time.Second
logger *zap.Logger
axiomToken = os.Getenv("AXIOM_TOKEN")
axiomDataset = os.Getenv("AXIOM_DATASET")
logger *zap.Logger
)

func init() {
logger, _ = zap.NewProduction()
}

type Axiom struct {
client *axiom.Client
retryClient *axiom.Client
events []axiom.Event
eventsLock sync.Mutex
lastFlushTime time.Time
client *axiom.Client
events []axiom.Event
eventsLock sync.Mutex
}

func New() (*Axiom, error) {
Expand All @@ -53,33 +48,20 @@ func New() (*Axiom, error) {
axiom.SetUserAgent(fmt.Sprintf("axiom-lambda-extension/%s", version.Get())),
}

retryClient, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

opts = append(opts, axiom.SetNoRetry())
client, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

f := &Axiom{
client: client,
retryClient: retryClient,
events: make([]axiom.Event, 0),
client: client,
events: make([]axiom.Event, 0),
}

return f, nil
}

func (f *Axiom) ShouldFlush() bool {
f.eventsLock.Lock()
defer f.eventsLock.Unlock()

return len(f.events) > batchSize || f.lastFlushTime.IsZero() || time.Since(f.lastFlushTime) > flushInterval
}

func (f *Axiom) Queue(event axiom.Event) {
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand All @@ -94,32 +76,22 @@ func (f *Axiom) QueueEvents(events []axiom.Event) {
f.events = append(f.events, events...)
}

func (f *Axiom) Flush(opt RetryOpt) {
func (f *Axiom) Flush() {
f.eventsLock.Lock()
var batch []axiom.Event
// create a copy of the batch, clear the original
batch, f.events = f.events, []axiom.Event{}
f.eventsLock.Unlock()

f.lastFlushTime = time.Now()
if len(batch) == 0 {
return
}

var res *ingest.Status
var err error
if opt == Retry {
res, err = f.retryClient.IngestEvents(context.Background(), axiomDataset, batch)
} else {
res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch)
}
res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch)

if err != nil {
if opt == Retry {
logger.Error("Failed to ingest events", zap.Error(err))
} else {
logger.Error("Failed to ingest events (will try again with next event)", zap.Error(err))
}
// allow this batch to be retried again, put them back
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand Down
38 changes: 13 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/peterbourgon/ff/v2/ffcli"
"go.uber.org/zap"
Expand All @@ -19,11 +20,9 @@ import (
)

var (
runtimeAPI = os.Getenv("AWS_LAMBDA_RUNTIME_API")
crashOnAPIErr = os.Getenv("PANIC_ON_API_ERR")
extensionName = filepath.Base(os.Args[0])
isFirstInvocation = true
runtimeDone = make(chan struct{})
runtimeAPI = os.Getenv("AWS_LAMBDA_RUNTIME_API")
crashOnAPIErr = os.Getenv("PANIC_ON_API_ERR")
extensionName = filepath.Base(os.Args[0])

// API Port
logsPort = "8080"
Expand Down Expand Up @@ -62,6 +61,9 @@ func Run() error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer stop()

ticker := time.NewTicker(1 * time.Second) // Flush every second
defer ticker.Stop()

axiom, err := flusher.New()
if err != nil {
// We don't want to exit with an error, so that the extension doesn't crash and take the main function down with it.
Expand All @@ -73,7 +75,7 @@ func Run() error {
}
}

httpServer := server.New(logsPort, axiom, runtimeDone)
httpServer := server.New(logsPort, axiom)
go httpServer.Run(ctx)

var extensionClient *extension.Client
Expand Down Expand Up @@ -115,7 +117,7 @@ func Run() error {
// Make sure we flush with retry on exit
defer func() {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush(flusher.Retry)
client.Flush()
})
}()

Expand All @@ -124,31 +126,17 @@ func Run() error {
case <-ctx.Done():
logger.Info("Context done", zap.Error(ctx.Err()))
return nil
case <-ticker.C:
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
axiom.Flush()
})
default:
res, err := extensionClient.NextEvent(ctx, extensionName)
if err != nil {
logger.Error("Next event failed:", zap.Error(err))
return err
}

// On every event received, check if we should flush
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
if client.ShouldFlush() {
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
}
})

// Wait for the first invocation to finish (receive platform.report log), then flush
if isFirstInvocation {
<-runtimeDone
isFirstInvocation = false
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
})
}

if res.EventType == "SHUTDOWN" {
_ = httpServer.Shutdown()
return nil
Expand Down
23 changes: 4 additions & 19 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
)

var (
logger *zap.Logger
firstInvocationDone = false
logger *zap.Logger
)

// lambda environment variables
Expand Down Expand Up @@ -52,8 +51,8 @@ func init() {
}
}

func New(port string, axiom *flusher.Axiom, runtimeDone chan struct{}) *axiomHttp.Server {
s, err := axiomHttp.NewServer(fmt.Sprintf(":%s", port), httpHandler(axiom, runtimeDone))
func New(port string, axiom *flusher.Axiom) *axiomHttp.Server {
s, err := axiomHttp.NewServer(fmt.Sprintf(":%s", port), httpHandler(axiom))
if err != nil {
logger.Error("Error creating server", zap.Error(err))
return nil
Expand All @@ -62,7 +61,7 @@ func New(port string, axiom *flusher.Axiom, runtimeDone chan struct{}) *axiomHtt
return s
}

func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc {
func httpHandler(ax *flusher.Axiom) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -77,7 +76,6 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
return
}

notifyRuntimeDone := false
requestID := ""

for _, e := range events {
Expand Down Expand Up @@ -110,25 +108,12 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc
"requestId": requestID,
}
}

// decide if the handler should notify the extension that the runtime is done
if e["type"] == "platform.report" && !firstInvocationDone {
notifyRuntimeDone = true
}
}

// queue all the events at once to prevent locking and unlocking the mutex
// on each event
flusher.SafelyUseAxiomClient(ax, func(client *flusher.Axiom) {
client.QueueEvents(events)
})

// inform the extension that platform.report event has been received
if notifyRuntimeDone {
runtimeDone <- struct{}{}
firstInvocationDone = true
// close the channel since it will not be longer used
close(runtimeDone)
}
}
}

0 comments on commit ba6d7e6

Please sign in to comment.