diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index 2aea67dc2..0500df8b4 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -177,19 +177,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId); throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId); } + // Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of + // active jobs until we're sure the result assembler was constructed without any errors. Always add and remove + // the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887). WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis); Job job = new Job(templateTask, workerTags); - jobs.put(job.workerCategory, job); // Register the regional job so results received from multiple workers can be assembled into one file. + // If any parameters fail checks here, an exception may cause this method to exit early. // TODO encapsulate MultiOriginAssemblers in a new Component - // Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler. - // That is not catastrophic, but the user may need to recognize and delete the stalled regional job. MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage); resultAssemblers.put(templateTask.jobId, assembler); + // A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job. + jobs.put(job.workerCategory, job); + + // If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID. 
if (config.testTaskRedelivery()) { - // This is a fake job for testing, don't confuse the worker startup code below with null graph ID. return; } @@ -385,14 +389,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) { } /** - * When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false. + * Record an error that happened while a worker was processing a task on the given job. This method is tolerant + * of job being null, because it's called on a code path where any number of things could be wrong or missing. + * This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler. + * Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false. * The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user * to see the error message. User will then need to manually delete it, which will remove the result assembler. - * This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler. */ private synchronized void recordJobError (Job job, String error) { if (job != null) { - job.errors.add(error); + // Limit the number of errors recorded to one. + // Still using a Set instead of just String since the set of errors is exposed in a UI-facing API. + if (job.errors.isEmpty()) { + job.errors.add(error); + } } } @@ -488,21 +498,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) { // Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization, // because that method only uses final fields of the job. Job job = null; - MultiOriginAssembler assembler; try { + MultiOriginAssembler assembler; synchronized (this) { job = findJob(workResult.jobId); + // Record any error reported by the worker and don't pass bad results on to regional result assembly. 
+ // This will mark the job as errored and not-active, stopping distribution of tasks to workers. + // To ensure that happens, record errors before any other conditional that could exit this method. + if (workResult.error != null) { + recordJobError(job, workResult.error); + return; + } assembler = resultAssemblers.get(workResult.jobId); if (job == null || assembler == null || !job.isActive()) { // This will happen naturally for all delivered tasks after a job is deleted or it errors out. LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId); return; } - if (workResult.error != null) { - // Record any error reported by the worker and don't pass bad results on to regional result assembly. - recordJobError(job, workResult.error); - return; - } // Mark tasks completed first before passing results to the assembler. On the final result received, // this will minimize the risk of race conditions by quickly making the job invisible to incoming stray // results from spurious redeliveries, before the assembler is busy finalizing and uploading results. diff --git a/src/main/java/com/conveyal/analysis/components/broker/Job.java b/src/main/java/com/conveyal/analysis/components/broker/Job.java index e49cf40a9..28829c33c 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Job.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Job.java @@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) { public int deliveryPass = 0; /** - * If any error compromises the usabilty or quality of results from any origin, it is recorded here. + * If any error compromises the usability or quality of results from any origin, it is recorded here. * This is a Set because identical errors are likely to be reported from many workers or individual tasks. + * The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks. 
+ * There is some risk here of accumulating unbounded amounts of large error messages (see #919). + * The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set. */ public final Set errors = new HashSet(); diff --git a/src/main/java/com/conveyal/analysis/components/eventbus/ErrorEvent.java b/src/main/java/com/conveyal/analysis/components/eventbus/ErrorEvent.java index 24dc542f1..a157a9d5b 100644 --- a/src/main/java/com/conveyal/analysis/components/eventbus/ErrorEvent.java +++ b/src/main/java/com/conveyal/analysis/components/eventbus/ErrorEvent.java @@ -2,6 +2,8 @@ import com.conveyal.r5.util.ExceptionUtils; +import static com.conveyal.r5.util.ExceptionUtils.filterStackTrace; + /** * This Event is fired each time a Throwable (usually an Exception or Error) occurs on the backend. It can then be * recorded or tracked in various places - the console logs, Slack, etc. This could eventually be used for errors on @@ -59,20 +61,4 @@ public String traceWithContext (boolean verbose) { return builder.toString(); } - private static String filterStackTrace (String stackTrace) { - if (stackTrace == null) return null; - final String unknownFrame = "Unknown stack frame, probably optimized out by JVM."; - String error = stackTrace.lines().findFirst().get(); - String frame = stackTrace.lines() - .map(String::strip) - .filter(s -> s.startsWith("at ")) - .findFirst().orElse(unknownFrame); - String conveyalFrame = stackTrace.lines() - .map(String::strip) - .filter(s -> s.startsWith("at com.conveyal.")) - .filter(s -> !frame.equals(s)) - .findFirst().orElse(""); - return String.join("\n", error, frame, conveyalFrame); - } - } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalWorkResult.java b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalWorkResult.java index d15531c03..c606acc0f 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalWorkResult.java +++ 
b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalWorkResult.java @@ -69,11 +69,17 @@ public RegionalWorkResult(OneOriginResult result, RegionalTask task) { // TODO checkTravelTimeInvariants, checkAccessibilityInvariants to verify that values are monotonically increasing } - /** Constructor used when results for this origin are considered unusable due to an unhandled error. */ + /** + * Constructor used when results for this origin are considered unusable due to an unhandled error. Besides the + * short-form exception, most result fields are left null. There is no information to communicate, and because + * errors are often produced faster than valid results, we don't want to flood the backend with unnecessarily + * voluminous error reports. The short-form exception message is used for a similar reason, to limit the total size + * of error messages. + */ public RegionalWorkResult(Throwable t, RegionalTask task) { this.jobId = task.jobId; this.taskId = task.taskId; - this.error = ExceptionUtils.shortAndLongString(t); + this.error = ExceptionUtils.filterStackTrace(t); } } diff --git a/src/main/java/com/conveyal/r5/util/ExceptionUtils.java b/src/main/java/com/conveyal/r5/util/ExceptionUtils.java index 901651e85..fca364b2a 100644 --- a/src/main/java/com/conveyal/r5/util/ExceptionUtils.java +++ b/src/main/java/com/conveyal/r5/util/ExceptionUtils.java @@ -50,4 +50,29 @@ public static String shortAndLongString (Throwable throwable) { return shortCauseString(throwable) + "\n[detail follows]\n" + stackTraceString(throwable); } + /** + * Given a full stack trace string with one frame per line, keep only the first line (the exception summary), the + * first stack frame, and the first additional frame (if any) that comes from a Conveyal package. This yields a much + * shorter stack trace that still shows where the exception was thrown and where the problem originates in our code. 
+ */ + public static String filterStackTrace (String stackTrace) { + if (stackTrace == null) return null; + final String unknownFrame = "Unknown stack frame, probably optimized out by JVM."; + String error = stackTrace.lines().findFirst().get(); + String frame = stackTrace.lines() + .map(String::strip) + .filter(s -> s.startsWith("at ")) + .findFirst().orElse(unknownFrame); + String conveyalFrame = stackTrace.lines() + .map(String::strip) + .filter(s -> s.startsWith("at com.conveyal.")) + .filter(s -> !frame.equals(s)) + .findFirst().orElse(""); + return String.join("\n", error, frame, conveyalFrame); + } + + public static String filterStackTrace (Throwable throwable) { + return filterStackTrace(stackTraceString(throwable)); + } + }