diff --git a/README.adoc b/README.adoc
index fe7ca705..007f3812 100644
--- a/README.adoc
+++ b/README.adoc
@@ -77,12 +77,12 @@ By default, in dev mode, any parsing/indexing warnings and errors are going to b
But in case you want to get index errors reported to a GitHub issue, next properties should be added:
[source,properties]
----
-_DEV_INDEXING_ERROR_REPORTING_TYPE=github-issue
+_DEV_INDEXING_REPORTING_TYPE=github-issue
# see about tokens https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens
-INDEXING_ERROR_REPORTING_GITHUB_TOKEN={your-generated-github-token}
-INDEXING_ERROR_REPORTING_GITHUB_ISSUE_REPOSITORY={your-github-user}/search.quarkus.io
-INDEXING_ERROR_REPORTING_GITHUB_ISSUE_ID={github-issue-id-in-your-repository}
-INDEXING_ERROR_REPORTING_GITHUB_WARNING_REPEAT_DELAY=10m
+INDEXING_REPORTING_GITHUB_TOKEN={your-generated-github-token}
+INDEXING_REPORTING_GITHUB_ISSUE_REPOSITORY={your-github-user}/search.quarkus.io
+INDEXING_REPORTING_GITHUB_ISSUE_ID={github-issue-id-in-your-repository}
+INDEXING_REPORTING_GITHUB_WARNING_REPEAT_DELAY=10m
----
[[testing]]
@@ -182,7 +182,7 @@ podman container run -it --rm --name search.quarkus.io --pod search.quarkus.io \
-v $REPOS_DIR/es.quarkus.io:/mnt/es.quarkus.io:ro,z \
-v $REPOS_DIR/ja.quarkus.io:/mnt/ja.quarkus.io:ro,z \
-v $REPOS_DIR/pt.quarkus.io:/mnt/pt.quarkus.io:ro,z \
- -e INDEXING_ERROR_REPORTING_TYPE=log \
+ -e INDEXING_REPORTING_TYPE=log \
-e GITHUB_OAUTH=ignored \
-e GITHUB_STATUS_ISSUE_ID=1 \
-e QUARKUSIO_GIT_URI=file:/mnt/quarkus.io \
@@ -240,7 +240,7 @@ you will need to set up a few things manually:
In particular:
* `GITHUB_STATUS_ISSUE_ID`: The number of an issue on quarkusio/search.quarkus.io
where indexing status should be reported.
- See `indexing.error-reporting.github` configuration properties for more details.
+ See `indexing.reporting.github` configuration properties for more details.
`search-quarkus-io-secret`::
Secret environment variables for the application.
+
@@ -249,7 +249,7 @@ you will need to set up a few things manually:
In particular:
* `GITHUB_OAUTH`: a GitHub token that allows commenting/reopening/closing a GitHub issue
on quarkusio/search.quarkus.io.
- See `indexing.error-reporting.github` configuration properties for more details.
+ See `indexing.reporting.github` configuration properties for more details.
`search-backend-config`::
Environment variables for the OpenSearch instances.
+
diff --git a/pom.xml b/pom.xml
index 47a64fd9..bf7c132e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,6 +242,11 @@
1.0.0
test
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
diff --git a/src/main/java/io/quarkus/search/app/fetching/FetchingService.java b/src/main/java/io/quarkus/search/app/fetching/FetchingService.java
index b2ca59cc..1273b1b9 100644
--- a/src/main/java/io/quarkus/search/app/fetching/FetchingService.java
+++ b/src/main/java/io/quarkus/search/app/fetching/FetchingService.java
@@ -17,7 +17,7 @@
import jakarta.inject.Inject;
import io.quarkus.search.app.entity.Language;
-import io.quarkus.search.app.indexing.FailureCollector;
+import io.quarkus.search.app.indexing.reporting.FailureCollector;
import io.quarkus.search.app.quarkusio.QuarkusIO;
import io.quarkus.search.app.quarkusio.QuarkusIOConfig;
import io.quarkus.search.app.util.CloseableDirectory;
diff --git a/src/main/java/io/quarkus/search/app/indexing/FailFastMassIndexingFailureHandler.java b/src/main/java/io/quarkus/search/app/indexing/FailFastMassIndexingFailureHandler.java
new file mode 100644
index 00000000..801a4ac1
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/FailFastMassIndexingFailureHandler.java
@@ -0,0 +1,38 @@
+package io.quarkus.search.app.indexing;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.quarkus.logging.Log;
+
+import org.hibernate.search.engine.reporting.impl.LogFailureHandler;
+import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureContext;
+import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
+import org.hibernate.search.mapper.pojo.massindexing.impl.PojoMassIndexingDelegatingFailureHandler;
+
+class FailFastMassIndexingFailureHandler implements MassIndexingFailureHandler {
+ private final MassIndexingFailureHandler delegate = new PojoMassIndexingDelegatingFailureHandler(new LogFailureHandler());
+
+ volatile boolean failed = false;
+    volatile CompletableFuture<?> future;
+
+ @Override
+ public void handle(MassIndexingFailureContext context) {
+ delegate.handle(context);
+ failed = true;
+ if (future != null) {
+ abort();
+ }
+ }
+
+    public void init(CompletableFuture<?> future) {
+ this.future = future;
+ if (failed) {
+ abort();
+ }
+ }
+
+ private void abort() {
+ Log.error("Aborting mass indexing to fail fast.");
+ future.cancel(true);
+ }
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/FailureCollector.java b/src/main/java/io/quarkus/search/app/indexing/FailureCollector.java
deleted file mode 100644
index e5dcad7e..00000000
--- a/src/main/java/io/quarkus/search/app/indexing/FailureCollector.java
+++ /dev/null
@@ -1,284 +0,0 @@
-package io.quarkus.search.app.indexing;
-
-import static io.quarkus.search.app.util.Streams.toStream;
-import static io.quarkus.search.app.util.UncheckedIOFunction.uncheckedIO;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.sql.Date;
-import java.time.Clock;
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.function.Consumer;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import io.quarkus.search.app.util.Streams;
-
-import io.quarkus.logging.Log;
-
-import org.kohsuke.github.GHIssue;
-import org.kohsuke.github.GHIssueComment;
-import org.kohsuke.github.GHIssueState;
-import org.kohsuke.github.GHRepository;
-import org.kohsuke.github.GitHub;
-import org.kohsuke.github.GitHubBuilder;
-
-public class FailureCollector implements Closeable {
-
- public enum Level {
- CRITICAL,
- WARNING;
- }
-
- public enum Stage {
- PARSING,
- TRANSLATION,
- INDEXING;
- }
-
- /**
- * @param level Whether the reported failure should lead to reporting the indexing as incomplete/failed or not.
- * @param stage Where the failure happened.
- * @param details Failure details.
- * @param exception An exception that has caused the failure.
- */
- public record Failure(Level level, Stage stage, String details, Exception exception) {
- static Comparator COMPARATOR = Comparator.comparing(Failure::level)
- .thenComparing(Failure::stage)
- .thenComparing(Failure::details)
- // Not perfect, but then how likely it is to get
- // two failures with everything identical except the exception?
- .thenComparing(f -> System.identityHashCode(f.exception()));
- }
-
- private final EnumMap> failures = new EnumMap<>(Level.class);
- private final Consumer>> reporter;
-
- public FailureCollector() {
- this(IndexingConfig.GitErrorReporting.Type.LOG, Clock.systemUTC(), Optional.empty());
- }
-
- public FailureCollector(IndexingConfig.GitErrorReporting config) {
- this(config.type(), Clock.systemUTC(), config.github());
- }
-
- private FailureCollector(IndexingConfig.GitErrorReporting.Type type, Clock clock,
- Optional githubOptional) {
- for (Level value : Level.values()) {
- failures.put(value, Collections.synchronizedList(new ArrayList<>()));
- }
- switch (type) {
- case LOG -> reporter = FailureCollector::logReporter;
- case GITHUB_ISSUE -> {
- IndexingConfig.GitErrorReporting.GithubReporter github = githubOptional.orElseThrow(
- () -> new IllegalArgumentException(
- "GitHub error reporting requires both GitHub repository and issue id to be specified in the properties."));
- reporter = new GithubFailureReporter(clock, github)::report;
- }
- default -> throw new AssertionError("Unknown reporter type: " + type);
- }
- }
-
- public void warning(Stage stage, String details) {
- warning(stage, details, null);
- }
-
- public void warning(Stage stage, String details, Exception exception) {
- Log.warn(details, exception);
- failures.get(Level.WARNING).add(new Failure(Level.WARNING, stage, details, exception));
- }
-
- public void critical(Stage stage, String details, Exception exception) {
- Log.error(details, exception);
- failures.get(Level.CRITICAL).add(new Failure(Level.CRITICAL, stage, details, exception));
- }
-
- @Override
- public void close() {
- reporter.accept(failures);
- }
-
- private static void logReporter(EnumMap> failures) {
- // failures are an enum map that we preinitialize, hence we check if there's anything in the lists:
- if (failures.isEmpty() || failures.values().stream().allMatch(List::isEmpty)) {
- return;
- }
- StringBuilder sb = new StringBuilder();
- toMarkdown(sb, failures, false);
- Log.warn(sb);
- }
-
- static class GithubFailureReporter {
-
- private static final String STATUS_CRITICAL = "Critical";
- private static final String STATUS_WARNING = "Warning";
- private static final String STATUS_SUCCESS = "Success";
- private static final String STATUS_REPORT_HEADER = "## search.quarkus.io indexing status: ";
- private static final String TITLE_UPDATED_AND_STATUS_FORMAT = ": %s (updated %s)";
- private static final Pattern TITLE_UPDATED_AND_STATUS_PATTERN = Pattern
- .compile("(:\s*([^() ]+) )?\s*\\(updated [^)]+\\)");
- private static final DateTimeFormatter UPDATED_DATE_FORMAT = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ssZZZZZ",
- Locale.ROOT);
- private static final int GITHUB_MAX_COMMENT_LENGTH = 65536;
-
- private final Clock clock;
- private final IndexingConfig.GitErrorReporting.GithubReporter config;
-
- private GithubFailureReporter(Clock clock, IndexingConfig.GitErrorReporting.GithubReporter config) {
- this.clock = clock;
- this.config = config;
- }
-
- void report(Map> failures) {
- Log.infof("Reporting indexing status to GitHub.");
- try {
- GitHub github = new GitHubBuilder().withOAuthToken(config.token()).build();
- GHRepository repository = github.getRepository(config.issue().repository());
- GHIssue issue = repository.getIssue(config.issue().id());
-
- String status = indexingResultStatus(failures);
-
- // add comments if needed:
- if (!STATUS_SUCCESS.equals(status)) {
- StringBuilder newMessageBuilder = new StringBuilder(STATUS_REPORT_HEADER)
- .append(status).append('\n');
-
- toMarkdown(newMessageBuilder, failures, true);
- String newMessage = newMessageBuilder.toString();
- if (newMessage.length() > GITHUB_MAX_COMMENT_LENGTH) {
- newMessage = ("### Message truncated as it was too long\n" + newMessage).substring(0,
- GITHUB_MAX_COMMENT_LENGTH);
- }
-
- if (STATUS_WARNING.equals(status)) {
- var lastRecentCommentByMe = getStatusCommentsSince(issue,
- clock.instant().minus(config.warningRepeatDelay()))
- .reduce(Streams.last());
- // For warnings, only comment if we didn't comment the same thing recently.
- if (lastRecentCommentByMe.isPresent()
- && lastRecentCommentByMe.get().getBody().contentEquals(newMessage)) {
- Log.infof("Skipping the issue comment because the same message was sent recently.");
- } else {
- issue.comment(newMessage);
- }
- } else {
- // For errors, always comment.
- issue.comment(newMessage);
- }
- }
-
- // Update last indexing date:
- issue.setTitle(insertStatusAndUpdateDate(clock, issue.getTitle(), status));
-
- // handle issue state (open/close):
- // Only reopen/keep opened an issue if we have critical things to report.
- // Otherwise, let's limit it to a comment only, and close an issue if needed.
- if (STATUS_CRITICAL.equals(status) && !GHIssueState.OPEN.equals(issue.getState())) {
- Log.infof("Opening GitHub issue due to critical errors.");
- issue.reopen();
- }
- if (!STATUS_CRITICAL.equals(status) && GHIssueState.OPEN.equals(issue.getState())) {
- Log.infof("Closing GitHub issue as indexing succeeded.");
- issue.close();
- }
- } catch (IOException | RuntimeException e) {
- throw new IllegalStateException("Unable to report failures to GitHub: " + e.getMessage(), e);
- }
- }
-
- static String insertStatusAndUpdateDate(Clock clock, String title, String status) {
- String toInsert = TITLE_UPDATED_AND_STATUS_FORMAT.formatted(status,
- UPDATED_DATE_FORMAT.format(clock.instant().atOffset(ZoneOffset.UTC)));
- String result = TITLE_UPDATED_AND_STATUS_PATTERN.matcher(title).replaceAll(toInsert);
- if (result.equals(title)) {
- // The title didn't contain any mention of the status and last update; add it.
- result = result + toInsert;
- }
- return result;
- }
-
- private Stream getStatusCommentsSince(GHIssue issue, Instant since) {
- return toStream(issue.queryComments().since(Date.from(since)).list())
- .filter(uncheckedIO((GHIssueComment comment) -> comment.getBody().startsWith(STATUS_REPORT_HEADER))::apply);
- }
-
- private static String indexingResultStatus(Map> failures) {
- if (failures.get(Level.CRITICAL).isEmpty()) {
- if (failures.get(Level.WARNING).isEmpty()) {
- return STATUS_SUCCESS;
- } else {
- return STATUS_WARNING;
- }
- } else {
- return STATUS_CRITICAL;
- }
- }
- }
-
- private static void toMarkdown(StringBuilder sb, Map> failures, boolean includeException) {
- for (Map.Entry> entry : failures.entrySet()) {
- toMarkdown(sb, entry.getValue(), entry.getKey(), includeException);
- }
- }
-
- private static void toMarkdown(StringBuilder sb, List failures, Level level, boolean includeException) {
- if (failures.isEmpty()) {
- return;
- }
- sb.append("\n### ").append(level).append("\n");
- Map> map = failures.stream().collect(
- // Sort failures so that two runs with the same failures will produce the same report
- // This is critical when we try to limit the frequency of identical reports (see GH reporter)
- Collectors.groupingBy(Failure::stage, Collectors.toCollection(() -> new TreeSet<>(Failure.COMPARATOR))));
- for (Stage stage : Stage.values()) {
- Set set = map.getOrDefault(stage, Set.of());
- if (!set.isEmpty()) {
- sb.append("\n");
- sb.append(" Issues with ").append(stage).append("
:
\n\n");
- for (Failure failure : set) {
- sb.append(" * ").append(failure.details()).append('\n');
- if (includeException) {
- formatException(sb, failure.exception());
- }
- }
- sb.append(" \n");
- }
- }
- }
-
- private static void formatException(StringBuilder sb, Exception exception) {
- if (exception == null) {
- return;
- }
-
- sb.append("\n \n")
- .append(" ")
- .append("Exception details: ").append(exception.getClass().getName())
- .append("
\n\n ```java\n");
-
- try (StringWriter writer = new StringWriter();
- PrintWriter printWriter = new PrintWriter(writer)) {
- exception.printStackTrace(printWriter);
- sb.append(writer.toString().replaceAll("(?m)^", " "));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- sb.append("\n ```\n \n\n");
- }
-}
diff --git a/src/main/java/io/quarkus/search/app/indexing/IndexingConfig.java b/src/main/java/io/quarkus/search/app/indexing/IndexingConfig.java
index 0ed28024..13430bb8 100644
--- a/src/main/java/io/quarkus/search/app/indexing/IndexingConfig.java
+++ b/src/main/java/io/quarkus/search/app/indexing/IndexingConfig.java
@@ -1,9 +1,11 @@
package io.quarkus.search.app.indexing;
import java.time.Duration;
-import java.util.Optional;
import java.util.OptionalInt;
+import io.quarkus.search.app.indexing.reporting.ReportingConfig;
+import io.quarkus.search.app.indexing.state.RetryConfig;
+
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
@@ -20,7 +22,9 @@ public interface IndexingConfig {
@WithDefault("30s")
Duration timeout();
- GitErrorReporting errorReporting();
+ ReportingConfig reporting();
+
+ RetryConfig retry();
interface OnStartup {
@WithDefault("always")
@@ -40,32 +44,4 @@ interface Scheduled {
String cron();
}
- interface GitErrorReporting {
- @WithDefault("log")
- Type type();
-
- Optional github();
-
- interface GithubReporter {
- Issue issue();
-
- String token();
-
- /**
- * @return How often to report status on GitHub when the last report was identical and contained only warnings.
- */
- Duration warningRepeatDelay();
-
- interface Issue {
- String repository();
-
- int id();
- }
- }
-
- enum Type {
- LOG,
- GITHUB_ISSUE;
- }
- }
}
diff --git a/src/main/java/io/quarkus/search/app/indexing/IndexingService.java b/src/main/java/io/quarkus/search/app/indexing/IndexingService.java
index a29575ce..18a45f1c 100644
--- a/src/main/java/io/quarkus/search/app/indexing/IndexingService.java
+++ b/src/main/java/io/quarkus/search/app/indexing/IndexingService.java
@@ -1,11 +1,18 @@
package io.quarkus.search.app.indexing;
+import static io.quarkus.search.app.util.MutinyUtils.runOnWorkerPool;
+import static io.quarkus.search.app.util.MutinyUtils.schedule;
import static io.quarkus.search.app.util.MutinyUtils.waitForeverFor;
import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
import java.time.Instant;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
@@ -16,6 +23,10 @@
import io.quarkus.search.app.fetching.FetchingService;
import io.quarkus.search.app.hibernate.QuarkusIOLoadingContext;
import io.quarkus.search.app.hibernate.StreamMassIndexingLoggingMonitor;
+import io.quarkus.search.app.indexing.reporting.FailureCollector;
+import io.quarkus.search.app.indexing.reporting.StatusReporter;
+import io.quarkus.search.app.indexing.state.IndexingAlreadyInProgressException;
+import io.quarkus.search.app.indexing.state.IndexingState;
import io.quarkus.search.app.quarkusio.QuarkusIO;
import io.quarkus.search.app.util.ExceptionUtils;
@@ -30,8 +41,7 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
-import io.smallrye.mutiny.Uni;
-import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.smallrye.mutiny.subscription.Cancellable;
@ApplicationScoped
public class IndexingService {
@@ -50,12 +60,18 @@ public class IndexingService {
@Inject
ReferenceService referenceService;
- private final AtomicBoolean reindexingInProgress = new AtomicBoolean();
+ private IndexingState state;
+
+ @PostConstruct
+ void init() {
+ state = new IndexingState(StatusReporter.create(indexingConfig.reporting(), Clock.systemUTC()),
+ indexingConfig.retry(), this::scheduleIndexing);
+ }
void registerManagementRoutes(@Observes ManagementInterface mi) {
mi.router().get(REINDEX_ENDPOINT_PATH)
.blockingHandler(rc -> {
- reindex();
+ reindex(false);
rc.end("Success");
});
}
@@ -76,30 +92,28 @@ void indexOnStartup(@Observes StartupEvent ev) {
() -> Log.infof("Reindexing on startup: search backend is not reachable yet, waiting..."))
.chain(() -> waitForeverFor(this::isSearchBackendReady, waitInterval,
() -> Log.infof("Reindexing on startup: search backend is not ready yet, waiting...")))
- .chain(() -> Uni.createFrom()
- .item(() -> {
- if (IndexingConfig.OnStartup.When.INDEXES_EMPTY.equals(indexingConfig.onStartup().when())) {
- try (var session = searchMapping.createSession()) {
- long documentCount = session.search(Object.class)
- .where(f -> f.matchAll())
- .fetchTotalHitCount();
- if (documentCount > 0L) {
- Log.infof("Not reindexing on startup:"
- + " index are present, reachable, and contain %s documents."
- + " Call endpoint '%s' to reindex explicitly.",
- documentCount, REINDEX_ENDPOINT_PATH);
- return null;
- }
- Log.infof("Reindexing on startup: indexes are empty.");
- } catch (RuntimeException e) {
- Log.infof(
- e, "Reindexing on startup: could not determine the content of indexes");
- }
+ .chain(() -> runOnWorkerPool(() -> {
+ if (IndexingConfig.OnStartup.When.INDEXES_EMPTY.equals(indexingConfig.onStartup().when())) {
+ try (var session = searchMapping.createSession()) {
+ long documentCount = session.search(Object.class)
+ .where(f -> f.matchAll())
+ .fetchTotalHitCount();
+ if (documentCount > 0L) {
+ Log.infof("Not reindexing on startup:"
+ + " index are present, reachable, and contain %s documents."
+ + " Call endpoint '%s' to reindex explicitly.",
+ documentCount, REINDEX_ENDPOINT_PATH);
+ return null;
}
- reindex();
- return null;
- })
- .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()))
+ Log.infof("Reindexing on startup: indexes are empty.");
+ } catch (RuntimeException e) {
+ Log.infof(
+ e, "Reindexing on startup: could not determine the content of indexes");
+ }
+ }
+ reindex(true);
+ return null;
+ }))
.subscribe().with(
// We don't care about the items, we just want this to run.
ignored -> {
@@ -111,9 +125,9 @@ void indexOnStartup(@Observes StartupEvent ev) {
void indexOnTime() {
try {
Log.infof("Scheduled reindexing starting...");
- reindex();
+ reindex(true);
Log.infof("Scheduled reindexing finished.");
- } catch (ReindexingAlreadyInProgressException e) {
+ } catch (IndexingAlreadyInProgressException e) {
Log.infof("Indexing was already started by some other process.");
} catch (RuntimeException e) {
Log.errorf(e, "Failed to start scheduled reindexing: %s", e.getMessage());
@@ -145,7 +159,7 @@ private boolean isSearchBackendReady() {
@SuppressWarnings("BusyWait")
@PreDestroy
protected void waitForReindexingToFinish() throws InterruptedException {
- if (!reindexingInProgress.get()) {
+ if (!state.isInProgress()) {
return;
}
@@ -154,39 +168,39 @@ protected void waitForReindexingToFinish() throws InterruptedException {
do {
Log.info("Shutdown requested, but indexing is in progress, waiting...");
Thread.sleep(5000);
- } while (reindexingInProgress.get() && Instant.now().isBefore(until));
- if (reindexingInProgress.get()) {
+ } while (state.isInProgress() && Instant.now().isBefore(until));
+ if (state.isInProgress()) {
throw new IllegalStateException("Shutdown requested, aborting indexing which took more than " + timeout);
}
}
+ private Cancellable scheduleIndexing(Duration delay) {
+ return schedule(delay, () -> runOnWorkerPool(() -> {
+ reindex(true);
+ return null;
+ }))
+ .subscribe().with(
+ // We don't care about the items, we just want this to run.
+ ignored -> {
+ },
+                t -> Log.errorf(t, "Scheduled reindexing failed: %s", t.getMessage()));
+ }
+
@ActivateRequestContext
- protected void reindex() {
- // Reindexing requires exclusive access to the DB/indexes
- if (!reindexingInProgress.compareAndSet(false, true)) {
- throw new ReindexingAlreadyInProgressException();
- }
- try (FailureCollector failureCollector = new FailureCollector(indexingConfig.errorReporting())) {
+ protected void reindex(boolean allowRetry) {
+ try (IndexingState.Attempt attempt = state.tryStart(allowRetry)) {
try {
- createIndexes();
- indexAll(failureCollector);
+ createIndexesIfMissing();
+ indexAll(attempt);
} catch (RuntimeException e) {
- failureCollector.critical(FailureCollector.Stage.INDEXING, "Indexing failed: " + e.getMessage(), e);
+ attempt.critical(FailureCollector.Stage.INDEXING, "Indexing failed: " + e.getMessage(), e);
// Re-throw even though we've reported the failure, for the benefit of callers/logs
throw e;
}
- } finally {
- reindexingInProgress.set(false);
}
}
- private static class ReindexingAlreadyInProgressException extends RuntimeException {
- ReindexingAlreadyInProgressException() {
- super("Reindexing is already in progress and cannot be started at this moment");
- }
- }
-
- private void createIndexes() {
+ private void createIndexesIfMissing() {
try {
Log.info("Creating missing indexes");
searchMapping.scope(Object.class).schemaManager().createIfMissing();
@@ -220,14 +234,25 @@ private void indexAll(FailureCollector failureCollector) {
try (Rollover rollover = Rollover.start(searchMapping)) {
try (QuarkusIO quarkusIO = fetchingService.fetchQuarkusIo(failureCollector)) {
Log.info("Indexing quarkus.io...");
- searchMapping.scope(Object.class).massIndexer()
+ var failFastFailureHandler = new FailFastMassIndexingFailureHandler();
+ var future = searchMapping.scope(Object.class).massIndexer()
// no point in cleaning the data because of the rollover ^
.purgeAllOnStart(false)
.batchSizeToLoadObjects(indexingConfig.batchSize())
.threadsToLoadObjects(indexingConfig.parallelism().orElse(6))
.context(QuarkusIOLoadingContext.class, QuarkusIOLoadingContext.of(quarkusIO))
.monitor(new StreamMassIndexingLoggingMonitor())
- .startAndWait();
+ .failureHandler(failFastFailureHandler)
+ .start()
+ .toCompletableFuture();
+ failFastFailureHandler.init(future);
+ try {
+ future.get(indexingConfig.timeout().toMillis(), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ } catch (ExecutionException e) {
+                throw ExceptionUtils.toRuntimeException(e.getCause());
+ }
}
rollover.commit();
@@ -240,4 +265,5 @@ private void indexAll(FailureCollector failureCollector) {
throw ExceptionUtils.toRuntimeException(e);
}
}
+
}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/Failure.java b/src/main/java/io/quarkus/search/app/indexing/reporting/Failure.java
new file mode 100644
index 00000000..42ca1885
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/Failure.java
@@ -0,0 +1,20 @@
+package io.quarkus.search.app.indexing.reporting;
+
+import java.util.Comparator;
+
+/**
+ * @param level Whether the reported failure should lead to reporting the indexing as incomplete/failed or not.
+ * @param stage Where the failure happened.
+ * @param details Failure details.
+ * @param exception An exception that has caused the failure.
+ */
+public record Failure(FailureCollector.Level level, FailureCollector.Stage stage, String details, Exception exception) {
+
+    static Comparator<Failure> COMPARATOR = Comparator.comparing(Failure::level)
+ .thenComparing(Failure::stage)
+ .thenComparing(Failure::details)
+ // Not perfect, but then how likely it is to get
+ // two failures with everything identical except the exception?
+ .thenComparing(f -> System.identityHashCode(f.exception()));
+
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/FailureCollector.java b/src/main/java/io/quarkus/search/app/indexing/reporting/FailureCollector.java
new file mode 100644
index 00000000..23495351
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/FailureCollector.java
@@ -0,0 +1,24 @@
+package io.quarkus.search.app.indexing.reporting;
+
+public interface FailureCollector {
+
+ enum Level {
+ CRITICAL,
+ WARNING;
+ }
+
+ enum Stage {
+ PARSING,
+ TRANSLATION,
+ INDEXING;
+ }
+
+ void warning(Stage stage, String details);
+
+ void warning(Stage stage, String details, Exception exception);
+
+ void critical(Stage stage, String details);
+
+ void critical(Stage stage, String details, Exception exception);
+
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/GithubStatusReporter.java b/src/main/java/io/quarkus/search/app/indexing/reporting/GithubStatusReporter.java
new file mode 100644
index 00000000..ee2f59a8
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/GithubStatusReporter.java
@@ -0,0 +1,116 @@
+package io.quarkus.search.app.indexing.reporting;
+
+import static io.quarkus.search.app.indexing.reporting.StatusRenderer.toStatusDetailsMarkdown;
+import static io.quarkus.search.app.indexing.reporting.StatusRenderer.toStatusSummary;
+import static io.quarkus.search.app.util.Streams.toStream;
+import static io.quarkus.search.app.util.UncheckedIOFunction.uncheckedIO;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import io.quarkus.search.app.util.Streams;
+
+import io.quarkus.logging.Log;
+
+import org.kohsuke.github.GHIssue;
+import org.kohsuke.github.GHIssueComment;
+import org.kohsuke.github.GHIssueState;
+import org.kohsuke.github.GHRepository;
+import org.kohsuke.github.GitHub;
+import org.kohsuke.github.GitHubBuilder;
+
+class GithubStatusReporter implements StatusReporter {
+
+ private static final String STATUS_REPORT_HEADER = "## search.quarkus.io indexing status: ";
+
+ private final Clock clock;
+ private final ReportingConfig.GithubReporter config;
+
+ GithubStatusReporter(Clock clock, ReportingConfig.GithubReporter config) {
+ this.clock = clock;
+ this.config = config;
+ }
+
+ @Override
+    public void report(Status status, Map<FailureCollector.Level, List<Failure>> failures) {
+ Log.infof("Reporting indexing status to GitHub.");
+ try {
+ GitHub github = new GitHubBuilder().withOAuthToken(config.token()).build();
+ GHRepository repository = github.getRepository(config.issue().repository());
+ GHIssue issue = repository.getIssue(config.issue().id());
+
+ // Update last indexing date:
+ issue.setTitle(toStatusSummary(clock, status, issue.getTitle()));
+
+ // Build a status report
+ StringBuilder newReportBuilder = new StringBuilder(STATUS_REPORT_HEADER)
+ .append(status).append('\n');
+ toStatusDetailsMarkdown(newReportBuilder, failures, true);
+ String newReport = newReportBuilder.toString();
+
+ // Update the issue description with the content of the latest comment,
+ // for convenience, and to have information available even when indexing is unstable (no issue comment).
+ // This must be done before the comment, so that notifications triggered by the comment are only sent
+ // when the issue is fully updated.
+ issue.setBody(StatusRenderer.insertMessageInIssueDescription(issue.getBody(), newReport));
+
+ // add comments if needed:
+ if (!Status.SUCCESS.equals(status)) {
+ String reportComment = StatusRenderer.truncateForGitHubMaxLength(newReport, 0);
+ switch (status) {
+ case WARNING -> {
+ // When warning, only comment if we didn't comment the same thing recently.
+ var lastRecentCommentByMe = getStatusCommentsSince(
+ issue,
+ clock.instant().minus(config.warningRepeatDelay()))
+ .reduce(Streams.last());
+ if (lastRecentCommentByMe.isPresent()
+ && lastRecentCommentByMe.get().getBody().contentEquals(reportComment)) {
+ Log.infof("Skipping the issue comment because the same message was sent recently.");
+ } else {
+ issue.comment(reportComment);
+ }
+ }
+ case UNSTABLE -> {
+ // When unstable, never comment: there'll be a retry, and we want to avoid unnecessary noise.
+ }
+ case CRITICAL ->
+ // When critical, always comment.
+ issue.comment(reportComment);
+ }
+ }
+
+ // handle issue state (open/close):
+ switch (status) {
+ case SUCCESS, WARNING -> {
+ if (GHIssueState.OPEN.equals(issue.getState())) {
+ Log.infof("Closing GitHub issue as indexing succeeded.");
+ issue.close();
+ }
+ }
+ case UNSTABLE -> {
+ Log.infof("Leaving GitHub issue in its current open/close state pending retry.");
+ }
+ case CRITICAL -> {
+ if (GHIssueState.CLOSED.equals(issue.getState())) {
+ Log.infof("Opening GitHub issue due to critical failures.");
+ issue.reopen();
+ }
+ }
+ }
+ } catch (IOException | RuntimeException e) {
+ throw new IllegalStateException("Unable to report failures to GitHub: " + e.getMessage(), e);
+ }
+ }
+
+    private Stream<GHIssueComment> getStatusCommentsSince(GHIssue issue, Instant since) {
+ return toStream(issue.queryComments().since(Date.from(since)).list())
+ .filter(uncheckedIO(
+ (GHIssueComment comment) -> comment.getBody().startsWith(STATUS_REPORT_HEADER))::apply);
+ }
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/LogStatusReporter.java b/src/main/java/io/quarkus/search/app/indexing/reporting/LogStatusReporter.java
new file mode 100644
index 00000000..e4670ac6
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/LogStatusReporter.java
@@ -0,0 +1,36 @@
+package io.quarkus.search.app.indexing.reporting;
+
+import static io.quarkus.search.app.indexing.reporting.StatusRenderer.toStatusDetailsMarkdown;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.Map;
+
+import io.quarkus.logging.Log;
+
+public class LogStatusReporter implements StatusReporter {
+
+ private final Clock clock;
+
+ public LogStatusReporter(Clock clock) {
+ this.clock = clock;
+ }
+
+ @Override
+ public void report(Status status, Map<FailureCollector.Level, List<Failure>> failures) {
+ StringBuilder sb = new StringBuilder(StatusRenderer.toStatusSummary(clock, status, "Indexing status"));
+ switch (status) {
+ case SUCCESS -> {
+ Log.info(sb);
+ }
+ case WARNING, UNSTABLE -> {
+ toStatusDetailsMarkdown(sb, failures, false);
+ Log.warn(sb);
+ }
+ case CRITICAL -> {
+ toStatusDetailsMarkdown(sb, failures, false);
+ Log.error(sb);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/ReportingConfig.java b/src/main/java/io/quarkus/search/app/indexing/reporting/ReportingConfig.java
new file mode 100644
index 00000000..d94b5efd
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/ReportingConfig.java
@@ -0,0 +1,35 @@
+package io.quarkus.search.app.indexing.reporting;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import io.smallrye.config.WithDefault;
+
+public interface ReportingConfig {
+ @WithDefault("log")
+ Type type();
+
+ Optional<GithubReporter> github();
+
+ interface GithubReporter {
+ GithubReporter.Issue issue();
+
+ String token();
+
+ /**
+ * @return How often to report status on GitHub when the last report was identical and contained only warnings.
+ */
+ Duration warningRepeatDelay();
+
+ interface Issue {
+ String repository();
+
+ int id();
+ }
+ }
+
+ enum Type {
+ LOG,
+ GITHUB_ISSUE;
+ }
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/Status.java b/src/main/java/io/quarkus/search/app/indexing/reporting/Status.java
new file mode 100644
index 00000000..899764cf
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/Status.java
@@ -0,0 +1,11 @@
+package io.quarkus.search.app.indexing.reporting;
+
+public enum Status {
+ SUCCESS,
+ WARNING,
+ /**
+ * Indexing failed, but a retry has been scheduled.
+ */
+ UNSTABLE,
+ CRITICAL
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/StatusRenderer.java b/src/main/java/io/quarkus/search/app/indexing/reporting/StatusRenderer.java
new file mode 100644
index 00000000..9f5574a4
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/StatusRenderer.java
@@ -0,0 +1,140 @@
+package io.quarkus.search.app.indexing.reporting;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public abstract class StatusRenderer {
+ private static final String TITLE_UPDATED_AND_STATUS_FORMAT = ": %s (updated %s)";
+ private static final Pattern TITLE_UPDATED_AND_STATUS_PATTERN = Pattern
+ .compile("(:\\s*([^() ]+) )?\\s*\\(updated [^)]+\\)");
+ private static final DateTimeFormatter UPDATED_DATE_FORMAT = DateTimeFormatter.ofPattern(
+ "uuuu-MM-dd'T'HH:mm:ssZZZZZ",
+ Locale.ROOT);
+
+ private static final int GITHUB_MAX_COMMENT_LENGTH = 65536;
+ private static final String INSERT_START_MARKER = "<!-- Automatic message start -->";
+ private static final String INSERT_END_MARKER = "<!-- Automatic message end -->";
+
+ public static String toStatusSummary(Clock clock, Status status, String previousSummary) {
+ String toInsert = TITLE_UPDATED_AND_STATUS_FORMAT.formatted(
+ formatStatus(status),
+ UPDATED_DATE_FORMAT.format(clock.instant().atOffset(ZoneOffset.UTC)));
+ String result = TITLE_UPDATED_AND_STATUS_PATTERN.matcher(previousSummary).replaceAll(toInsert);
+ if (result.equals(previousSummary)) {
+ // The previous summary didn't contain any mention of the status and last update; add it.
+ result = result + toInsert;
+ }
+ return result;
+ }
+
+ private static Object formatStatus(Status status) {
+ return switch (status) {
+ case SUCCESS -> "Success";
+ case WARNING -> "Warning";
+ case CRITICAL -> "Critical";
+ case UNSTABLE -> "Unstable";
+ };
+ }
+
+ public static void toStatusDetailsMarkdown(StringBuilder sb, Map<FailureCollector.Level, List<Failure>> failures,
+ boolean includeException) {
+ for (Map.Entry<FailureCollector.Level, List<Failure>> entry : failures.entrySet()) {
+ toStatusDetailsMarkdown(sb, entry.getValue(), entry.getKey(), includeException);
+ }
+ }
+
+ private static void toStatusDetailsMarkdown(StringBuilder sb, List<Failure> failures, FailureCollector.Level level,
+ boolean includeException) {
+ if (failures.isEmpty()) {
+ return;
+ }
+ sb.append("\n### ").append(level).append("\n");
+ Map<FailureCollector.Stage, Set<Failure>> map = failures.stream().collect(
+ // Sort failures so that two runs with the same failures will produce the same report
+ // This is critical when we try to limit the frequency of identical reports (see GH reporter)
+ Collectors.groupingBy(Failure::stage, Collectors.toCollection(() -> new TreeSet<>(Failure.COMPARATOR))));
+ for (FailureCollector.Stage stage : FailureCollector.Stage.values()) {
+ Set<Failure> set = map.getOrDefault(stage, Set.of());
+ if (!set.isEmpty()) {
+ sb.append("<details>\n");
+ sb.append("    <summary>Issues with <code>").append(stage).append("</code>:</summary>\n\n");
+ for (Failure failure : set) {
+ sb.append(" * ").append(failure.details()).append('\n');
+ if (includeException) {
+ formatException(sb, failure.exception());
+ }
+ }
+ sb.append("</details>\n");
+ }
+ }
+ }
+
+ private static void formatException(StringBuilder sb, Exception exception) {
+ if (exception == null) {
+ return;
+ }
+
+ sb.append("\n    <details>\n")
+ .append("        <summary>")
+ .append("Exception details: <code>").append(exception.getClass().getName())
+ .append("</code></summary>\n\n    ```java\n");
+
+ try (StringWriter writer = new StringWriter();
+ PrintWriter printWriter = new PrintWriter(writer)) {
+ exception.printStackTrace(printWriter);
+ sb.append(writer.toString().replaceAll("(?m)^", " "));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ sb.append("\n    ```\n    </details>\n\n");
+ }
+
+ static String insertMessageInIssueDescription(String originalIssueDescription, String newMessage) {
+ StringBuilder result = new StringBuilder(originalIssueDescription != null ? originalIssueDescription : "");
+ int startMarkerIndex = result.indexOf(INSERT_START_MARKER);
+ int endMarkerIndex = startMarkerIndex < 0 ? -1 : result.indexOf(INSERT_END_MARKER);
+ if (startMarkerIndex >= 0 && endMarkerIndex >= 0) {
+ result.replace(startMarkerIndex + INSERT_START_MARKER.length(), endMarkerIndex, "\n");
+ } else {
+ result.append('\n');
+ startMarkerIndex = result.length();
+ result.append(INSERT_START_MARKER).append('\n').append(INSERT_END_MARKER);
+ }
+ int currentIndex = startMarkerIndex + INSERT_START_MARKER.length();
+
+ String quoteIntroMessage = "\n## Last update\n";
+ result.insert(currentIndex, quoteIntroMessage);
+ currentIndex += quoteIntroMessage.length();
+
+ String truncatedMessage = truncateForGitHubMaxLength(asMarkdownQuote(newMessage), result.length());
+ result.insert(currentIndex, truncatedMessage);
+
+ return result.toString();
+ }
+
+ static String truncateForGitHubMaxLength(String message, int reservedLength) {
+ int maxLength = GITHUB_MAX_COMMENT_LENGTH - reservedLength;
+ if (message.length() > maxLength) {
+ return ("### Message truncated as it was too long\n" + message)
+ .substring(0, maxLength);
+ } else {
+ return message;
+ }
+ }
+
+ private static String asMarkdownQuote(String string) {
+ return string.lines().map(s -> "> " + s).collect(Collectors.joining("\n"));
+ }
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/reporting/StatusReporter.java b/src/main/java/io/quarkus/search/app/indexing/reporting/StatusReporter.java
new file mode 100644
index 00000000..5c446bfa
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/reporting/StatusReporter.java
@@ -0,0 +1,23 @@
+package io.quarkus.search.app.indexing.reporting;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.Map;
+
+public interface StatusReporter {
+
+ void report(Status status, Map<FailureCollector.Level, List<Failure>> failures);
+
+ static StatusReporter create(ReportingConfig reportingConfig, Clock clock) {
+ var type = reportingConfig.type();
+ return switch (type) {
+ case LOG -> new LogStatusReporter(clock);
+ case GITHUB_ISSUE -> {
+ ReportingConfig.GithubReporter github = reportingConfig.github().orElseThrow(
+ () -> new IllegalArgumentException(
+ "GitHub error reporting requires both GitHub repository and issue id to be specified in the properties."));
+ yield new GithubStatusReporter(clock, github);
+ }
+ };
+ }
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/state/IndexingAlreadyInProgressException.java b/src/main/java/io/quarkus/search/app/indexing/state/IndexingAlreadyInProgressException.java
new file mode 100644
index 00000000..82b404f9
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/state/IndexingAlreadyInProgressException.java
@@ -0,0 +1,7 @@
+package io.quarkus.search.app.indexing.state;
+
+public class IndexingAlreadyInProgressException extends RuntimeException {
+ IndexingAlreadyInProgressException() {
+ super("Indexing is already in progress and cannot be started at this moment");
+ }
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/state/IndexingState.java b/src/main/java/io/quarkus/search/app/indexing/state/IndexingState.java
new file mode 100644
index 00000000..1a715f14
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/state/IndexingState.java
@@ -0,0 +1,148 @@
+package io.quarkus.search.app.indexing.state;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+import io.quarkus.search.app.indexing.reporting.Failure;
+import io.quarkus.search.app.indexing.reporting.FailureCollector;
+import io.quarkus.search.app.indexing.reporting.Status;
+import io.quarkus.search.app.indexing.reporting.StatusReporter;
+
+import io.quarkus.logging.Log;
+
+import io.smallrye.mutiny.subscription.Cancellable;
+
+public class IndexingState {
+
+ private final StatusReporter reporter;
+ private final RetryConfig retryConfig;
+ private final Function<Duration, Cancellable> retryScheduler;
+
+ private final AtomicBoolean inProgress = new AtomicBoolean();
+ private final AtomicInteger attempts = new AtomicInteger();
+ private volatile Cancellable scheduledRetry;
+
+ public IndexingState(StatusReporter reporter, RetryConfig retryConfig,
+ Function<Duration, Cancellable> retryScheduler) {
+ this.reporter = reporter;
+ this.retryConfig = retryConfig;
+ this.retryScheduler = retryScheduler;
+ }
+
+ public boolean isInProgress() {
+ return inProgress.get();
+ }
+
+ public Attempt tryStart(boolean allowRetry) {
+ // Indexing requires exclusive access to the DB/indexes
+ if (!inProgress.compareAndSet(false, true)) {
+ throw new IndexingAlreadyInProgressException();
+ }
+ if (scheduledRetry != null) {
+ scheduledRetry.cancel();
+ scheduledRetry = null;
+ }
+ return new Attempt(allowRetry);
+ }
+
+ public class Attempt implements Closeable, FailureCollector {
+
+ private final boolean allowRetry;
+ private final EnumMap<Level, List<Failure>> failures = new EnumMap<>(Level.class);
+
+ private Attempt(boolean allowRetry) {
+ this.allowRetry = allowRetry;
+ for (Level value : Level.values()) {
+ failures.put(value, Collections.synchronizedList(new ArrayList<>()));
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ Status status = indexingResultStatus(failures);
+ switch (status) {
+ case SUCCESS, WARNING -> {
+ attempts.set(0);
+ reporter.report(status, failures);
+ }
+ case CRITICAL -> {
+ if (scheduleRetry()) {
+ reporter.report(Status.UNSTABLE, failures);
+ } else {
+ reporter.report(Status.CRITICAL, failures);
+ }
+ }
+ }
+ } finally {
+ inProgress.set(false);
+ }
+ }
+
+ @Override
+ public void warning(Stage stage, String details) {
+ warning(stage, details, null);
+ }
+
+ @Override
+ public void warning(Stage stage, String details, Exception exception) {
+ Log.warn(details, exception);
+ failures.get(Level.WARNING).add(new Failure(Level.WARNING, stage, details, exception));
+ }
+
+ @Override
+ public void critical(Stage stage, String details) {
+ critical(stage, details, null);
+ }
+
+ @Override
+ public void critical(Stage stage, String details, Exception exception) {
+ Log.error(details, exception);
+ failures.get(Level.CRITICAL).add(new Failure(Level.CRITICAL, stage, details, exception));
+ }
+
+ private boolean scheduleRetry() {
+ if (!allowRetry) {
+ return false;
+ }
+ if (attempts.incrementAndGet() < retryConfig.maxAttempts()) {
+ try {
+ scheduledRetry = retryScheduler.apply(retryConfig.delay());
+ // If we get here, a retry was scheduled.
+ warning(Stage.INDEXING, "Indexing will be tried again later.");
+ return true;
+ } catch (RuntimeException e) {
+ // If we get here, we'll abort.
+ critical(Stage.INDEXING, "Failed to schedule retry: " + e.getMessage(),
+ e);
+ return false;
+ }
+ } else {
+ critical(Stage.INDEXING, "Tried %s time(s), aborting".formatted(retryConfig.maxAttempts()));
+ return false;
+ }
+ }
+
+ private static Status indexingResultStatus(Map<Level, List<Failure>> failures) {
+ if (failures.get(Level.CRITICAL).isEmpty()) {
+ if (failures.get(Level.WARNING).isEmpty()) {
+ return Status.SUCCESS;
+ } else {
+ return Status.WARNING;
+ }
+ } else {
+ return Status.CRITICAL;
+ }
+ }
+
+ }
+
+}
diff --git a/src/main/java/io/quarkus/search/app/indexing/state/RetryConfig.java b/src/main/java/io/quarkus/search/app/indexing/state/RetryConfig.java
new file mode 100644
index 00000000..ee83583c
--- /dev/null
+++ b/src/main/java/io/quarkus/search/app/indexing/state/RetryConfig.java
@@ -0,0 +1,13 @@
+package io.quarkus.search.app.indexing.state;
+
+import java.time.Duration;
+
+import io.smallrye.config.WithDefault;
+
+public interface RetryConfig {
+ @WithDefault("3")
+ int maxAttempts();
+
+ @WithDefault("1M") // 1 Minute
+ Duration delay();
+}
diff --git a/src/main/java/io/quarkus/search/app/quarkusio/QuarkusIO.java b/src/main/java/io/quarkus/search/app/quarkusio/QuarkusIO.java
index b97d6f15..61c7a1f0 100644
--- a/src/main/java/io/quarkus/search/app/quarkusio/QuarkusIO.java
+++ b/src/main/java/io/quarkus/search/app/quarkusio/QuarkusIO.java
@@ -27,7 +27,7 @@
import io.quarkus.search.app.entity.Guide;
import io.quarkus.search.app.entity.I18nData;
import io.quarkus.search.app.entity.Language;
-import io.quarkus.search.app.indexing.FailureCollector;
+import io.quarkus.search.app.indexing.reporting.FailureCollector;
import io.quarkus.search.app.util.CloseableDirectory;
import io.quarkus.search.app.util.GitCloneDirectory;
import io.quarkus.search.app.util.GitInputProvider;
diff --git a/src/main/java/io/quarkus/search/app/util/MutinyUtils.java b/src/main/java/io/quarkus/search/app/util/MutinyUtils.java
index 4997acc6..262971b4 100644
--- a/src/main/java/io/quarkus/search/app/util/MutinyUtils.java
+++ b/src/main/java/io/quarkus/search/app/util/MutinyUtils.java
@@ -25,4 +25,16 @@ public static Uni waitForeverFor(Supplier condition, Duration ste
.skip().where(ignored -> true)
.toUni().replaceWithVoid();
}
+
+ public static <T> Uni<T> runOnWorkerPool(Supplier<T> action) {
+ return Uni.createFrom()
+ .item(action)
+ .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
+ }
+
+ public static <T> Uni<T> schedule(Duration delay, Supplier<Uni<T>> action) {
+ return Uni.createFrom().nullItem()
+ .onItem().delayIt().by(delay)
+ .onItem().transformToUni(ignored -> action.get());
+ }
}
diff --git a/src/main/java/io/quarkus/search/app/util/UrlInputProvider.java b/src/main/java/io/quarkus/search/app/util/UrlInputProvider.java
index 0fae7798..1a5f53f2 100644
--- a/src/main/java/io/quarkus/search/app/util/UrlInputProvider.java
+++ b/src/main/java/io/quarkus/search/app/util/UrlInputProvider.java
@@ -11,7 +11,7 @@
import java.nio.file.Path;
import io.quarkus.search.app.hibernate.InputProvider;
-import io.quarkus.search.app.indexing.FailureCollector;
+import io.quarkus.search.app.indexing.reporting.FailureCollector;
public class UrlInputProvider implements InputProvider {
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index bf965009..45a947d2 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -40,14 +40,16 @@ indexing.scheduled.cron=0 0 0 * * ?
# See quarkus.hibernate-search-standalone.elasticsearch.indexing below.
indexing.parallelism=80
indexing.batch-size=10
-# Error reporting to GitHub issue:
-# See https://github.com/quarkusio/search.quarkus.io/issues/89
-# See https://github.com/quarkusio/search.quarkus.io/issues/127
-%prod.indexing.error-reporting.type=github-issue
-%prod.indexing.error-reporting.github.warning-repeat-delay=14d
-%prod.indexing.error-reporting.github.issue.repository=quarkusio/search.quarkus.io
-%prod.indexing.error-reporting.github.issue.id=${GITHUB_STATUS_ISSUE_ID}
-%prod.indexing.error-reporting.github.token=${GITHUB_OAUTH}
+# Status reporting to GitHub issue:
+# See https://github.com/quarkusio/search.quarkus.io/issues/130
+# See https://github.com/quarkusio/search.quarkus.io/issues/131
+%prod.indexing.reporting.type=github-issue
+%prod.indexing.reporting.github.warning-repeat-delay=14d
+%prod.indexing.reporting.github.issue.repository=quarkusio/search.quarkus.io
+%prod.indexing.reporting.github.issue.id=${GITHUB_STATUS_ISSUE_ID}
+%prod.indexing.reporting.github.token=${GITHUB_OAUTH}
+%prod.indexing.retry.max-attempts=3
+%prod.indexing.retry.delay=1M
########################
# More secure HTTP defaults
@@ -106,6 +108,8 @@ quarkus.hibernate-search-standalone.elasticsearch.max-connections=90
%test.quarkusio.git-uri=file:tests-should-use-quarkus-io-sample-setup-annotation
# disable scheduled indexing for dev/tests to not mess up anything with an unexpected reindexing:
%dev,test.indexing.scheduled.cron=off
+# disable retry: we don't expect failures except in specific tests, which override this
+%dev,test.indexing.retry.max-attempts=1
# Allow localhost in particular
%dev,staging.quarkus.http.cors.origins=/.*/
%dev,staging.quarkus.http.header."Access-Control-Allow-Private-Network".value=true
@@ -117,7 +121,7 @@ quarkus.hibernate-search-standalone.elasticsearch.max-connections=90
# Shorter format
quarkus.log.console.format=%d{HH:mm:ss} %-5p [%c{1.2.}] %s%e%n
# disable any error reporting to GitHub, simple logger will be used instead:
-%dev,test.indexing.error-reporting.type=log
+%dev,test.indexing.reporting.type=log
# Logging
# Allow trace logs in important categories
quarkus.log.category."io.quarkus.search.app".min-level=TRACE
@@ -208,9 +212,12 @@ quarkus.openshift.env.vars.QUARKUS_PROFILE=prod
quarkus.openshift.env.vars.INDEXING_QUEUE_COUNT=${INDEXING_QUEUE_COUNT}
quarkus.openshift.env.vars.INDEXING_BULK_SIZE=${INDEXING_BULK_SIZE}
# Initial indexing may take a while, especially the quarkus.io Git cloning
+# Tests indicate it can take ~240s in prod,
+# so we'll use 3 times as much as the limit, to allow for 3 attempts.
+# 15S * 48 = 720S = 240S * 3
quarkus.openshift.startup-probe.initial-delay=30S
quarkus.openshift.startup-probe.period=15S
-quarkus.openshift.startup-probe.failure-threshold=40
+quarkus.openshift.startup-probe.failure-threshold=48
# Declare the management port on the service
quarkus.openshift.ports."management".container-port=9000
quarkus.openshift.ports."management".host-port=90
diff --git a/src/test/java/io/quarkus/search/app/fetching/FetchingServiceTest.java b/src/test/java/io/quarkus/search/app/fetching/FetchingServiceTest.java
index 39153f7c..84740a68 100644
--- a/src/test/java/io/quarkus/search/app/fetching/FetchingServiceTest.java
+++ b/src/test/java/io/quarkus/search/app/fetching/FetchingServiceTest.java
@@ -18,7 +18,7 @@
import io.quarkus.search.app.entity.I18nData;
import io.quarkus.search.app.entity.Language;
import io.quarkus.search.app.hibernate.InputProvider;
-import io.quarkus.search.app.indexing.FailureCollector;
+import io.quarkus.search.app.indexing.reporting.FailureCollector;
import io.quarkus.search.app.quarkusio.QuarkusIO;
import io.quarkus.search.app.testsupport.GitTestUtils;
import io.quarkus.search.app.util.CloseableDirectory;
@@ -31,6 +31,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.apache.commons.io.file.PathUtils;
@@ -39,7 +40,10 @@
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.revwalk.RevCommit;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+@ExtendWith(MockitoExtension.class)
class FetchingServiceTest {
// Unfortunately we can't use @TempDir here,
@@ -58,6 +62,9 @@ class FetchingServiceTest {
}
}
+ @Mock
+ FailureCollector failureCollectorMock;
+
@BeforeAll
static void initOrigin() throws IOException, GitAPIException {
Path sourceRepoPath = tmpDir.path();
@@ -194,7 +201,7 @@ static void deleteTmpDir() throws IOException {
@Test
void fetchQuarkusIo() throws Exception {
- try (QuarkusIO quarkusIO = service.fetchQuarkusIo(new FailureCollector())) {
+ try (QuarkusIO quarkusIO = service.fetchQuarkusIo(failureCollectorMock)) {
try (var guides = quarkusIO.guides()) {
assertThat(guides)
.hasSize(10)
@@ -295,15 +302,15 @@ void fetchQuarkusIo() throws Exception {
}));
}
}
+
// now let's update some guides and make sure that the content is fetched correctly:
updateMainRepository();
-
// NOTE that after an update we'll have non-translated titles and summaries,
// since in this test we've only updated them in the "main" repository,
// and as a result there's no translation for them in localized sites.
// Content file though is still the one from the localized site!
//
- try (QuarkusIO quarkusIO = service.fetchQuarkusIo(new FailureCollector())) {
+ try (QuarkusIO quarkusIO = service.fetchQuarkusIo(failureCollectorMock)) {
try (var guides = quarkusIO.guides()) {
assertThat(guides)
.hasSize(10)
diff --git a/src/test/java/io/quarkus/search/app/indexing/FailureCollectorTest.java b/src/test/java/io/quarkus/search/app/indexing/FailureCollectorTest.java
deleted file mode 100644
index 7c79e4e3..00000000
--- a/src/test/java/io/quarkus/search/app/indexing/FailureCollectorTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package io.quarkus.search.app.indexing;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.time.Clock;
-import java.time.Instant;
-import java.time.ZoneOffset;
-
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
-
-class FailureCollectorTest {
-
- private final Clock clock = Clock.fixed(Instant.EPOCH, ZoneOffset.UTC);
-
- @ParameterizedTest
- @CsvSource(textBlock = """
- [PROD] Status report of quarkus.io content indexing: Critical (updated 1970-01-01T00:00:00Z),\
- [PROD] Status report of quarkus.io content indexing,Critical
- [PROD] Status report of quarkus.io content indexing: Warning (updated 1970-01-01T00:00:00Z),\
- [PROD] Status report of quarkus.io content indexing,Warning
- [PROD] Status report of quarkus.io content indexing: Success (updated 1970-01-01T00:00:00Z),\
- [PROD] Status report of quarkus.io content indexing,Success
- [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status (updated 2024-05-28T00:03:23Z),Critical
- [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status (updated 2024-05-28T00:03:23Z),Warning
- [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status (updated 2024-05-28T00:03:23Z),Success
- [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Critical (updated 2024-05-28T00:03:23Z),Critical
- [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Critical (updated 2024-05-28T00:03:23Z),Warning
- [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Critical (updated 2024-05-28T00:03:23Z),Success
- [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Warning (updated 2024-05-28T00:03:23Z),Critical
- [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Warning (updated 2024-05-28T00:03:23Z),Warning
- [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Warning (updated 2024-05-28T00:03:23Z),Success
- [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Success (updated 2024-05-28T00:03:23Z),Critical
- [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Success (updated 2024-05-28T00:03:23Z),Warning
- [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
- [PROD] search.quarkus.io indexing status: Success (updated 2024-05-28T00:03:23Z),Success
- """)
- void insertStatusAndUpdateDate(String expected, String currentTitle, String status) {
- assertThat(FailureCollector.GithubFailureReporter.insertStatusAndUpdateDate(clock, currentTitle, status))
- .isEqualTo(expected);
- }
-
-}
\ No newline at end of file
diff --git a/src/test/java/io/quarkus/search/app/indexing/reporting/StatusRendererTest.java b/src/test/java/io/quarkus/search/app/indexing/reporting/StatusRendererTest.java
new file mode 100644
index 00000000..ae85db48
--- /dev/null
+++ b/src/test/java/io/quarkus/search/app/indexing/reporting/StatusRendererTest.java
@@ -0,0 +1,83 @@
+package io.quarkus.search.app.indexing.reporting;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+class StatusRendererTest {
+
+ private final Clock clock = Clock.fixed(Instant.EPOCH, ZoneOffset.UTC);
+
+ @ParameterizedTest
+ @CsvSource(textBlock = """
+ [PROD] Status report of quarkus.io content indexing: Critical (updated 1970-01-01T00:00:00Z),\
+ CRITICAL,[PROD] Status report of quarkus.io content indexing
+ [PROD] Status report of quarkus.io content indexing: Warning (updated 1970-01-01T00:00:00Z),\
+ WARNING,[PROD] Status report of quarkus.io content indexing
+ [PROD] Status report of quarkus.io content indexing: Success (updated 1970-01-01T00:00:00Z),\
+ SUCCESS,[PROD] Status report of quarkus.io content indexing
+ [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
+ CRITICAL,[PROD] search.quarkus.io indexing status (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
+ WARNING,[PROD] search.quarkus.io indexing status (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
+ SUCCESS,[PROD] search.quarkus.io indexing status (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
+ CRITICAL,[PROD] search.quarkus.io indexing status: Critical (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
+ WARNING,[PROD] search.quarkus.io indexing status: Critical (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
+ SUCCESS,[PROD] search.quarkus.io indexing status: Critical (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
+ CRITICAL,[PROD] search.quarkus.io indexing status: Warning (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
+ WARNING,[PROD] search.quarkus.io indexing status: Warning (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
+ SUCCESS,[PROD] search.quarkus.io indexing status: Warning (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Critical (updated 1970-01-01T00:00:00Z),\
+ CRITICAL,[PROD] search.quarkus.io indexing status: Success (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Warning (updated 1970-01-01T00:00:00Z),\
+ WARNING,[PROD] search.quarkus.io indexing status: Success (updated 2024-05-28T00:03:23Z)
+ [PROD] search.quarkus.io indexing status: Success (updated 1970-01-01T00:00:00Z),\
+ SUCCESS,[PROD] search.quarkus.io indexing status: Success (updated 2024-05-28T00:03:23Z)
+ """)
+ void toStatusSummary(String expected, Status status, String currentTitle) {
+ assertThat(StatusRenderer.toStatusSummary(clock, status, currentTitle))
+ .isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @CsvSource(textBlock = """
+ 'Original description
+
+ ## Last update
+ > Automatic message
+ > and some more
+ ',\
+ 'Original description',\
+ 'Automatic message
+ and some more'
+ 'Original description
+
+ ## Last update
+ > New automatic message
+ > and some more
+ ',\
+ 'Original description
+
+ Random garbage
+ ',\
+ 'New automatic message
+ and some more'
+ """)
+ void insertMessageInIssueDescription(String expected, String originalIssueDescription, String newMessage) {
+ assertThat(StatusRenderer.insertMessageInIssueDescription(originalIssueDescription, newMessage))
+ .isEqualTo(expected);
+ }
+
+}
diff --git a/src/test/java/io/quarkus/search/app/indexing/state/ExplicitRetryConfig.java b/src/test/java/io/quarkus/search/app/indexing/state/ExplicitRetryConfig.java
new file mode 100644
index 00000000..c8bcf406
--- /dev/null
+++ b/src/test/java/io/quarkus/search/app/indexing/state/ExplicitRetryConfig.java
@@ -0,0 +1,10 @@
+package io.quarkus.search.app.indexing.state;
+
+import java.time.Duration;
+
+public record ExplicitRetryConfig(
+ int maxAttempts,
+ Duration delay)
+ implements
+ RetryConfig {
+}
diff --git a/src/test/java/io/quarkus/search/app/indexing/state/IndexingStateTest.java b/src/test/java/io/quarkus/search/app/indexing/state/IndexingStateTest.java
new file mode 100644
index 00000000..abb4c99b
--- /dev/null
+++ b/src/test/java/io/quarkus/search/app/indexing/state/IndexingStateTest.java
@@ -0,0 +1,227 @@
+package io.quarkus.search.app.indexing.state;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+import io.quarkus.search.app.indexing.reporting.FailureCollector.Stage;
+import io.quarkus.search.app.indexing.reporting.Status;
+import io.quarkus.search.app.indexing.reporting.StatusReporter;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import io.smallrye.mutiny.subscription.Cancellable;
+
+@ExtendWith(MockitoExtension.class)
+public class IndexingStateTest {
+
+ @Mock
+ StatusReporter statusReporterMock;
+
+ @Mock
+ Function<Duration, Cancellable> retrySchedulerMock;
+
+ @Test
+ void success() {
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(3, Duration.ofMinutes(2)), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ // No warning/critical: success
+ }
+ verify(statusReporterMock).report(eq(Status.SUCCESS), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+ @Test
+ void concurrent() {
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(3, Duration.ofMinutes(2)), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ // Try concurrent indexing...
+ assertThatThrownBy(() -> state.tryStart(true))
+ .isInstanceOf(IndexingAlreadyInProgressException.class);
+ }
+ verify(statusReporterMock).report(eq(Status.SUCCESS), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+ @Test
+ void warning() {
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(3, Duration.ofMinutes(2)), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.warning(Stage.INDEXING, "Some warning");
+ }
+ verify(statusReporterMock).report(eq(Status.WARNING), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+ @Test
+ void failure_max1Attempt() {
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(1, Duration.ofMinutes(2)), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.CRITICAL), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+ @Test
+ void failure_retryNotAllowed() {
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(3, Duration.ofMinutes(2)), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ try (IndexingState.Attempt attempt = state.tryStart(false)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.CRITICAL), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+ @Test
+ void failure_retry_then_success() {
+ var retryDelay = Duration.ofMinutes(2);
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(3, retryDelay), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ var cancellableMock = mock(Cancellable.class);
+ when(retrySchedulerMock.apply(any())).thenReturn(cancellableMock);
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.UNSTABLE), anyMap());
+ verify(retrySchedulerMock).apply(retryDelay);
+ assertThat(state.isInProgress()).isFalse();
+
+ reset(statusReporterMock, retrySchedulerMock);
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ verify(cancellableMock).cancel();
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.UNSTABLE), anyMap());
+ verify(retrySchedulerMock).apply(retryDelay);
+ assertThat(state.isInProgress()).isFalse();
+
+ reset(statusReporterMock, retrySchedulerMock);
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ verify(cancellableMock).cancel();
+ assertThat(state.isInProgress()).isTrue();
+
+ // No warning/critical: success
+ }
+ verify(statusReporterMock).report(eq(Status.SUCCESS), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+ @Test
+ void failure_retry_then_maxAttempts() {
+ var retryDelay = Duration.ofMinutes(2);
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(3, retryDelay), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ var cancellableMock = mock(Cancellable.class);
+ when(retrySchedulerMock.apply(any())).thenReturn(cancellableMock);
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.UNSTABLE), anyMap());
+ verify(retrySchedulerMock).apply(retryDelay);
+ assertThat(state.isInProgress()).isFalse();
+
+ reset(statusReporterMock, retrySchedulerMock);
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ verify(cancellableMock).cancel();
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.UNSTABLE), anyMap());
+ verify(retrySchedulerMock).apply(retryDelay);
+ assertThat(state.isInProgress()).isFalse();
+
+ reset(statusReporterMock, retrySchedulerMock);
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ verify(cancellableMock).cancel();
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.CRITICAL), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+ @Test
+ void failure_retry_then_retryNotAllowed() {
+ var state = new IndexingState(statusReporterMock,
+ new ExplicitRetryConfig(3, Duration.ofMinutes(2)), retrySchedulerMock);
+ assertThat(state.isInProgress()).isFalse();
+
+ var cancellableMock = mock(Cancellable.class);
+ when(retrySchedulerMock.apply(any())).thenReturn(cancellableMock);
+
+ try (IndexingState.Attempt attempt = state.tryStart(true)) {
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.UNSTABLE), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+
+ try (IndexingState.Attempt attempt = state.tryStart(false)) {
+ verify(cancellableMock).cancel();
+ assertThat(state.isInProgress()).isTrue();
+
+ attempt.critical(Stage.INDEXING, "Something critical");
+ }
+ verify(statusReporterMock).report(eq(Status.CRITICAL), anyMap());
+ assertThat(state.isInProgress()).isFalse();
+ }
+
+}
diff --git a/src/test/java/io/quarkus/search/app/util/MutinyUtilsTest.java b/src/test/java/io/quarkus/search/app/util/MutinyUtilsTest.java
new file mode 100644
index 00000000..498ae449
--- /dev/null
+++ b/src/test/java/io/quarkus/search/app/util/MutinyUtilsTest.java
@@ -0,0 +1,56 @@
+package io.quarkus.search.app.util;
+
+import static io.quarkus.search.app.util.MutinyUtils.runOnWorkerPool;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class MutinyUtilsTest {
+ @Test
+ void schedule() {
+ Supplier<Object> action = mock();
+ Consumer<Object> itemCallback = mock();
+ Consumer<Throwable> failureCallback = mock();
+
+ MutinyUtils.schedule(Duration.ofSeconds(2), () -> runOnWorkerPool(action))
+ .subscribe().with(itemCallback, failureCallback);
+
+ // Delayed execution should happen after approximately two seconds
+ // (and definitely no earlier than after one second).
+ await().between(Duration.ofSeconds(1), Duration.ofSeconds(4))
+ .untilAsserted(() -> verify(action).get());
+
+ verify(itemCallback).accept(null);
+ verifyNoInteractions(failureCallback);
+ }
+
+ @Test
+ void schedule_cancel() {
+ Supplier<Object> action = mock();
+ Consumer<Object> itemCallback = mock();
+ Consumer<Throwable> failureCallback = mock();
+
+ var cancellable = MutinyUtils.schedule(Duration.ofSeconds(2), () -> runOnWorkerPool(action))
+ .subscribe().with(itemCallback, failureCallback);
+
+ // Cancel immediately
+ cancellable.cancel();
+
+ // Cancellation should prevent the delayed execution.
+ // We'll consider that if it doesn't happen after 4 seconds, it'll likely never happen (as expected).
+ await().during(Duration.ofSeconds(4))
+ .untilAsserted(() -> verifyNoInteractions(action));
+ verifyNoInteractions(itemCallback, failureCallback);
+ }
+}