Skip to content

Commit

Permalink
reuse elasticsearch client and properly close it on error
Browse files Browse the repository at this point in the history
  • Loading branch information
lobeck committed Oct 8, 2020
1 parent 493cdb2 commit c1dfb88
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class ElasticLambdaForwarder implements RequestHandler<S3Event, Void>
private static boolean ELASTIC_DEBUG = false;
private static boolean USE_AWS_CREDENTIALS = false;

RestHighLevelClient es;

@Override
public Void handleRequest(S3Event event, Context context)
Expand All @@ -102,7 +103,9 @@ public Void handleRequest(S3Event event, Context context)
"Parameters: hostname [%s:%d] index [%s], username [%s], pipeline [%s], ssl/tls [%b]",
ELASTIC_HOSTNAME, ELASTIC_PORT, ELASTIC_INDEX, ELASTIC_USERNAME, ELASTIC_PIPELINE, ELASTIC_HTTPS));

RestHighLevelClient es = client(ELASTIC_HOSTNAME, ELASTIC_PORT, ELASTIC_USERNAME, ELASTIC_PASSWORD, ELASTIC_HTTPS);
if (es == null) {
es = client(ELASTIC_HOSTNAME, ELASTIC_PORT, ELASTIC_USERNAME, ELASTIC_PASSWORD, ELASTIC_HTTPS);
}

AmazonS3 s3Client = getS3Client();
BulkProcessor processor = processor(es, BULK_ACTIONS, BULK_CONCURRENCY);
Expand All @@ -114,11 +117,20 @@ public Void handleRequest(S3Event event, Context context)
logger.log("Finished processing; flushing any remaining logs...");
processor.flush();
processor.awaitClose(60L, TimeUnit.SECONDS);
logger.log("Elasticsearch processor shut down");
logger.log("Elasticsearch processing done");
}
}
catch (Exception e) {
logger.log(String.format("%s\n%s", e.getMessage(), trace(e)));
try {
if (es != null) {
es.close();
}
} catch (IOException ignored) {

} finally {
es = null;
}
}

return null;
Expand Down

0 comments on commit c1dfb88

Please sign in to comment.