Skip to content

Commit

Permalink
[MINDEXER-188] Ensure that the producer thread doesn't indefinitely b…
Browse files Browse the repository at this point in the history
…lock on the queue (#314)

The consumer Threads can quit early when an exception occurs. In that case the consumer will usually block on `put` once the queue is full without a chance to recover.

This change clears the queue and shuts the executor service down as soon an exception is caught.

---

https://issues.apache.org/jira/browse/MINDEXER-188
  • Loading branch information
mbien authored May 8, 2023
1 parent 3a1f208 commit b947abc
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ target/
*.iml
.idea/

# NetBeans
nbactions.xml
nb-configuration.xml

# Other
.svn/
bin/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;

import org.apache.lucene.document.Document;
Expand Down Expand Up @@ -182,9 +185,10 @@ private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context)
ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);

ExecutorService executorService = Executors.newFixedThreadPool(threads);
ArrayList<Exception> errors = new ArrayList<>();
ArrayList<FSDirectory> siloDirectories = new ArrayList<>(threads);
ArrayList<IndexWriter> siloWriters = new ArrayList<>(threads);
List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
List<FSDirectory> siloDirectories = new ArrayList<>(threads);
List<IndexWriter> siloWriters = new ArrayList<>(threads);
AtomicBoolean stopEarly = new AtomicBoolean(false);
LOGGER.debug("Creating {} silo writer threads...", threads);
for (int i = 0; i < threads; i++) {
final int silo = i;
Expand All @@ -201,8 +205,12 @@ private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context)
break;
}
addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
} catch (InterruptedException | IOException e) {
} catch (Throwable e) {
errors.add(e);
if (stopEarly.compareAndSet(false, true)) {
queue.clear(); // unblock producer
executorService.shutdownNow(); // unblock consumers
}
break;
}
}
Expand All @@ -215,7 +223,7 @@ private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context)
LOGGER.debug("Loading up documents into silos");
try {
Document doc;
while ((doc = readDocument()) != null) {
while (!stopEarly.get() && (doc = readDocument()) != null) {
queue.put(doc);
n++;
}
Expand All @@ -232,9 +240,15 @@ private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context)
}

if (!errors.isEmpty()) {
IOException exception = new IOException("Error during load of index");
errors.forEach(exception::addSuppressed);
throw exception;
if (errors.stream().allMatch(ex -> ex instanceof IOException || ex instanceof InterruptedException)) {
IOException exception = new IOException("Error during load of index");
errors.forEach(exception::addSuppressed);
throw exception;
} else {
RuntimeException exception = new RuntimeException("Error during load of index");
errors.forEach(exception::addSuppressed);
throw exception;
}
}

LOGGER.debug("Silos loaded...");
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ under the License.
<exclude>.git/**</exclude>
<exclude>.idea/**</exclude>
<exclude>**/*.iml</exclude>
<exclude>nbactions.xml</exclude>
<exclude>nb-configuration.xml</exclude>
<!-- exlude some test resources from rat analysis -->
<exclude>src/test/**/*.sha1</exclude>
<exclude>src/test/**/*.md5</exclude>
Expand Down

0 comments on commit b947abc

Please sign in to comment.