From 5a1d546a6f338b17f74e9738c6f0b4b529ede470 Mon Sep 17 00:00:00 2001
From: Lukas Malkmus
Date: Thu, 25 Apr 2024 21:29:17 +0200
Subject: [PATCH] feat(ingest): better batching

---
 .golangci.yaml                |  3 ++
 internal/cmd/ingest/ingest.go | 65 ++++++++++++++++++++++++-----------
 internal/cmd/query/query.go   |  2 +-
 3 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/.golangci.yaml b/.golangci.yaml
index a20ca4a..d31c116 100644
--- a/.golangci.yaml
+++ b/.golangci.yaml
@@ -57,6 +57,9 @@ issues:
     - linters:
         - staticcheck
       text: "SA1019: client.Datasets.QueryLegacy"
+    - linters:
+        - staticcheck
+      text: "SA1019: client.QueryLegacy"
     - linters:
         - staticcheck
       text: 'SA1019: "github.com/axiomhq/axiom-go/axiom/querylegacy"'
diff --git a/internal/cmd/ingest/ingest.go b/internal/cmd/ingest/ingest.go
index 5b7f27d..9dc9115 100644
--- a/internal/cmd/ingest/ingest.go
+++ b/internal/cmd/ingest/ingest.go
@@ -53,9 +53,11 @@ type options struct {
 	// Delimiter that separates CSV fields.
 	Delimiter string
 	// FlushEvery flushes the ingestion buffer after the specified duration. It
-	// is only valid when ingesting a stream of newline delimited JSON objects
-	// of unknown length.
+	// is only valid when ingesting batchable data, e.g. newline delimited JSON
+	// and CSV (with field names explicitly set) data that is not encoded.
 	FlushEvery time.Duration
+	// BatchSize to aim for when ingesting batchable data.
+	BatchSize uint
 	// ContentType of the data to ingest.
 	ContentType axiom.ContentType
 	contentType string // for the flag value
@@ -81,7 +83,7 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
 	}
 
 	cmd := &cobra.Command{
-		Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [--flush-every <duration>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]]",
+		Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [(-d|--delimiter <delimiter>] [--flush-every <duration>] [(-b|--batch-size <batch-size>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]] [--csv-fields <csv-field> [ ...]] [--continue-on-error <TRUE|FALSE>]",
 		Short: "Ingest structured data",
 		Long: heredoc.Doc(`
			Ingest structured data into an Axiom dataset.
@@ -193,7 +195,13 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
 			if err := complete(cmd.Context(), opts); err != nil {
 				return err
 			}
-			return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed, cmd.Flag("csv-fields").Changed)
+			return run(
+				cmd.Context(),
+				opts,
+				cmd.Flag("flush-every").Changed,
+				cmd.Flag("batch-size").Changed,
+				cmd.Flag("csv-fields").Changed,
+			)
 		},
 	}
 
@@ -201,7 +209,8 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
 	cmd.Flags().StringVar(&opts.TimestampField, "timestamp-field", "", "Field to take the ingestion time from (defaults to _time)")
 	cmd.Flags().StringVar(&opts.TimestampFormat, "timestamp-format", "", "Format used in the the timestamp field. Default uses a heuristic parser. Must be expressed using the reference time 'Mon Jan 2 15:04:05 -0700 MST 2006'")
 	cmd.Flags().StringVarP(&opts.Delimiter, "delimiter", "d", "", "Delimiter that separates CSV fields (only valid when input is CSV")
-	cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second, "Buffer flush interval for newline delimited JSON streams of unknown length")
+	cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second*5, "Buffer flush interval for batchable data")
+	cmd.Flags().UintVarP(&opts.BatchSize, "batch-size", "b", 10_000, "Batch size to aim for")
 	cmd.Flags().StringVarP(&opts.contentType, "content-type", "t", "", "Content type of the data to ingest (will auto-detect if not set, must be set if content encoding is set and content type is not identity)")
 	cmd.Flags().StringVarP(&opts.contentEncoding, "content-encoding", "e", axiom.Identity.String(), "Content encoding of the data to ingest")
 	cmd.Flags().StringSliceVarP(&opts.labels, "label", "l", nil, "Labels to attach to the ingested events, server side")
@@ -212,9 +221,11 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
 	_ = cmd.RegisterFlagCompletionFunc("timestamp-format", cmdutil.NoCompletion)
 	_ = cmd.RegisterFlagCompletionFunc("delimiter", cmdutil.NoCompletion)
 	_ = cmd.RegisterFlagCompletionFunc("flush-every", cmdutil.NoCompletion)
+	_ = cmd.RegisterFlagCompletionFunc("batch-size", cmdutil.NoCompletion)
 	_ = cmd.RegisterFlagCompletionFunc("content-type", contentTypeCompletion)
 	_ = cmd.RegisterFlagCompletionFunc("content-encoding", contentEncodingCompletion)
 	_ = cmd.RegisterFlagCompletionFunc("label", cmdutil.NoCompletion)
+	_ = cmd.RegisterFlagCompletionFunc("csv-fields", cmdutil.NoCompletion)
 	_ = cmd.RegisterFlagCompletionFunc("continue-on-error", cmdutil.NoCompletion)
 
 	if opts.IO.IsStdinTTY() {
@@ -265,7 +276,7 @@ func complete(ctx context.Context, opts *options) error {
 	}, &opts.Dataset, opts.IO.SurveyIO())
 }
 
-func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) error {
+func run(ctx context.Context, opts *options, flushEverySet, batchSizeSet, csvFieldsSet bool) error {
 	client, err := opts.Client(ctx)
 	if err != nil {
 		return err
@@ -305,20 +316,23 @@ func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) e
 		typ = opts.ContentType
 	}
 
-	if flushEverySet && typ != axiom.NDJSON {
-		return cmdutil.NewFlagErrorf("--flush-every not valid when content type is not newline delimited JSON")
-	}
 	if opts.Delimiter != "" && typ != axiom.CSV {
 		return cmdutil.NewFlagErrorf("--delimier/-d not valid when content type is not CSV")
 	}
 
 	var (
-		batchable = typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)
+		batchable = (typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)) &&
+			opts.ContentEncoding == axiom.Identity
 		ingestRes *ingest.Status
 	)
-	if filename == "stdin" && batchable && opts.ContentEncoding == axiom.Identity {
+	if batchable {
 		ingestRes, err = ingestEvery(ctx, client, r, typ, opts)
 	} else {
+		if flushEverySet {
+			return cmdutil.NewFlagErrorf("--flush-every not valid when data is not batchable")
+		} else if batchSizeSet {
+			return cmdutil.NewFlagErrorf("--batch-size not valid when data is not batchable")
+		}
 		ingestRes, err = ingestReader(ctx, client, r, typ, opts)
 	}
 
@@ -375,7 +389,6 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axi
 	defer t.Stop()
 
 	readers := make(chan io.Reader)
-
 	go func() {
 		defer close(readers)
 
@@ -383,9 +396,9 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axi
 		pr, pw := io.Pipe()
 		readers <- pr
 
-		// Start with a 64 byte buffer, check up until 1 MB per line.
+		// Start with a 1 KB buffer, check up until 1 MB per line.
 		scanner := bufio.NewScanner(r)
-		scanner.Buffer(make([]byte, 64), 1024*1024)
+		scanner.Buffer(make([]byte, 1024), 1024*1024)
 		scanner.Split(splitLinesMulti)
 
 		// We need to scan in a go func to make sure we don't block on
@@ -414,23 +427,35 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axi
 			}
 		}()
 
+		var lineCount uint
+		flushBatch := func() {
+			if err := pw.Close(); err != nil {
+				return
+			}
+
+			pr, pw = io.Pipe()
+			readers <- pr
+
+			lineCount = 0
+			t.Reset(opts.FlushEvery)
+		}
+
 		for {
 			select {
 			case <-ctx.Done():
 				_ = pw.CloseWithError(ctx.Err())
 				return
 			case <-t.C:
-				if err := pw.Close(); err != nil {
-					return
+				flushBatch()
+			case line := <-lines:
+				if lineCount >= opts.BatchSize {
+					flushBatch()
 				}
-
-				pr, pw = io.Pipe()
-				readers <- pr
-			case line := <-lines:
 				if _, err := pw.Write(line); err != nil {
 					_ = pw.CloseWithError(err)
 					return
 				}
+				lineCount++
 			case <-done:
 				_ = pw.Close()
 				return
@@ -483,7 +508,7 @@ func ingestReader(ctx context.Context, client *axiom.Client, r io.Reader, typ ax
 	ingestOptions = append(ingestOptions, opts.Labels...)
 	ingestOptions = append(ingestOptions, opts.CSVFields...)
 
-	res, err := client.Datasets.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
+	res, err := client.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/cmd/query/query.go b/internal/cmd/query/query.go
index 1b32487..58c84d0 100644
--- a/internal/cmd/query/query.go
+++ b/internal/cmd/query/query.go
@@ -154,7 +154,7 @@ func run(ctx context.Context, opts *options) error {
 	progStop := opts.IO.StartActivityIndicator()
 	defer progStop()
 
-	res, err := client.Datasets.Query(ctx, opts.Query,
+	res, err := client.Query(ctx, opts.Query,
 		query.SetStartTime(opts.startTime),
 		query.SetEndTime(opts.endTime),
 	)
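--
Reviewer note (not part of the patch): the ingestEvery hunks above implement a
flush-on-size-or-interval batching loop around an io.Pipe: scanned lines are
written to the current pipe, and the pipe is rotated whenever the batch size is
reached or the flush ticker fires. Below is a self-contained sketch of that
pattern for reference. The names and constants (batchSize, flushEvery, the
sample input) are illustrative rather than the CLI's actual wiring, and the
scanner's 1 MB line cap plus most error handling are omitted for brevity.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"time"
)

func main() {
	const batchSize = 3
	const flushEvery = 5 * time.Second

	input := strings.NewReader("a\nb\nc\nd\ne\n")

	// Each reader received from this channel represents one batch.
	readers := make(chan io.Reader)

	go func() {
		defer close(readers)

		pr, pw := io.Pipe()
		readers <- pr

		lines := make(chan []byte)
		done := make(chan struct{})
		go func() {
			defer close(done)
			scanner := bufio.NewScanner(input)
			for scanner.Scan() {
				// Copy the token (the scanner reuses its buffer) and
				// re-append the newline the scanner strips.
				line := append([]byte(nil), scanner.Bytes()...)
				lines <- append(line, '\n')
			}
		}()

		t := time.NewTicker(flushEvery)
		defer t.Stop()

		var lineCount int
		// flushBatch closes the current batch, hands the consumer a fresh
		// pipe, and resets both the size and the time trigger.
		flushBatch := func() {
			_ = pw.Close()
			pr, pw = io.Pipe()
			readers <- pr
			lineCount = 0
			t.Reset(flushEvery)
		}

		for {
			select {
			case <-t.C: // time-based flush
				flushBatch()
			case line := <-lines:
				if lineCount >= batchSize { // size-based flush
					flushBatch()
				}
				if _, err := pw.Write(line); err != nil {
					_ = pw.CloseWithError(err)
					return
				}
				lineCount++
			case <-done: // input exhausted
				_ = pw.Close()
				return
			}
		}
	}()

	// Consume each batch as it is flushed.
	for r := range readers {
		b, _ := io.ReadAll(r)
		fmt.Printf("batch: %q\n", b)
	}
}

With these values the five input lines come out as two batches ("a\nb\nc\n"
and "d\ne\n"): the first flush is size-triggered, the second fires when the
input is exhausted. In the patch the same two triggers are wired to
--batch-size and --flush-every, and calling t.Reset after a size-triggered
flush keeps the ticker from immediately flushing a nearly empty next batch.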