diff --git a/aws/src/main/java/com/cloudflare/elastic/ElasticLambdaForwarder.java b/aws/src/main/java/com/cloudflare/elastic/ElasticLambdaForwarder.java index 3d7fba1..029449e 100644 --- a/aws/src/main/java/com/cloudflare/elastic/ElasticLambdaForwarder.java +++ b/aws/src/main/java/com/cloudflare/elastic/ElasticLambdaForwarder.java @@ -89,6 +89,7 @@ public class ElasticLambdaForwarder implements RequestHandler private static boolean ELASTIC_DEBUG = false; private static boolean USE_AWS_CREDENTIALS = false; + RestHighLevelClient es; @Override public Void handleRequest(S3Event event, Context context) @@ -102,7 +103,9 @@ public Void handleRequest(S3Event event, Context context) "Parameters: hostname [%s:%d] index [%s], username [%s], pipeline [%s], ssl/tls [%b]", ELASTIC_HOSTNAME, ELASTIC_PORT, ELASTIC_INDEX, ELASTIC_USERNAME, ELASTIC_PIPELINE, ELASTIC_HTTPS)); - RestHighLevelClient es = client(ELASTIC_HOSTNAME, ELASTIC_PORT, ELASTIC_USERNAME, ELASTIC_PASSWORD, ELASTIC_HTTPS); + if (es == null) { + es = client(ELASTIC_HOSTNAME, ELASTIC_PORT, ELASTIC_USERNAME, ELASTIC_PASSWORD, ELASTIC_HTTPS); + } AmazonS3 s3Client = getS3Client(); BulkProcessor processor = processor(es, BULK_ACTIONS, BULK_CONCURRENCY); @@ -114,11 +117,20 @@ public Void handleRequest(S3Event event, Context context) logger.log("Finished processing; flushing any remaining logs..."); processor.flush(); processor.awaitClose(60L, TimeUnit.SECONDS); - logger.log("Elasticsearch processor shut down"); + logger.log("Elasticsearch processing done"); } } catch (Exception e) { logger.log(String.format("%s\n%s", e.getMessage(), trace(e))); + try { + if (es != null) { + es.close(); + } + } catch (IOException ignored) { + + } finally { + es = null; + } } return null;