
Threads lock scenario at BulkIngester // FnCondition with high concurrency setup #651

Open
codehustler opened this issue Aug 21, 2023 · 25 comments · Fixed by #870
Labels: Category: Bug (Something isn't working)

@codehustler

Java API client version

7.17.12

Java version

11

Elasticsearch Version

7.17.12

Problem description

Hi, I think I have found a bug with the BulkIngester, maybe an issue with the locks.

The problem is that only certain dev machines and some servers show this issue. We run the 7.17.12 Java client lib. I cannot figure out 100% what is going on, and it probably makes no sense to create a ticket for this without being able to reproduce it properly, but I have attached a thread dump which shows several threads still waiting; I hope this helps.

More context:

We use the BulkIngester to index a file with ~12k documents (just one example file). It runs to 99%, then gets stuck, and because we have configured a 10-second flush interval on the BulkIngester, every 10 seconds we see a bulk context getting flushed with just a single document in it. This goes on for 3 to 4 minutes, and every 10 seconds it is the same picture: one bulk request with a single add operation.

A thread dump shows that some threads are waiting in BulkIngester.add, which in turn waits inside FnCondition.whenReadyIf(...) at the awaitUninterruptibly call. So it seems one bulk request comes back with a single operation in it, which triggers the addCondition.signalIfReady() call, which then lets the next request through, but again with just one single operation in it. This does not happen when debugging, and it does not happen when adding a per-document log message, which is why I think it is a race condition somewhere. If I change addCondition.signalIfReady() to signalAllIfReady, it works, but I would really like to find out the actual root cause of this!

I have a 32-core CPU, and we collect and prepare our index documents in parallel. When I limit the pool to 8 threads, it also works just fine.

thread_dump.txt
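
For illustration, here is a stripped-down sketch of the kind of setup described above (client construction, index name, thread counts and documents are placeholders, not our actual code):

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IngestSketch {

    public static void main(String[] args) throws Exception {
        ElasticsearchClient esClient = createClient(); // placeholder

        // One BulkIngester shared by all worker threads, flushing every 10 seconds
        BulkIngester<Void> ingester = BulkIngester.of(b -> b
                .client(esClient)
                .flushInterval(10, TimeUnit.SECONDS));

        // Documents are prepared in parallel (one worker per core on a 32-core box)
        // and funneled into BulkIngester.add()
        ExecutorService workers = Executors.newFixedThreadPool(32);
        for (int i = 0; i < 12_000; i++) {
            final String id = String.valueOf(i);
            workers.submit(() -> ingester.add(BulkOperation.of(op -> op
                    .index(ir -> ir.index("my-index").id(id).document(Map.of("value", id))))));
        }

        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.MINUTES);
        ingester.close();
    }

    private static ElasticsearchClient createClient() {
        throw new UnsupportedOperationException("placeholder");
    }
}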

@victorGS18

Hi, I have the same problem with Java API client version 8.12.1 and Elasticsearch version 8.12.1.

One detail I noticed is that when printing the id of the bulk in beforeBulk, there were jumps where the id should be sequential.

@codehustler, did you find any other solution?

@l-trotta
Contributor

Hello, I'd like to try and reproduce this. I have used the BulkIngester recently with an 80K-row document and nothing of the sort happened. Could you provide the code for the BulkIngester configuration? Thank you.

@victorGS18

victorGS18 commented Apr 22, 2024

Hello,
We create the BulkIngester with the following code:
BulkIngester.of(b -> b.client(esClient)
                      .globalSettings(gsb -> gsb.refresh(Refresh.False))
                      .maxOperations(120)
                      .flushInterval(10, TimeUnit.SECONDS)
                      .listener(bulkListener)
                      .maxConcurrentRequests(8)
                      .maxSize(new ByteSizeValue(1, MB).getBytes()));

We have observed this when there is a high level of parallelism. With a c5.9xlarge instance (36 cores) it happened to us very often; however, we changed the instance to a c5.4xlarge (16 cores) and it has not happened to us again.

The bulk listener that we use just logs the errors and retries depending on the error.

@l-trotta
Contributor

Thank you @victorGS18, we'll investigate this.

l-trotta added the Category: Bug label on Apr 22, 2024
@nmaves

nmaves commented Apr 22, 2024

I believe that we are running into this very same issue. We have multiple threads all sending updates to a single BulkIngester. They seem to hang in the same spot, at FnCondition.whenReadyIf.

@l-trotta
Contributor

@victorGS18 I tried to reproduce this again, starting from your configuration and then tweaking it, but I'm yet again failing to reproduce it. Could you try running it without the Listener and see if it gets stuck without it as well?

@nmaves the BulkIngester already uses a threadpool underneath, so I'd avoid accessing it from multiple threads.

@nathan-maves

@l-trotta Thanks for the update. I would ask that you update the docs to mention that this class is NOT thread safe.

@victorGS18

@l-trotta I'm trying to test it these days, but my case is the same as @nmaves's: I have several threads using the same BulkIngester.

I thought that this BulkIngester was thread safe like the old BulkProcessor was.

@l-trotta
Contributor

l-trotta commented Jun 3, 2024

Hello again, sorry, I didn't check the code thoroughly enough: the BulkIngester should indeed be thread safe. I'll perform more tests, sorry for the confusion.

@l-trotta
Contributor

Final update: I've managed to reproduce the thread lock and find the issue. The problem lies with the fact that the Listener code can be executed by any of the BulkIngester threads. If the code performs many retries, or gets stuck for some reason, all of the BulkIngester threads will get stuck there at some point. The issue will be solved soon by having a thread pool execute the Listener code, and not just any thread. Thank you for your patience, this will be out in the next release after the PR gets merged.
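
To sketch the idea (illustrative only, not the actual fix in the PR; package names assume the 8.x client layout): the listener callbacks get handed off to a dedicated executor, so none of the ingester's request threads can be held up by retry logic or slow logging in afterBulk().

import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Wraps any BulkListener and runs the afterBulk callbacks on its own executor,
// keeping the ingester's request threads free.
class OffloadingListener<Context> implements BulkListener<Context> {

    private final BulkListener<Context> delegate;
    private final ExecutorService listenerPool = Executors.newSingleThreadExecutor();

    OffloadingListener(BulkListener<Context> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<Context> contexts) {
        delegate.beforeBulk(executionId, request, contexts);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<Context> contexts, BulkResponse response) {
        listenerPool.submit(() -> delegate.afterBulk(executionId, request, contexts, response));
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<Context> contexts, Throwable failure) {
        listenerPool.submit(() -> delegate.afterBulk(executionId, request, contexts, failure));
    }
}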

@victorGS18

Thank you so much! @l-trotta

@victorGS18

Using version 8.14.1, which already has the fix, I still have the same problem, @l-trotta.

@nmaves, Have you been able to test to see if it has been resolved in your case?

@l-trotta
Contributor

@victorGS18 sorry for the confusion, the fix will be available in the next minor release, so 8.15.0, because we introduced breaking changes to the bulk ingester.

@victorGS18

Perfect, @l-trotta, I got confused when I saw the changes in the versions.

@aliariff

Hi @l-trotta,

I can confirm that this issue still persists in version 8.15.0. The root cause remains as the original author described, with threads getting stuck in the BulkIngester.add method.

I am able to reproduce this issue by having multiple threads call BulkIngester.add concurrently. All these threads end up waiting in the FnCondition class at the line condition.awaitUninterruptibly();.

The reason multiple threads are getting stuck is that when awaitUninterruptibly is called, the lock is released, allowing other threads to acquire the lock and also end up waiting on the same line. For reference, you can check this StackOverflow post.

Additionally, during the flush process, only one thread is woken up due to the signal() call via addCondition.signalIfReady(). As a result, the remaining threads continue to wait indefinitely.

To resolve this issue, the solution is to replace the signal() call with signalAll() to wake up all the waiting threads.
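
As a self-contained illustration of the underlying java.util.concurrent behavior (generic JDK code, not the client's FnCondition): with several threads parked on the same Condition, signal() wakes exactly one of them and the rest stay parked until another signal arrives.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SignalDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();

        // Park several waiters on the same condition.
        for (int i = 0; i < 4; i++) {
            final int id = i;
            new Thread(() -> {
                lock.lock();
                try {
                    ready.awaitUninterruptibly(); // releases the lock while waiting
                    System.out.println("waiter " + id + " woke up");
                } finally {
                    lock.unlock();
                }
            }).start();
        }

        Thread.sleep(500); // give the waiters time to park

        lock.lock();
        try {
            ready.signal();      // wakes exactly one waiter
            // ready.signalAll(); // would wake all four
        } finally {
            lock.unlock();
        }
        // The other three waiters stay parked and the program never exits,
        // which mirrors the add() calls that never return.
    }
}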

@l-trotta
Contributor

Hello @aliariff, I have managed to reproduce the original issue (ingester getting stuck at 99%) by configuring the bulk ingester so that it has a very low maxConcurrentRequests (how many requests it is allowed to handle concurrently) compared to the number of threads calling the add method.
Replacing signal() with signalAll() can solve the problem, but it's an expensive operation that we'd like to avoid unless there really isn't any other option. We're currently trying to figure out why signal() doesn't seem to wake up any threads; in the meantime, could you give me more details on how the bulk ingester is configured in your case?
It could be that simply adjusting maxConcurrentRequests solves the problem, or maybe there are other ways to reproduce this that I'm not seeing. Thank you for your patience.

l-trotta reopened this on Aug 23, 2024
@aliariff

Hi @l-trotta,

Yes, that's correct. In our configuration, the maxConcurrentRequests is set to 1 to prevent overloading our ES instance, and changing this isn't an option for now. However, we do have multiple threads calling the add method.

@victorGS18

@l-trotta I've started using the latest version of the client (8.15.1) and the same behavior persists. Threads keep blocking when there are more threads running than configured in maxConcurrentRequests.

l-trotta reopened this on Sep 23, 2024
@l-trotta
Contributor

@victorGS18 at this point I think I'll need either a full reproducer project or detailed information on how you're running the BulkIngester, because I really cannot reproduce this and I suspect configuration differences. The information needed (generalizing for anyone else who might have the same problem):

  • BulkIngester configuration
  • Scheduler configuration (if external)
  • Code for add operation (and threadpool configuration if used)
  • Listener configuration (if present)

Sorry this is taking so long, and thanks for the patience.

@l-trotta
Contributor

@aliariff did 8.15.1 solve your case?

@aliariff

Hi @l-trotta

Yes, the 8.15.1 version solved our problem. Thank you for your help.

@victorGS18

Hi @l-trotta ,

BulkIngester configuration

return BulkIngester.of(b -> b.client(esClient)
                             .globalSettings(gsb -> gsb.refresh(Refresh.False))
                             .maxOperations(120)
                             .flushInterval(10, TimeUnit.SECONDS)
                             .listener(bulkListener)
                             .maxConcurrentRequests(8)
                             .maxSize(1048576));

Scheduler configuration
Not defined
Code for add operation

@Override
public void bulkStore(Event event) {
    newBulkOperation(eventEnricherHelper.setDataStream(event))
            .ifPresent(bulkOperation -> {
                try {
                    bulkIngesterWrapper.add(bulkOperation);
                } catch (Exception e) {
                    log.error("Error processing bulk request", e);
                }
            });
}

private Optional<BulkOperation> newBulkOperation(Event event) {
    try {
        String sourceEvent = eventTransformer.transform(event);
        return Optional.of(newBulkOperation(event, sourceEvent));
    } catch (BidderJsonException e) {
        log.error("Error on event json serialization: {}", e.getMessage(), e);
        return Optional.empty();
    }
}

public BulkOperation getBulkOperation(Event event, String indexField, String sourceEvent) {
    String id = event.getId();
    if (event.isDataStream()) {
        if (event.isUpdate()) {
            return BulkOperation.of(op -> op.update(ir -> ir.index(indexField)
                                                            .id(id)
                                                            .action(ac -> ac.doc(new RawValue(sourceEvent))
                                                                            .docAsUpsert(true))
                                                            .routing(NO_INDEX_REQUEST_ROUTING_FIELD)));
        }
        return BulkOperation.of(op -> op.create(ir -> ir.index(indexField)
                                                        .id(id)
                                                        .document(new RawValue(sourceEvent))
                                                        .routing(NO_INDEX_REQUEST_ROUTING_FIELD)));
    }
    return BulkOperation.of(op -> op.index(ir -> ir.index(indexField)
                                                   .id(id)
                                                   .document(new RawValue(sourceEvent))
                                                   .routing(NO_INDEX_REQUEST_ROUTING_FIELD)));
}

Threadpool configuration
Default

Listener configuration

class BulkListenerWrapper implements BulkListener<Void> {

        private static final String METRICS_PREFIX = "BulkIngesterNewES";

        private Meter sentBulks;
        private Meter failedBulks;
        private Histogram bulkSize;
        private DateProvider dateProvider;

        @Getter
        private DateTime successfulBulkTime;

        public BulkListenerWrapper(DateProvider dateProvider, MetricRegistry metricRegistry) {
            this.dateProvider = dateProvider;
            this.sentBulks = MastermindMetricsUtils.registerMeter(metricRegistry, METRICS_PREFIX, "bulks.sent");
            this.bulkSize = MastermindMetricsUtils.registerHistogram(metricRegistry, METRICS_PREFIX, "bulks.size");
            this.failedBulks = MastermindMetricsUtils.registerMeter(metricRegistry, METRICS_PREFIX, "bulks.failed");
        }

        @Override
        public void beforeBulk(long executionId, BulkRequest request, List list) {
            sentBulks.mark();
            bulkSize.update(request.operations().size());
            successfulBulkTime = dateProvider.currentTime();
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, List list, BulkResponse response) {
            removeRetriesOfSuccessfulRequests(response);
            if (!response.errors()) {
                if (isMaxEventsWithRetriesReached() && eventsWithMaxRetries() < minRequestsWithMaxRetries) {
                    maxEventsWithRetriesReached.set(false);
                    notifyMaxEventsWithRetriesErrorStatus();
                }
                return;
            }
            try {
                log.warn("Cannot perform bulk insert: {}", response.items()
                                                                   .stream()
                                                                   .filter(item -> !this.isSuccessfulItem(item))
                                                                   .map(item -> item.error().reason())
                                                                   .collect(Collectors.joining(" \n ")));
            } catch (Exception e) {
                log.error("Building warn log of error in bulk insert", e);
            }
            failedBulks.mark();
            enqueueFailedRequests(request, response);
        }

        private void removeRetriesOfSuccessfulRequests(BulkResponse response) {
            response.items()
                    .stream()
                    .filter(this::isSuccessfulItem)
                    .forEach(item -> retriesByRequest.invalidate(item.id()));
        }

        private boolean isSuccessfulItem(BulkResponseItem item) {
            return item.error() == null || item.status() == 409; //409 means duplicated document. We ignore it
        }

        private void enqueueFailedRequests(BulkRequest request, BulkResponse response) {
            Set<String> failedRequestIds = response.items()
                                                   .stream()
                                                   .filter(item -> !this.isSuccessfulItem(item))
                                                   .map(BulkResponseItem::id)
                                                   .collect(Collectors.toSet());

            asyncUtils.executeAsync(() -> request.operations()
                                                 .stream()
                                                 .filter(op -> op.isIndex() ? failedRequestIds.contains(op.index().id()) :
                                                               failedRequestIds.contains(op.create().id()))
                                                 .forEach(BulkIngesterWrapper.this::addFailedRequest));
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, List list, Throwable failure) {
            log.error("Cannot perform bulk insert", failure);
            failedBulks.mark();
            asyncUtils.executeAsync(() -> request.operations().forEach(BulkIngesterWrapper.this::addFailedRequest));
        }
    }

private void addFailedRequest(BulkOperation request) {
        String id = request.isIndex() ? request.index().id() : request.create().id();
        int retriesOfEvent = Optional.ofNullable(retriesByRequest.getIfPresent(id)).orElse(0);
        retriesByRequest.put(id, ++retriesOfEvent);
        if (retriesOfEvent > maxRetries) {
            logRequest(request, retriesOfEvent);
            if (!isMaxEventsWithRetriesReached() && eventsWithMaxRetries() >= maxRequestsWithMaxRetries) {
                maxEventsWithRetriesReached.set(true);
                log.error("Bulk Processor Status to error because a lot of events with retries");
                notifyMaxEventsWithRetriesErrorStatus();
            }
        }
        bulkIngester.add(request);
    }

I think this is all the information you asked for.
An important fact is that whether it happens or not depends on the machine we use. If we use a c5.4xlarge it does not happen, but when we upgrade to a c5.9xlarge it happens shortly after starting.

Thanks!

@l-trotta
Contributor

l-trotta commented Oct 2, 2024

@victorGS18 which library is asyncUtils from?

@victorGS18

victorGS18 commented Oct 2, 2024

@l-trotta It is an internal utility class, sorry, I thought I had added all the code for external classes.

    public CompletableFuture<Void> executeAsync(Runnable task) {
        return executeAsync(task, Optional.empty());
    }

    private static CompletableFuture<Void> executeAsync(Runnable task, Optional<String> errorMsg) {
        return CompletableFuture.runAsync(task).exceptionally(e -> logError(errorMsg, e));
    }
    private static Void logError(Optional<String> errorMsg, Throwable e) {
        log.error(errorMsg.orElse("Cannot execute async task"), e);
        return null;
    }

@l-trotta
Contributor

l-trotta commented Oct 3, 2024

@victorGS18 I set up a project using your code, with high concurrency adding operations to a BulkIngester instance configured in exactly the same way, but I wasn't able to reproduce the thread lock, and unfortunately there's not much I can do if I cannot reproduce the issue locally. The way forward is: if you're able to set up a reproducer which reliably reproduces the issue, we can keep working on it; otherwise it could be a machine/OS/JVM-specific problem which we have no control over.

A couple of things that could help, which I noticed while working with your code:
Since the application works fine when deployed on a smaller machine, I would try at least doubling maxConcurrentRequests and maxOperations when deploying on the larger machine; any Elasticsearch instance should not be overburdened by that.
Also, from 8.15.0 (with a bugfix out in 8.15.2) the Listener already executes afterBulk() asynchronously, using an internal threadpool if an external one is not provided, so there's no need to defer the code in afterBulk() again.
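
As a concrete starting point for the first suggestion, the configuration posted above could be loosened roughly like this on the 36-core machine (a sketch only; the doubled values are illustrative, not tested recommendations, and esClient/bulkListener are the same objects as in the configuration above):

import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._types.Refresh;
import java.util.concurrent.TimeUnit;

// Same settings as the posted configuration, with maxOperations and
// maxConcurrentRequests roughly doubled for the larger machine.
BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(esClient)
                                                    .globalSettings(gsb -> gsb.refresh(Refresh.False))
                                                    .maxOperations(240)          // was 120
                                                    .flushInterval(10, TimeUnit.SECONDS)
                                                    .listener(bulkListener)
                                                    .maxConcurrentRequests(16)   // was 8
                                                    .maxSize(1048576));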

Thank you for all the information provided so far, I'll keep this open in case there are other updates or other confirmed cases from other users.
