Skip to content

Commit

Permalink
Merge branch 'dev' into broker-static-final
Browse files Browse the repository at this point in the history
  • Loading branch information
abyrd authored Jan 26, 2024
2 parents 7060665 + 32eb1dc commit d1b202d
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void respondToException(Exception e, Request request, Response response,
// Include a stack trace except when the error is known to be about unauthenticated or unauthorized access,
// in which case we don't want to leak information about the server to people scanning it for weaknesses.
if (type != UNAUTHORIZED && type != FORBIDDEN) {
body.put("stackTrace", errorEvent.stackTrace);
body.put("stackTrace", errorEvent.filteredStackTrace);
}
response.status(code);
response.type("application/json");
Expand Down
38 changes: 25 additions & 13 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
}
// Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
// active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
// the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);
jobs.put(job.workerCategory, job);

// Register the regional job so results received from multiple workers can be assembled into one file.
// If any parameters fail checks here, an exception may cause this method to exit early.
// TODO encapsulate MultiOriginAssemblers in a new Component
// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);

// A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
jobs.put(job.workerCategory, job);

// If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID.
if (config.testTaskRedelivery()) {
// This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
return;
}

Expand Down Expand Up @@ -380,14 +384,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
}

/**
* When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* Record an error that happened while a worker was processing a task on the given job. This method is tolerant
* of job being null, because it's called on a code path where any number of things could be wrong or missing.
* This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler.
* Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user
* to see the error message. User will then need to manually delete it, which will remove the result assembler.
* This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
*/
private synchronized void recordJobError (Job job, String error) {
if (job != null) {
job.errors.add(error);
// Limit the number of errors recorded to one.
// Still using a Set<String> instead of just String since the set of errors is exposed in a UI-facing API.
if (job.errors.isEmpty()) {
job.errors.add(error);
}
}
}

Expand Down Expand Up @@ -483,21 +493,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
// because that method only uses final fields of the job.
Job job = null;
MultiOriginAssembler assembler;
try {
MultiOriginAssembler assembler;
synchronized (this) {
job = findJob(workResult.jobId);
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
// This will mark the job as errored and not-active, stopping distribution of tasks to workers.
// To ensure that happens, record errors before any other conditional that could exit this method.
if (workResult.error != null) {
recordJobError(job, workResult.error);
return;
}
assembler = resultAssemblers.get(workResult.jobId);
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId);
return;
}
if (workResult.error != null) {
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
recordJobError(job, workResult.error);
return;
}
// Mark tasks completed first before passing results to the assembler. On the final result received,
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) {
public int deliveryPass = 0;

/**
* If any error compromises the usabilty or quality of results from any origin, it is recorded here.
* If any error compromises the usability or quality of results from any origin, it is recorded here.
* This is a Set because identical errors are likely to be reported from many workers or individual tasks.
* The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks.
* There is some risk here of accumulating unbounded amounts of large error messages (see #919).
* The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set.
*/
public final Set<String> errors = new HashSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import com.conveyal.r5.util.ExceptionUtils;

import static com.conveyal.r5.util.ExceptionUtils.filterStackTrace;

/**
* This Event is fired each time a Throwable (usually an Exception or Error) occurs on the backend. It can then be
* recorded or tracked in various places - the console logs, Slack, etc. This could eventually be used for errors on
* the workers as well, but we'd have to be careful not to generate hundreds of messages at once.
*/
public class ErrorEvent extends Event {

// We may serialize this object, so we convert the Throwable to two strings to control its representation.
// All Events are intended to be eligible for serialization into a log or database, so we convert the Throwable to
// some Strings to determine its representation in a simple way.
// For flexibility in event handlers, it is tempting to hold on to the original Throwable instead of derived
// Strings. Exceptions are famously slow, but it's the initial creation and filling in the stack trace that are
// slow. Once the instace exists, repeatedly examining its stack trace should not be prohibitively costly. Still,
// we do probably gain some efficiency by converting the stack trace to a String once and reusing that.
// slow. Once the instance exists, repeatedly examining its stack trace should not be prohibitively costly.

public final String summary;

Expand All @@ -23,11 +25,16 @@ public class ErrorEvent extends Event {
*/
public final String httpPath;

/** The full stack trace of the exception that occurred. */
public final String stackTrace;

/** A minimal stack trace showing the immediate cause within Conveyal code. */
public final String filteredStackTrace;

public ErrorEvent (Throwable throwable, String httpPath) {
this.summary = ExceptionUtils.shortCauseString(throwable);
this.stackTrace = ExceptionUtils.stackTraceString(throwable);
this.filteredStackTrace = ExceptionUtils.filterStackTrace(throwable);
this.httpPath = httpPath;
}

Expand All @@ -54,25 +61,9 @@ public String traceWithContext (boolean verbose) {
if (verbose) {
builder.append(stackTrace);
} else {
builder.append(filterStackTrace(stackTrace));
builder.append(filteredStackTrace);
}
return builder.toString();
}

private static String filterStackTrace (String stackTrace) {
if (stackTrace == null) return null;
final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
String error = stackTrace.lines().findFirst().get();
String frame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at "))
.findFirst().orElse(unknownFrame);
String conveyalFrame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at com.conveyal."))
.filter(s -> !frame.equals(s))
.findFirst().orElse("");
return String.join("\n", error, frame, conveyalFrame);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,11 @@ private RegionalAnalysis updateRegionalAnalysis (Request request, Response respo
* the given regional analysis.
*/
private JsonNode getScenarioJsonUrl (Request request, Response response) {
RegionalAnalysis regionalAnalysis = Persistence.regionalAnalyses
.findByIdIfPermitted(request.params("_id"), UserPermissions.from(request));
RegionalAnalysis regionalAnalysis = Persistence.regionalAnalyses.findByIdIfPermitted(
request.params("_id"),
DBProjection.exclude("request.scenario.modifications"),
UserPermissions.from(request)
);
// In the persisted objects, regionalAnalysis.scenarioId seems to be null. Get it from the embedded request.
final String networkId = regionalAnalysis.bundleId;
final String scenarioId = regionalAnalysis.request.scenarioId;
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/com/conveyal/analysis/persistence/MongoMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ public int size() {
return (int) wrappedCollection.getCount();
}

public V findByIdFromRequestIfPermitted(Request request) {
return findByIdIfPermitted(request.params("_id"), UserPermissions.from(request));
}

public V findByIdIfPermitted(String id, UserPermissions userPermissions) {
V result = wrappedCollection.findOneById(id);
/**
* `fields` is nullable.
*/
public V findByIdIfPermitted(String id, DBObject fields, UserPermissions userPermissions) {
V result = wrappedCollection.findOneById(id, fields);

if (result == null) {
throw AnalysisServerException.notFound(String.format(
Expand All @@ -61,6 +60,14 @@ public V findByIdIfPermitted(String id, UserPermissions userPermissions) {
}
}

public V findByIdIfPermitted(String id, UserPermissions userPermissions) {
return findByIdIfPermitted(id, null, userPermissions);
}

public V findByIdFromRequestIfPermitted(Request request) {
return findByIdIfPermitted(request.params("_id"), UserPermissions.from(request));
}

public V get(String key) {
return wrappedCollection.findOneById(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.cluster.PathResult;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
import com.conveyal.r5.util.ExceptionUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -89,21 +91,27 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
this.job = job;
this.nOriginsTotal = job.nTasksTotal;
this.originsReceived = new BitSet(job.nTasksTotal);
// Check that origin and destination sets are not too big for generating CSV files.
if (!job.templateTask.makeTauiSite &&
job.templateTask.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)
) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = job.templateTask.destinationPointSets[0];
if ((job.templateTask.recordTimes || job.templateTask.includePathResults) && !job.templateTask.oneToOne) {
if (nOriginsTotal * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS ||
destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS
) {
throw new AnalysisServerException(String.format(
"Freeform requests limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
// If results have been requested for freeform origins, check that the origin and
// destination pointsets are not too big for generating CSV files.
RegionalTask task = job.templateTask;
if (!task.makeTauiSite && task.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = task.destinationPointSets[0];
int nDestinations = destinationPointSet.featureCount();
int nODPairs = task.oneToOne ? nOriginsTotal : nOriginsTotal * nDestinations;
if (task.recordTimes &&
(nDestinations > MAX_FREEFORM_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Travel time results limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
if (task.includePathResults &&
(nDestinations > PathResult.MAX_PATH_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Path results limited to %d destinations and %d origin-destination pairs.",
PathResult.MAX_PATH_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
}

Expand Down Expand Up @@ -152,8 +160,11 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName);
}
}
} catch (AnalysisServerException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Exception while creating multi-origin assembler: " + ExceptionUtils.stackTraceString(e));
// Handle any obscure problems we don't want end users to see without context of MultiOriginAssembler.
throw new RuntimeException("Exception while creating multi-origin assembler: " + e.toString(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void load (ShapeDataStore store) throws Exception {
for (SimpleFeatureIterator it = sfc.features(); it.hasNext();) {
GeobufFeature feat = new GeobufFeature(it.next());
feat.id = null;
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID10"));
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID20"));
feat.properties = new HashMap<>();
store.add(feat);
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/conveyal/r5/analyst/cluster/PathResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PathResult {
* These results are returned to the backend over an HTTP API so we don't want to risk making them too huge.
* This could be set to a higher number in cases where you know the result return channel can handle the size.
*/
public static int maxDestinations = 5000;
public static final int MAX_PATH_DESTINATIONS = 5_000;

private final int nDestinations;
/**
Expand All @@ -49,7 +49,7 @@ public class PathResult {
public final Multimap<RouteSequence, Iteration>[] iterationsForPathTemplates;
private final TransitLayer transitLayer;

public static String[] DATA_COLUMNS = new String[]{
public static final String[] DATA_COLUMNS = new String[]{
"routes",
"boardStops",
"alightStops",
Expand All @@ -70,8 +70,8 @@ public PathResult(AnalysisWorkerTask task, TransitLayer transitLayer) {
// In regional analyses, return paths to all destinations
nDestinations = task.nTargetsPerOrigin();
// This limitation reflects the initial design, for use with freeform pointset destinations
if (nDestinations > maxDestinations) {
throw new UnsupportedOperationException("Number of detailed path destinations exceeds limit of " + maxDestinations);
if (nDestinations > MAX_PATH_DESTINATIONS) {
throw new UnsupportedOperationException("Number of detailed path destinations exceeds limit of " + MAX_PATH_DESTINATIONS);
}
}
iterationsForPathTemplates = new Multimap[nDestinations];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ public RegionalWorkResult(OneOriginResult result, RegionalTask task) {
// TODO checkTravelTimeInvariants, checkAccessibilityInvariants to verify that values are monotonically increasing
}

/** Constructor used when results for this origin are considered unusable due to an unhandled error. */
/**
* Constructor used when results for this origin are considered unusable due to an unhandled error. Besides the
* short-form exception, most result fields are left null. There is no information to communicate, and because
* errors are often produced faster than valid results, we don't want to flood the backend with unnecessarily
* voluminous error reports. The short-form exception message is used for a similar reason, to limit the total size
* of error messages.
*/
public RegionalWorkResult(Throwable t, RegionalTask task) {
this.jobId = task.jobId;
this.taskId = task.taskId;
this.error = ExceptionUtils.shortAndLongString(t);
this.error = ExceptionUtils.filterStackTrace(t);
}

}
Loading

0 comments on commit d1b202d

Please sign in to comment.