diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index a41f513d4..2128acf1b 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -389,10 +389,12 @@ public synchronized void markTaskCompleted (Job job, int taskId) { } /** - * When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false. + * Record an error that happened while a worker was processing a task on the given job. This method is tolerant + * of job being null, because it's called on a code path where any number of things could be wrong or missing. + * This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler. + * Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false. * The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user * to see the error message. User will then need to manually delete it, which will remove the result assembler. - * This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler. */ private synchronized void recordJobError (Job job, String error) { if (job != null) { @@ -492,21 +494,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) { // Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization, // because that method only uses final fields of the job. Job job = null; - MultiOriginAssembler assembler; try { + MultiOriginAssembler assembler; synchronized (this) { job = findJob(workResult.jobId); + // Record any error reported by the worker and don't pass bad results on to regional result assembly. + // This will mark the job as errored and not-active, stopping distribution of tasks to workers. + // To ensure that happens, record errors before any other conditional that could exit this method. + if (workResult.error != null) { + recordJobError(job, workResult.error); + return; + } assembler = resultAssemblers.get(workResult.jobId); if (job == null || assembler == null || !job.isActive()) { // This will happen naturally for all delivered tasks after a job is deleted or it errors out. LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId); return; } - if (workResult.error != null) { - // Record any error reported by the worker and don't pass bad results on to regional result assembly. - recordJobError(job, workResult.error); - return; - } // Mark tasks completed first before passing results to the assembler. On the final result received, // this will minimize the risk of race conditions by quickly making the job invisible to incoming stray // results from spurious redeliveries, before the assembler is busy finalizing and uploading results. diff --git a/src/main/java/com/conveyal/analysis/components/broker/Job.java b/src/main/java/com/conveyal/analysis/components/broker/Job.java index e49cf40a9..28829c33c 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Job.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Job.java @@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) { public int deliveryPass = 0; /** - * If any error compromises the usabilty or quality of results from any origin, it is recorded here. + * If any error compromises the usability or quality of results from any origin, it is recorded here. * This is a Set because identical errors are likely to be reported from many workers or individual tasks. + * The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks. + * There is some risk here of accumulating unbounded amounts of large error messages (see #919). + * The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set. */ public final Set errors = new HashSet();