
[FEATURE] Add an exported flush function to the BulkIndexer #336

Open
ae-ou opened this issue Jun 20, 2023 · 5 comments
Labels
enhancement New feature or request

Comments


ae-ou commented Jun 20, 2023

Is your feature request related to a problem?

I have a Lambda that consumes events from a Kinesis stream and posts them to an OpenSearch Cluster - I use BulkIndexer to send in a large number of documents more efficiently (i.e. via the /bulk API).

Currently, I build up most of my dependencies (e.g. DB connection pools, AWS Signer, etc) in the main() function of my Lambda, and then I pass these dependencies to a struct. I create a handler function which receives the struct containing all of these dependencies - this way, as long as my Lambda stays warm, the dependencies are readily available for the handler to use on a subsequent invocation. e.g.:

package main

import (
	"context"
	"database/sql"
	"github.com/aws/aws-lambda-go/lambda"
)

// Dependencies can be reused across subsequent invocations of the Lambda,
// which speeds up non-cold starts (we don't have to recreate the
// dependencies for injection).
type Dependencies struct {
	DbCon *sql.DB
	//... more dependencies
}

func main() {
	// sql.Open needs a registered driver and a DSN; these placeholders stand
	// in for an imaginary DB that you still shouldn't call DROP on.
	myDb, err := sql.Open("driver-name", "dsn")
	if err != nil {
		panic(err)
	}

	d := Dependencies{
		DbCon: myDb,
	}

	lambda.Start(d.MyHandler)
}

func (d Dependencies) MyHandler(c context.Context) error {
	tx, err := d.DbCon.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()
	//Business logic here
	return tx.Commit()
}

I want to be able to pass the BulkIndexer on my dependency struct, but there's an issue. Your documentation for the BulkIndexer says:

// You must call the Close() method after you're done adding items.

The problem is that when you call Close() on the BulkIndexer, it closes queue (the channel that Add() sends requests to).
This creates a problem on subsequent invocations of the Lambda: if the channel was closed by a Close() call during a previous invocation, then calling Add() will attempt to send on a closed channel, which causes a panic.

This means that I can't store the BulkIndexer in my dependency struct, because the channel underpinning the whole thing may already be closed on subsequent invocations. Instead, I have to instantiate the BulkIndexer on every invocation of the Lambda (directly in the handler, even when the function is still warm).
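For illustration, a minimal stdlib-only sketch of why this panics (tryAdd is a hypothetical helper standing in for Add(), not part of opensearch-go): sending on a closed channel is a runtime panic in Go.

```go
package main

import (
	"errors"
	"fmt"
)

// tryAdd sends item to queue, converting the runtime panic that Go
// raises when sending on a closed channel into an ordinary error.
func tryAdd(queue chan string, item string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errors.New(fmt.Sprint(r))
		}
	}()
	queue <- item
	return nil
}

func main() {
	queue := make(chan string, 1)
	fmt.Println("open channel:", tryAdd(queue, "doc1")) // <nil>
	close(queue)
	// This is the failure mode the issue describes: Add() after Close().
	fmt.Println("closed channel:", tryAdd(queue, "doc2")) // send on closed channel
}
```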

What solution would you like?

I would like to be able to call Flush() directly against the BulkIndexer.

The Close() function already calls flush() on the workers, which fires off whatever requests remain in the queue (and invokes the relevant callback functions).
If we could call BulkIndexer.Flush() directly, we could drain whatever requests remain in the queue without closing it, and then return from the handler afterwards. This would let us reuse the BulkIndexer on subsequent invocations of the Lambda, with no risk of closing the channel and causing a panic.
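A minimal stdlib sketch of the requested behavior (this is a toy stand-in, not the opensearch-go implementation; miniIndexer and its fields are hypothetical): Flush() drains the pending batch under a mutex without closing anything, so the same instance stays usable across invocations.

```go
package main

import (
	"fmt"
	"sync"
)

// miniIndexer is a toy stand-in for BulkIndexer: Add buffers documents,
// Flush sends the buffered batch. Nothing is ever closed, so the indexer
// can be reused by a later (warm) invocation.
type miniIndexer struct {
	mu      sync.Mutex
	pending []string
	flushed [][]string // stands in for bulk requests actually sent
}

func (m *miniIndexer) Add(doc string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pending = append(m.pending, doc)
}

// Flush drains the buffer as one batch; the indexer remains usable.
func (m *miniIndexer) Flush() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.pending) == 0 {
		return
	}
	batch := m.pending
	m.pending = nil
	m.flushed = append(m.flushed, batch)
}

func main() {
	idx := &miniIndexer{}
	// First "invocation" of the handler.
	idx.Add("a")
	idx.Add("b")
	idx.Flush()
	// Second "invocation" reuses the same indexer without a panic.
	idx.Add("c")
	idx.Flush()
	fmt.Println(len(idx.flushed), "batches sent") // 2 batches sent
}
```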

What alternatives have you considered?

  1. Setting BulkIndexerConfig.FlushInterval.
    a. The problem here is that we have to wait for the timer to hit 0 for the flush to occur - this means that we have to keep the Lambda running (which costs money and inflates our execution time metrics).
    b. There's also no clean way to see the amount of time until the next flush.
    c. You could set BulkIndexerConfig.FlushInterval to a low number - but if you're flushing/firing off requests every few seconds, you defeat the purpose of using the /bulk API.
  2. Setting BulkIndexerConfig.FlushBytes
    a. The problem here is that we have to wait for the BulkIndexer to hit the threshold - which isn't always guaranteed to happen - e.g. if you've just flushed the data, but then you Add() one small request.
    b. You could set BulkIndexerConfig.FlushBytes to a low number - but if you're flushing/firing off requests after a small number of Add() calls, you defeat the purpose of using the /bulk API.
  3. Create the BulkIndexer in the handler - every time that the Lambda is called
    a. This is what I do currently
    b. This means that you can call Close() at the end of your handler function (to flush any outstanding requests in the queue)
    c. This significantly increases the runtime of my Lambda (on every invocation), which has a direct impact on running cost.

Do you have any additional context?

Setting up dependencies outside of a Lambda's function handler is an AWS best practice - see here:

Take advantage of execution environment reuse to improve the performance of your function. Initialize SDK clients and database connections outside of the function handler, and cache static assets locally in the /tmp directory. Subsequent invocations processed by the same instance of your function can reuse these resources. This saves cost by reducing function run time.

ae-ou added the enhancement (New feature or request) and untriaged labels Jun 20, 2023
@wbeckler

I've never looked at this before, so assume I might be way off here. If the docs say to call Close() after "you're done adding items," then might it work if you don't call Close() until all processes/workers have finished adding items? And since you never finish, you never call Close(). Again, I hope I'm not wasting your time with this question, but I thought it worth asking with what little I know.


ae-ou commented Jun 21, 2023

Hi @wbeckler. My handler fires off a goroutine for each Kinesis record so that I can call BulkIndexer.Add() for multiple records concurrently. I then have a sync.WaitGroup that waits for all of these goroutines to complete before I call BulkIndexer.Close() in the handler. At the point that I call Close(), I already know that there are no more additions to be made (on this invocation of the Lambda).
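The fan-out described above can be sketched with a thread-safe stub (stubIndexer and handle are hypothetical stand-ins, not the real BulkIndexer API): one goroutine per record calls Add, and a sync.WaitGroup guarantees every Add has finished before Close()/Flush() would run.

```go
package main

import (
	"fmt"
	"sync"
)

// stubIndexer is a thread-safe stand-in for BulkIndexer.
type stubIndexer struct {
	mu    sync.Mutex
	items []string
}

func (s *stubIndexer) Add(item string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = append(s.items, item)
}

// handle fans out one goroutine per Kinesis record, then waits for
// all Adds to complete before the caller finalizes the batch.
func handle(records []string, idx *stubIndexer) {
	var wg sync.WaitGroup
	for _, r := range records {
		wg.Add(1)
		go func(rec string) {
			defer wg.Done()
			idx.Add(rec) // concurrent Add per record
		}(r)
	}
	wg.Wait() // safe point to call Close() or a future Flush()
}

func main() {
	idx := &stubIndexer{}
	handle([]string{"r1", "r2", "r3"}, idx)
	fmt.Println(len(idx.items), "records added") // 3 records added
}
```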

The reason that I have to call Close() is that it forces a flush of the queue - meaning that the workers send all of the remaining queued up requests to OpenSearch (if I understand correctly). I have to do this before the handler returns because the alternative is for the handler to return with requests (potentially) still in the BulkIndexer.queue - getting stale.

Hypothetically, if the Lambda stays warm and a subsequent invocation occurs, any stale requests (from a previous invocation of the Lambda) should still be in the BulkIndexer.queue, and they should be fired off (I haven't confirmed this). The issue is that there's no guarantee of a subsequent invocation of the Lambda - if there isn't one, the stored data will be killed off (including the BulkIndexer) and the next invocation will be a cold start - potentially losing queued requests in the process.


dblock commented Jun 21, 2023

I think adding .Flush() makes sense, want to give it a try?


ansssu commented Nov 3, 2023

@ae-ou, in my opinion this feature request doesn't make sense, and your approach to handling the bulk inserts carries a risk of data loss. I will try to explain why.

An AWS Lambda runs in a container environment. The first time a function executes after being created, or after having its code or configuration updated, a new container is created and the code is loaded into it for execution. Your code then performs its tasks, and the Lambda finishes.

Later on, when you invoke the Lambda again, it can follow one of two paths: if the code hasn't changed and not much time has gone by, Lambda MAY reuse the previous container and its shared resources, or it MAY create a new execution container from scratch. This behavior isn't something you can control; it's determined by the Lambda service itself.

With that said, you should avoid storing data in shared resources. If you run the Lambda and save data into shared resources, and on the next execution Lambda decides to create a new container, then you will lose any data that wasn't flushed in the previous execution.

You are trying to make the Lambda function operate in a stateful manner, whereas a Lambda's intended behavior is to be completely stateless.

In this scenario, my recommendation is to create the bulk indexer within the handler each time the Lambda is invoked.

@yves-tutti

Outside of the described Lambda scenario, on which I can't comment, having a Flush() on the bulk indexer comes in handy when writing test code. It's needed when trying to port code that was written using https://github.com/olivere/elastic/blob/29ee98974cf1984dfecf53ef772d721fb97cb0b9/bulk_processor.go#L409
