From 48e92eaa93a0007773cece0ca16dee6869e61907 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Thu, 5 Oct 2023 15:11:36 +0200 Subject: [PATCH] fix(flusher): Retry flush on exit The flusher keeps a batch of events so retrying doesn't make sense when we'll flush again with the next event. But on exit there won't be a next event, so we need to retry there. Because you can't set the retry behaviour after constructing an Axiom client, this creates a second one that retries and adds a `RetryOpt` to the `Flush` method. --- flusher/flusher.go | 44 ++++++++++++++++++++++++++++++++++++-------- main.go | 29 ++++++++++++++++------------- 2 files changed, 52 insertions(+), 21 deletions(-) diff --git a/flusher/flusher.go b/flusher/flusher.go index 518294b..1767cff 100644 --- a/flusher/flusher.go +++ b/flusher/flusher.go @@ -9,11 +9,19 @@ import ( "time" "github.com/axiomhq/axiom-go/axiom" + "github.com/axiomhq/axiom-go/axiom/ingest" "go.uber.org/zap" "github.com/axiomhq/axiom-lambda-extension/version" ) +type RetryOpt int + +const ( + NoRetry RetryOpt = iota + Retry +) + // Axiom Config var ( axiomToken = os.Getenv("AXIOM_TOKEN") @@ -29,24 +37,37 @@ func init() { type Axiom struct { client *axiom.Client + retryClient *axiom.Client events []axiom.Event eventsLock sync.Mutex lastFlushTime time.Time } func New() (*Axiom, error) { - client, err := axiom.NewClient( + // We create two almost identical clients, but one will retry and one will + // not. This is mostly because we are just waiting for the next flush with + // the next event most of the time, but want to retry on exit/shutdown. + + opts := []axiom.Option{ axiom.SetAPITokenConfig(axiomToken), axiom.SetUserAgent(fmt.Sprintf("axiom-lambda-extension/%s", version.Get())), - axiom.SetNoRetry(), - ) + } + + retryClient, err := axiom.NewClient(opts...) + if err != nil { + return nil, err + } + + opts = append(opts, axiom.SetNoRetry()) + client, err := axiom.NewClient(opts...) if err != nil { return nil, err } f := &Axiom{ - client: client, - events: make([]axiom.Event, 0), + client: client, + retryClient: retryClient, + events: make([]axiom.Event, 0), } return f, nil @@ -73,7 +94,7 @@ func (f *Axiom) QueueEvents(events []axiom.Event) { f.events = append(f.events, events...) } -func (f *Axiom) Flush() { +func (f *Axiom) Flush(opt RetryOpt) { f.eventsLock.Lock() var batch []axiom.Event // create a copy of the batch, clear the original @@ -85,9 +106,16 @@ func (f *Axiom) Flush() { return } - res, err := f.client.IngestEvents(context.Background(), axiomDataset, batch) + var res *ingest.Status + var err error + if opt == Retry { + res, _ = f.retryClient.IngestEvents(context.Background(), axiomDataset, batch) + } else { + res, _ = f.client.IngestEvents(context.Background(), axiomDataset, batch) + } + if err != nil { - logger.Error("failed to ingest events", zap.Error(err)) + logger.Error("Failed to ingest events", zap.Error(err)) // allow this batch to be retried again, put them back f.eventsLock.Lock() defer f.eventsLock.Unlock() diff --git a/main.go b/main.go index ab6b68b..b437f65 100644 --- a/main.go +++ b/main.go @@ -66,7 +66,7 @@ func Run() error { if err != nil { // We don't want to exit with error, so that the extensions doesn't crash and crash the main function with it. // so we continue even if Axiom client is nil - logger.Error("error creating axiom client, no logs will send to Axiom.", zap.Error(err)) + logger.Error("Failed to create Axiom client, no logs will be sent to Axiom", zap.Error(err)) // if users want to crash on error, they can set the PANIC_ON_API_ERROR env variable if crashOnAPIErr == "true" { return err @@ -112,41 +112,44 @@ func Run() error { return err } + // Make sure we flush with retry on exit + defer func() { + flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { + client.Flush(flusher.Retry) + }) + }() + for { select { case <-ctx.Done(): - flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { - client.Flush() - }) - logger.Info("Context Done", zap.Any("ctx", ctx.Err())) + logger.Info("Context done", zap.Error(ctx.Err())) return nil default: res, err := extensionClient.NextEvent(ctx, extensionName) if err != nil { - logger.Error("Next event Failed:", zap.Error(err)) + logger.Error("Next event failed:", zap.Error(err)) return err } - // on every event received, check if we should flush + // On every event received, check if we should flush flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { if client.ShouldFlush() { - client.Flush() + // No retry, we'll try again with the next event + client.Flush(flusher.NoRetry) } }) - // wait for the first invocation to finish (receive platform.runtimeDone log), then flush + // Wait for the first invocation to finish (receive platform.runtimeDone log), then flush if isFirstInvocation { <-runtimeDone isFirstInvocation = false flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { - client.Flush() + // No retry, we'll try again with the next event + client.Flush(flusher.NoRetry) }) } if res.EventType == "SHUTDOWN" { - flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { - client.Flush() - }) _ = httpServer.Shutdown() return nil }