Skip to content

Commit

Permalink
Merge pull request #21 from axiomhq/arne/fix-flush-on-exit
Browse files Browse the repository at this point in the history
fix(flusher): Retry flush on exit
  • Loading branch information
dasfmi authored Oct 10, 2023
2 parents cdf1654 + b88de71 commit 1ff8f14
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 21 deletions.
48 changes: 40 additions & 8 deletions flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (
"time"

"github.com/axiomhq/axiom-go/axiom"
"github.com/axiomhq/axiom-go/axiom/ingest"
"go.uber.org/zap"

"github.com/axiomhq/axiom-lambda-extension/version"
)

type RetryOpt int

const (
NoRetry RetryOpt = iota
Retry
)

// Axiom Config
var (
axiomToken = os.Getenv("AXIOM_TOKEN")
Expand All @@ -29,24 +37,37 @@ func init() {

type Axiom struct {
client *axiom.Client
retryClient *axiom.Client
events []axiom.Event
eventsLock sync.Mutex
lastFlushTime time.Time
}

func New() (*Axiom, error) {
client, err := axiom.NewClient(
// We create two almost identical clients, but one will retry and one will
// not. This is mostly because we are just waiting for the next flush with
// the next event most of the time, but want to retry on exit/shutdown.

opts := []axiom.Option{
axiom.SetAPITokenConfig(axiomToken),
axiom.SetUserAgent(fmt.Sprintf("axiom-lambda-extension/%s", version.Get())),
axiom.SetNoRetry(),
)
}

retryClient, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

opts = append(opts, axiom.SetNoRetry())
client, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

f := &Axiom{
client: client,
events: make([]axiom.Event, 0),
client: client,
retryClient: retryClient,
events: make([]axiom.Event, 0),
}

return f, nil
Expand All @@ -73,7 +94,7 @@ func (f *Axiom) QueueEvents(events []axiom.Event) {
f.events = append(f.events, events...)
}

func (f *Axiom) Flush() {
func (f *Axiom) Flush(opt RetryOpt) {
f.eventsLock.Lock()
var batch []axiom.Event
// create a copy of the batch, clear the original
Expand All @@ -85,9 +106,20 @@ func (f *Axiom) Flush() {
return
}

res, err := f.client.IngestEvents(context.Background(), axiomDataset, batch)
var res *ingest.Status
var err error
if opt == Retry {
res, err = f.retryClient.IngestEvents(context.Background(), axiomDataset, batch)
} else {
res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch)
}

if err != nil {
logger.Error("failed to ingest events", zap.Error(err))
if opt == Retry {
logger.Error("Failed to ingest events", zap.Error(err))
} else {
logger.Error("Failed to ingest events (will try again with next event)", zap.Error(err))
}
// allow this batch to be retried again, put them back
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand Down
29 changes: 16 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Run() error {
if err != nil {
// We don't want to exit with error, so that the extensions doesn't crash and crash the main function with it.
// so we continue even if Axiom client is nil
logger.Error("error creating axiom client, no logs will send to Axiom.", zap.Error(err))
logger.Error("Failed to create Axiom client, no logs will be sent to Axiom", zap.Error(err))
// if users want to crash on error, they can set the PANIC_ON_API_ERROR env variable
if crashOnAPIErr == "true" {
return err
Expand Down Expand Up @@ -112,41 +112,44 @@ func Run() error {
return err
}

// Make sure we flush with retry on exit
defer func() {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush(flusher.Retry)
})
}()

for {
select {
case <-ctx.Done():
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
logger.Info("Context Done", zap.Any("ctx", ctx.Err()))
logger.Info("Context done", zap.Error(ctx.Err()))
return nil
default:
res, err := extensionClient.NextEvent(ctx, extensionName)
if err != nil {
logger.Error("Next event Failed:", zap.Error(err))
logger.Error("Next event failed:", zap.Error(err))
return err
}

// on every event received, check if we should flush
// On every event received, check if we should flush
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
if client.ShouldFlush() {
client.Flush()
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
}
})

// wait for the first invocation to finish (receive platform.runtimeDone log), then flush
// Wait for the first invocation to finish (receive platform.runtimeDone log), then flush
if isFirstInvocation {
<-runtimeDone
isFirstInvocation = false
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
})
}

if res.EventType == "SHUTDOWN" {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
_ = httpServer.Shutdown()
return nil
}
Expand Down

0 comments on commit 1ff8f14

Please sign in to comment.