[FEATURE] Add an exported flush function to the BulkIndexer #336
Comments
I've never looked at this before, so assume I might be way off here. If the docs say to call close after "you're done adding items," then might it work to simply not call close until all processes/workers have finished adding items? And since you never finish, you never call close()? Again, I hope I'm not wasting your time with this question, but I thought it worth asking with what little I know.
Hi @wbeckler. My handler fires off goroutines for each Kinesis record so that I can call `Add()` concurrently. The reason that I have to call `Close()` is to flush any requests still sitting in the queue before the handler returns. Hypothetically, if the Lambda stays warm and a subsequent invocation occurs, any stale requests (from a previous invocation of the Lambda) should still be in the `queue`.
I think adding a `Flush()` method to the `BulkIndexer` would be useful.
@ae-ou, in my opinion this feature request doesn't make sense, and your approach to handling the bulk inserts represents a potential risk of data loss. I will try to explain why.

An AWS Lambda runs in a container environment. The first time a function executes after being created or having its code or configuration updated, a new container is created and your code is loaded into it for execution. Your code then performs its tasks, and the Lambda finishes. Later on, when you invoke the Lambda again, it can follow one of two paths: if the code hasn't changed and not much time has gone by, Lambda MAY reuse the previous container and its shared resources, or it MAY create a new execution container from scratch. This behavior isn't something you can control; it's determined by Lambda itself.

With that said, you should avoid storing data in shared resources. If you run the Lambda, save data into shared resources, and on the next execution Lambda decides to create a new container, you will lose all data that hadn't been flushed in the previous execution. You are trying to make the Lambda function operate in a stateful manner, whereas a Lambda is intended to be completely stateless.

In this scenario, my recommendation is to create the bulk indexer within the handler each time the Lambda is invoked, as in the sketch below.
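A minimal sketch of that per-invocation approach, assuming the `opensearchutil` bulk indexer and the `aws-lambda-go` runtime (the import paths and config fields are illustrative, not taken from this issue):

```go
package main

import (
	"bytes"
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
)

func handler(ctx context.Context, event events.KinesisEvent) error {
	// Create the bulk indexer inside the handler so no state is shared
	// between invocations.
	bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
		// Client, Index, NumWorkers, ... configured per invocation.
	})
	if err != nil {
		return err
	}
	for _, record := range event.Records {
		if err := bi.Add(ctx, opensearchutil.BulkIndexerItem{
			Action: "index",
			Body:   bytes.NewReader(record.Kinesis.Data),
		}); err != nil {
			return err
		}
	}
	// Close() flushes everything that was queued and tears the indexer down
	// before the handler returns, so nothing outlives this invocation.
	return bi.Close(ctx)
}

func main() {
	lambda.Start(handler)
}
```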
Outside of the described Lambda scenario, on which I can't comment, having a `Flush()` on the bulk indexer comes in handy when writing test code. It's also required when trying to port code that was written using https://github.com/olivere/elastic/blob/29ee98974cf1984dfecf53ef772d721fb97cb0b9/bulk_processor.go#L409
Is your feature request related to a problem?
I have a Lambda that consumes events from a Kinesis stream and posts them to an OpenSearch cluster - I use `BulkIndexer` to send in a large number of documents more efficiently (i.e. via the `/bulk` API).

Currently, I build up most of my dependencies (e.g. DB connection pools, AWS Signer, etc.) in the `main()` function of my Lambda, and then I pass these dependencies to a struct. I create a handler function which receives the struct containing all of these dependencies - this way, as long as my Lambda stays warm, the dependencies are readily available for the handler to use on a subsequent invocation, e.g.:
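A minimal sketch of this pattern, assuming the `opensearch-go` client and the `aws-lambda-go` runtime (the struct, field names, and import paths are illustrative):

```go
package main

import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/opensearch-project/opensearch-go/v2"
)

// deps holds the long-lived dependencies built once in main() and reused
// across warm invocations (connection pools, signer, clients, ...).
type deps struct {
	search *opensearch.Client
	// This is where I would like to keep the BulkIndexer as well.
}

// handler receives the shared dependencies via its receiver, so a warm
// Lambda can reuse them on subsequent invocations.
func (d *deps) handler(ctx context.Context, event events.KinesisEvent) error {
	for _, record := range event.Records {
		_ = record.Kinesis.Data // index the record via d.search / a bulk indexer
	}
	return nil
}

func main() {
	client, err := opensearch.NewClient(opensearch.Config{
		// Addresses, Transport/Signer, etc. built once here.
	})
	if err != nil {
		panic(err)
	}
	lambda.Start((&deps{search: client}).handler)
}
```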
I want to be able to pass the `BulkIndexer` on my dependency struct, but there's an issue. Your documentation for the `BulkIndexer` says to call `Close()` once you're done adding items.

The problem is that when you call `Close()` against `BulkIndexer`, you close `queue` (which is the channel that `Add()` appends requests to).

This creates a problem on subsequent invocations of the Lambda: if the channel has already been closed (by the call to `Close()` on a previous invocation), then calling `Add()` tries to send on a closed channel - which results in a panic.

This means that I can't store the `BulkIndexer` in my dependency struct, because the channel underpinning the whole thing may already be closed on subsequent invocations. As a result, I have to instantiate the `BulkIndexer` for each invocation of the Lambda (directly in the handler - even if the function is still warm).

What solution would you like?
I would like to be able to call `Flush()` directly against the `BulkIndexer`.

The `Close()` function already calls `flush()` against the workers, and this fires off whatever requests may be remaining in the queue (in addition to invoking the relevant callback functions). If we were able to call `BulkIndexer.Flush()` directly, we could clear out whatever requests remain in the `queue` (without closing it) and then `return` from the Lambda afterwards - this would allow us to reuse the `BulkIndexer` on subsequent invocations of the Lambda (without the risk of closing the channel and causing a panic).
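A rough sketch of how the handler could then look, extending the dependency-struct example above. This assumes a hypothetical exported `Flush(ctx)` method (which does not exist today) and an `indexer opensearchutil.BulkIndexer` field on the struct:

```go
// Hypothetical: BulkIndexer.Flush(ctx) is the method this issue is requesting.
func (d *deps) handler(ctx context.Context, event events.KinesisEvent) error {
	for _, record := range event.Records {
		if err := d.indexer.Add(ctx, opensearchutil.BulkIndexerItem{
			Action: "index",
			Body:   bytes.NewReader(record.Kinesis.Data),
		}); err != nil {
			return err
		}
	}
	// Flush whatever is queued before returning, but leave the queue channel
	// open so the same indexer can be reused on the next warm invocation.
	return d.indexer.Flush(ctx)
}
```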
What alternatives have you considered?
1. `BulkIndexerConfig.FlushInterval` (see the config sketch after this list)
   a. The problem here is that we have to wait for the timer to hit 0 for the flush to occur - this means that we have to keep the Lambda running (which costs money and inflates our execution time metrics).
   b. There's also no clean way to see the amount of time until the next flush.
   c. You could set `BulkIndexerConfig.FlushInterval` to a low number - but if you're flushing/firing off requests every few seconds, you defeat the purpose of using the `/bulk` API.
2. `BulkIndexerConfig.FlushBytes`
   a. The problem here is that we have to wait for the `BulkIndexer` to hit the threshold - which isn't always guaranteed to happen - e.g. if you've just flushed the data, but then you `Add()` one small request.
   b. You could set `BulkIndexerConfig.FlushBytes` to a low number - but if you're flushing/firing off requests after a small number of `Add()` calls, you defeat the purpose of using the `/bulk` API.
3. Instantiating the `BulkIndexer` in the handler - every time that the Lambda is called
   a. This is what I do currently.
   b. This means that you can call `Close()` at the end of your handler function (to flush any outstanding requests in the queue).
   c. This significantly increases the runtime of my Lambda (on every invocation), which has a direct impact on running cost.
Do you have any additional context?
Setting up dependencies outside of a Lambda's function handler is an AWS best practice - see the AWS Lambda best practices documentation.