Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(flusher): Retry flush on exit #21

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (
"time"

"github.com/axiomhq/axiom-go/axiom"
"github.com/axiomhq/axiom-go/axiom/ingest"
"go.uber.org/zap"

"github.com/axiomhq/axiom-lambda-extension/version"
)

// RetryOpt selects which ingest behavior Flush uses: the retrying client or
// the non-retrying one.
type RetryOpt int

// Retry options accepted by Flush.
const (
// NoRetry flushes with the client that does not retry. It is the zero value
// of RetryOpt. Callers use it for flushes triggered while events are still
// arriving, because a failed batch is put back and sent with a later flush.
NoRetry RetryOpt = iota
// Retry flushes with the retrying client. Callers use it for the final
// flush on exit, when there is no later flush to fall back on.
Retry
)

// Axiom Config
var (
axiomToken = os.Getenv("AXIOM_TOKEN")
Expand All @@ -29,24 +37,37 @@ func init() {

// Axiom buffers events in memory and sends them to Axiom in batches when
// Flush is called.
type Axiom struct {
// client is built with axiom.SetNoRetry and is used for Flush(NoRetry).
client *axiom.Client
// retryClient is built from the same options minus axiom.SetNoRetry and is
// used for Flush(Retry).
retryClient *axiom.Client
// events holds the queued events that have not been ingested yet; Flush
// takes them as a batch and puts them back if ingestion fails.
events []axiom.Event
// eventsLock guards events; Flush holds it while taking and restoring the
// batch.
eventsLock sync.Mutex
// lastFlushTime is presumably the time of the most recent flush, consulted
// by ShouldFlush — its use is not visible in this excerpt, TODO confirm.
lastFlushTime time.Time
}

func New() (*Axiom, error) {
client, err := axiom.NewClient(
// We create two almost identical clients, but one will retry and one will
// not. This is mostly because we are just waiting for the next flush with
// the next event most of the time, but want to retry on exit/shutdown.

opts := []axiom.Option{
axiom.SetAPITokenConfig(axiomToken),
axiom.SetUserAgent(fmt.Sprintf("axiom-lambda-extension/%s", version.Get())),
axiom.SetNoRetry(),
)
}

retryClient, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

opts = append(opts, axiom.SetNoRetry())
client, err := axiom.NewClient(opts...)
if err != nil {
return nil, err
}

f := &Axiom{
client: client,
events: make([]axiom.Event, 0),
client: client,
retryClient: retryClient,
events: make([]axiom.Event, 0),
}

return f, nil
Expand All @@ -73,7 +94,7 @@ func (f *Axiom) QueueEvents(events []axiom.Event) {
f.events = append(f.events, events...)
}

func (f *Axiom) Flush() {
func (f *Axiom) Flush(opt RetryOpt) {
f.eventsLock.Lock()
var batch []axiom.Event
// create a copy of the batch, clear the original
Expand All @@ -85,9 +106,20 @@ func (f *Axiom) Flush() {
return
}

res, err := f.client.IngestEvents(context.Background(), axiomDataset, batch)
var res *ingest.Status
var err error
if opt == Retry {
res, err = f.retryClient.IngestEvents(context.Background(), axiomDataset, batch)
} else {
res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch)
}

if err != nil {
logger.Error("failed to ingest events", zap.Error(err))
if opt == Retry {
logger.Error("Failed to ingest events", zap.Error(err))
} else {
logger.Error("Failed to ingest events (will try again with next event)", zap.Error(err))
}
// allow this batch to be retried again, put them back
f.eventsLock.Lock()
defer f.eventsLock.Unlock()
Expand Down
29 changes: 16 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Run() error {
if err != nil {
// We don't want to exit with an error here, because that would crash the extension and take the main function down with it,
// so we continue even if the Axiom client is nil.
logger.Error("error creating axiom client, no logs will send to Axiom.", zap.Error(err))
logger.Error("Failed to create Axiom client, no logs will be sent to Axiom", zap.Error(err))
// if users want to crash on error, they can set the PANIC_ON_API_ERROR env variable
if crashOnAPIErr == "true" {
return err
Expand Down Expand Up @@ -112,41 +112,44 @@ func Run() error {
return err
}

// Make sure we flush with retry on exit
defer func() {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush(flusher.Retry)
})
}()

for {
select {
case <-ctx.Done():
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
logger.Info("Context Done", zap.Any("ctx", ctx.Err()))
logger.Info("Context done", zap.Error(ctx.Err()))
return nil
default:
res, err := extensionClient.NextEvent(ctx, extensionName)
if err != nil {
logger.Error("Next event Failed:", zap.Error(err))
logger.Error("Next event failed:", zap.Error(err))
return err
}

// on every event received, check if we should flush
// On every event received, check if we should flush
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
if client.ShouldFlush() {
client.Flush()
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
}
})

// wait for the first invocation to finish (receive platform.runtimeDone log), then flush
// Wait for the first invocation to finish (receive platform.runtimeDone log), then flush
if isFirstInvocation {
<-runtimeDone
isFirstInvocation = false
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
// No retry, we'll try again with the next event
client.Flush(flusher.NoRetry)
})
}

if res.EventType == "SHUTDOWN" {
flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) {
client.Flush()
})
_ = httpServer.Shutdown()
return nil
}
Expand Down