From 171fa8609102e647670d5f8e7d4a52e0cb90985b Mon Sep 17 00:00:00 2001 From: Dag Wullt <289691+DagW@users.noreply.github.com> Date: Tue, 10 Dec 2024 10:41:33 +0100 Subject: [PATCH 1/2] Add a worker ID context value to flush events Signed-off-by: Dag Wullt <289691+DagW@users.noreply.github.com> --- opensearchutil/bulk_indexer.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/opensearchutil/bulk_indexer.go b/opensearchutil/bulk_indexer.go index fe93dd58..2807c58a 100644 --- a/opensearchutil/bulk_indexer.go +++ b/opensearchutil/bulk_indexer.go @@ -41,7 +41,13 @@ import ( "github.com/opensearch-project/opensearch-go/v4/opensearchapi" ) -const defaultFlushInterval = 30 * time.Second +// WorkerIdCtxKeyType is a context value type to get the worker ID from the context in flush callbacks +type WorkerIdCtxKeyType string + +const ( + WorkerCtxKey WorkerIdCtxKeyType = "workerId" + defaultFlushInterval = 30 * time.Second +) // BulkIndexer represents a parallel, asynchronous, efficient indexer for OpenSearch. type BulkIndexer interface { @@ -452,6 +458,7 @@ func (w *worker) writeBody(item *BulkIndexerItem) error { // flush writes out the worker buffer; it must be called under a lock. func (w *worker) flush(ctx context.Context) error { + ctx := context.WithValue(ctx, WorkerCtxKey, w.id) if w.bi.config.OnFlushStart != nil { ctx = w.bi.config.OnFlushStart(ctx) } From 2569d0c321930770337650031c9c97fac6931419 Mon Sep 17 00:00:00 2001 From: Dag Wullt <289691+DagW@users.noreply.github.com> Date: Tue, 10 Dec 2024 10:43:08 +0100 Subject: [PATCH 2/2] Linter fix: context exists Signed-off-by: Dag Wullt <289691+DagW@users.noreply.github.com> --- opensearchutil/bulk_indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opensearchutil/bulk_indexer.go b/opensearchutil/bulk_indexer.go index 2807c58a..065afee0 100644 --- a/opensearchutil/bulk_indexer.go +++ b/opensearchutil/bulk_indexer.go @@ -458,7 +458,7 @@ func (w *worker) writeBody(item *BulkIndexerItem) error { // flush writes out the worker buffer; it must be called under a lock. func (w *worker) flush(ctx context.Context) error { - ctx := context.WithValue(ctx, WorkerCtxKey, w.id) + ctx = context.WithValue(ctx, WorkerCtxKey, w.id) if w.bi.config.OnFlushStart != nil { ctx = w.bi.config.OnFlushStart(ctx) }