Update to v7.1 (#32)
* fix comment on NETWORK_FORMAT_VERSION

* Do not record opportunities 120 minutes away

* Fix freeform guardrails

Correct logic for checking oneToOne analyses
Check limit on number of destinations for path analyses in broker

* only record job when assembler created, fixes conveyal#887

* record errors (stopping job) before other checks

addresses conveyal#887

* report worker errors as much shorter stack trace

move filterStackTrace method into shared utility class

* record only one error per job in broker

addresses conveyal#919 and conveyal#887

* Add final modifier to PathResult constants

* Use specific AnalysisServerException

BAD_REQUEST instead of UNKNOWN

* Filter stack traces sent to UI

* build filtered stack trace directly from throwable

conveyal#918 (comment)

* Update to 2020 Census geometries

* Update seamless-census test fixtures

with 2020 Census geometries

* do not include stacktrace in message

rethrow AnalysisServerException since they already have clear messages

* Implement Timo Jaakkonen’s crossing delays (#4), upload package to DGL GitHub organisation

* Jaakkonen (2013)’s values

* float -> int delays

* let’s try this

* enough for today

* too many changes for such a small patch 😬

* fixed test

* shield against undefined date

* 😬

* am I allowed to push to DGL’s packages?

* linted, moved package destination to my own fork

* moved repo to fork

* build package on pull request, publish to dgl’s repo

* build package on pull request, publish to dgl’s repo

* build package on pull request, publish to dgl’s repo

* build package on pull request, publish to dgl’s repo

* build package on pull request, publish to dgl’s repo

* Run on older image, as ubuntu-latest has gradle-8, which fails (#5)

* Freeze gradle version (#6)

* Run on older image, as ubuntu-latest has gradle-8, which fails

* freeze gradle version

* fix build workflow  (#7)

* Run on older image, as ubuntu-latest has gradle-8, which fails

* freeze gradle version

* refactor workflow, correct repo package address

* fix build workflow (#8)

* Run on older image, as ubuntu-latest has gradle-8, which fails

* freeze gradle version

* refactor workflow, correct repo package address

* clean-up

* clean-up

* token

* print debug info

* configurable (per-repo) package registry (#9)

* Run on older image, as ubuntu-latest has gradle-8, which fails

* freeze gradle version

* refactor workflow, correct repo package address

* clean-up

* clean-up

* token

* print debug info

* flexible package repo address

* remove debug statement

* configurable package registry

* debugging GH actions

* debugging ii

* fix build system (#10)

* Run on older image, as ubuntu-latest has gradle-8, which fails

* freeze gradle version

* refactor workflow, correct repo package address

* clean-up

* clean-up

* token

* print debug info

* flexible package repo address

* remove debug statement

* configurable package registry

* debugging GH actions

* debugging ii

* asfd

* remove debug statements

* read custom OSM tags, use them for bike routing (#11)

* read custom OSM tags, use them for bike routing

* allocate memory

* write on correct edge

* Latest 7.x.x gradle

* Fix bicycle speed tag reading

* Allow publish step to fail (in pull requests)

* Add default speeds that represent the speed limits on Finnish roads (#13)

* revert Jaakkola implementation (#16)

* disable turning penalties for bicycle and car, do not apply perceived length multiplier for bicycle routing (#17)

* Use per-edge speeds for cycling (#19)

* Use per-edge speeds for cycling

* Timo Jakkola’s times (#21)

* save shapes of GTFS patterns (#22)

* 23 roll back jaakkola congestion timings (#24)

* save shapes of GTFS patterns

* Roll back Jaakkola congestion penalties

* make MAX_PATH_DESTINATIONS *not* final

---------

Co-authored-by: abyrd <[email protected]>
Co-authored-by: ansons <[email protected]>
Co-authored-by: Anson Stewart <[email protected]>
4 people authored Jan 16, 2024
1 parent 7b9c2c3 commit caff253
Showing 13 changed files with 114 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -179,7 +179,7 @@ private void respondToException(Exception e, Request request, Response response,
// Include a stack trace except when the error is known to be about unauthenticated or unauthorized access,
// in which case we don't want to leak information about the server to people scanning it for weaknesses.
if (type != UNAUTHORIZED && type != FORBIDDEN) {
body.put("stackTrace", errorEvent.stackTrace);
body.put("stackTrace", errorEvent.filteredStackTrace);
}
response.status(code);
response.type("application/json");
38 changes: 25 additions & 13 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -177,19 +177,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
}
// Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
// active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
// the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);
jobs.put(job.workerCategory, job);

// Register the regional job so results received from multiple workers can be assembled into one file.
// If any parameters fail checks here, an exception may cause this method to exit early.
// TODO encapsulate MultiOriginAssemblers in a new Component
// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);

// A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
jobs.put(job.workerCategory, job);

// If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID.
if (config.testTaskRedelivery()) {
// This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
return;
}

@@ -385,14 +389,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
}

/**
* When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* Record an error that happened while a worker was processing a task on the given job. This method is tolerant
* of job being null, because it's called on a code path where any number of things could be wrong or missing.
* This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler.
* Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user
* to see the error message. User will then need to manually delete it, which will remove the result assembler.
* This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
*/
private synchronized void recordJobError (Job job, String error) {
if (job != null) {
job.errors.add(error);
// Limit the number of errors recorded to one.
// Still using a Set<String> instead of just String since the set of errors is exposed in a UI-facing API.
if (job.errors.isEmpty()) {
job.errors.add(error);
}
}
}
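
The one-error-per-job policy above can be sketched in isolation. This is a hedged, self-contained illustration of the logic, not the real Broker code: the `Job` stand-in below is hypothetical and keeps only the `errors` field, which stays a `Set<String>` purely because the real field is exposed as a set in a UI-facing API.

```java
import java.util.HashSet;
import java.util.Set;

public class RecordErrorSketch {

    // Hypothetical stand-in for the broker's Job: only the errors field matters here.
    static class Job {
        final Set<String> errors = new HashSet<>();
        boolean isErrored () { return !errors.isEmpty(); }
    }

    // Mirrors the intent of Broker.recordJobError: tolerate a null job (the caller
    // may be on a code path where many things are missing) and keep only the first
    // error, so a flood of identical worker failures records a single message.
    static void recordJobError (Job job, String error) {
        if (job != null && job.errors.isEmpty()) {
            job.errors.add(error);
        }
    }

    public static void main (String[] args) {
        Job job = new Job();
        recordJobError(null, "ignored");       // null-tolerant, no exception
        recordJobError(job, "worker failed");  // first error is recorded
        recordJobError(job, "another error");  // subsequent errors are dropped
        System.out.println(job.errors.size()); // 1
        System.out.println(job.isErrored());   // true
    }
}
```

In the real Broker this method is `synchronized`, because it is reached from the unsynchronized worker-poll HTTP handler.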

@@ -488,21 +498,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
// because that method only uses final fields of the job.
Job job = null;
MultiOriginAssembler assembler;
try {
MultiOriginAssembler assembler;
synchronized (this) {
job = findJob(workResult.jobId);
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
// This will mark the job as errored and not-active, stopping distribution of tasks to workers.
// To ensure that happens, record errors before any other conditional that could exit this method.
if (workResult.error != null) {
recordJobError(job, workResult.error);
return;
}
assembler = resultAssemblers.get(workResult.jobId);
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId);
return;
}
if (workResult.error != null) {
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
recordJobError(job, workResult.error);
return;
}
// Mark tasks completed first before passing results to the assembler. On the final result received,
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
@@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) {
public int deliveryPass = 0;

/**
* If any error compromises the usabilty or quality of results from any origin, it is recorded here.
* If any error compromises the usability or quality of results from any origin, it is recorded here.
* This is a Set because identical errors are likely to be reported from many workers or individual tasks.
* The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks.
* There is some risk here of accumulating unbounded amounts of large error messages (see #919).
* The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set.
*/
public final Set<String> errors = new HashSet();

@@ -2,18 +2,20 @@

import com.conveyal.r5.util.ExceptionUtils;

import static com.conveyal.r5.util.ExceptionUtils.filterStackTrace;

/**
* This Event is fired each time a Throwable (usually an Exception or Error) occurs on the backend. It can then be
* recorded or tracked in various places - the console logs, Slack, etc. This could eventually be used for errors on
* the workers as well, but we'd have to be careful not to generate hundreds of messages at once.
*/
public class ErrorEvent extends Event {

// We may serialize this object, so we convert the Throwable to two strings to control its representation.
// All Events are intended to be eligible for serialization into a log or database, so we convert the Throwable to
// some Strings to determine its representation in a simple way.
// For flexibility in event handlers, it is tempting to hold on to the original Throwable instead of derived
// Strings. Exceptions are famously slow, but it's the initial creation and filling in the stack trace that are
// slow. Once the instace exists, repeatedly examining its stack trace should not be prohibitively costly. Still,
// we do probably gain some efficiency by converting the stack trace to a String once and reusing that.
// slow. Once the instance exists, repeatedly examining its stack trace should not be prohibitively costly.

public final String summary;

@@ -23,11 +25,16 @@ public class ErrorEvent extends Event {
*/
public final String httpPath;

/** The full stack trace of the exception that occurred. */
public final String stackTrace;

/** A minimal stack trace showing the immediate cause within Conveyal code. */
public final String filteredStackTrace;

public ErrorEvent (Throwable throwable, String httpPath) {
this.summary = ExceptionUtils.shortCauseString(throwable);
this.stackTrace = ExceptionUtils.stackTraceString(throwable);
this.filteredStackTrace = ExceptionUtils.filterStackTrace(throwable);
this.httpPath = httpPath;
}

@@ -54,25 +61,9 @@ public String traceWithContext (boolean verbose) {
if (verbose) {
builder.append(stackTrace);
} else {
builder.append(filterStackTrace(stackTrace));
builder.append(filteredStackTrace);
}
return builder.toString();
}

private static String filterStackTrace (String stackTrace) {
if (stackTrace == null) return null;
final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
String error = stackTrace.lines().findFirst().get();
String frame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at "))
.findFirst().orElse(unknownFrame);
String conveyalFrame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at com.conveyal."))
.filter(s -> !frame.equals(s))
.findFirst().orElse("");
return String.join("\n", error, frame, conveyalFrame);
}

}
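
The diff above removes the string-parsing `filterStackTrace(String)` helper and replaces it with one built directly from the `Throwable` (moved into the shared `ExceptionUtils` class). A hedged sketch of what such a method might look like follows; the real implementation lives in `ExceptionUtils` and may differ in detail. The idea is to keep at most three lines: the error itself, the immediate frame, and the first `com.conveyal.` frame.

```java
import java.util.Arrays;

public class FilterStackTraceSketch {

    // Sketch only: build a minimal filtered trace straight from the Throwable,
    // avoiding a round-trip through a full stack-trace String.
    static String filterStackTrace (Throwable throwable) {
        if (throwable == null) return null;
        final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
        StackTraceElement[] frames = throwable.getStackTrace();
        String error = throwable.toString();
        // The immediate frame where the problem occurred.
        String first = (frames.length > 0) ? "at " + frames[0] : unknownFrame;
        // The first frame within Conveyal code, if different from the immediate one.
        String conveyal = Arrays.stream(frames)
                .map(f -> "at " + f)
                .filter(s -> s.startsWith("at com.conveyal."))
                .filter(s -> !s.equals(first))
                .findFirst().orElse("");
        return String.join("\n", error, first, conveyal);
    }

    public static void main (String[] args) {
        String filtered = filterStackTrace(new IllegalStateException("boom"));
        System.out.println(filtered); // error line, immediate frame, conveyal frame (if any)
    }
}
```

Because the filtered string is computed once in the `ErrorEvent` constructor and stored in `filteredStackTrace`, event handlers can reuse it without re-examining the trace.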
@@ -7,6 +7,8 @@
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.cluster.PathResult;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
import com.conveyal.r5.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -89,21 +91,27 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
this.job = job;
this.nOriginsTotal = job.nTasksTotal;
this.originsReceived = new BitSet(job.nTasksTotal);
// Check that origin and destination sets are not too big for generating CSV files.
if (!job.templateTask.makeTauiSite &&
job.templateTask.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)
) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = job.templateTask.destinationPointSets[0];
if ((job.templateTask.recordTimes || job.templateTask.includePathResults) && !job.templateTask.oneToOne) {
if (nOriginsTotal * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS ||
destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS
) {
throw new AnalysisServerException(String.format(
"Freeform requests limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
// If results have been requested for freeform origins, check that the origin and
// destination pointsets are not too big for generating CSV files.
RegionalTask task = job.templateTask;
if (!task.makeTauiSite && task.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = task.destinationPointSets[0];
int nDestinations = destinationPointSet.featureCount();
int nODPairs = task.oneToOne ? nOriginsTotal : nOriginsTotal * nDestinations;
if (task.recordTimes &&
(nDestinations > MAX_FREEFORM_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Travel time results limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
if (task.includePathResults &&
(nDestinations > PathResult.MAX_PATH_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Path results limited to %d destinations and %d origin-destination pairs.",
PathResult.MAX_PATH_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
}
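
The corrected guardrail above hinges on counting origin-destination pairs properly for one-to-one analyses, where each origin is paired with exactly one destination rather than every destination. A small hedged sketch of that counting logic, with illustrative limit values rather than the project's real constants:

```java
public class FreeformLimitSketch {

    // The number of OD pairs depends on the analysis mode: in oneToOne mode each
    // origin is matched to a single destination, so pairs == origins. Use long
    // arithmetic to avoid int overflow on large origin x destination products.
    static long odPairs (boolean oneToOne, int nOrigins, int nDestinations) {
        return oneToOne ? (long) nOrigins : (long) nOrigins * nDestinations;
    }

    // Mirrors the shape of the checks above; maxDestinations and maxODPairs stand
    // in for MAX_FREEFORM_DESTINATIONS / MAX_PATH_DESTINATIONS and MAX_FREEFORM_OD_PAIRS.
    static boolean withinLimits (long nODPairs, int nDestinations,
                                 int maxDestinations, long maxODPairs) {
        return nDestinations <= maxDestinations && nODPairs <= maxODPairs;
    }

    public static void main (String[] args) {
        System.out.println(odPairs(true, 1000, 500));  // 1000
        System.out.println(odPairs(false, 1000, 500)); // 500000
    }
}
```

Note how the diff applies separate limits per result type: travel time results are checked against `MAX_FREEFORM_DESTINATIONS`, path results against `PathResult.MAX_PATH_DESTINATIONS`, each failing fast with a `badRequest` response.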

@@ -152,8 +160,11 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName);
}
}
} catch (AnalysisServerException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Exception while creating multi-origin assembler: " + ExceptionUtils.stackTraceString(e));
// Handle any obscure problems we don't want end users to see without context of MultiOriginAssembler.
throw new RuntimeException("Exception while creating multi-origin assembler: " + e.toString(), e);
}
}

@@ -34,7 +34,7 @@ public void load (ShapeDataStore store) throws Exception {
for (SimpleFeatureIterator it = sfc.features(); it.hasNext();) {
GeobufFeature feat = new GeobufFeature(it.next());
feat.id = null;
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID10"));
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID20"));
feat.properties = new HashMap<>();
store.add(feat);
}
@@ -33,7 +33,8 @@ public class TemporalDensityResult {

/**
* The temporal density of opportunities. For each destination set, for each percentile, for each minute of
* travel from 0 to 120, the number of opportunities reached in travel times from i (inclusive) to i+1 (exclusive).
* travel m from 0 to 119, the number of opportunities reached in travel times from m (inclusive) to m+1
* (exclusive).
*/
public final double[][][] opportunitiesPerMinute;

@@ -57,7 +58,7 @@ public void recordOneTarget (int target, int[] travelTimePercentilesSeconds) {
break; // If any percentile is unreached, all higher ones are also unreached.
}
int m = travelTimePercentilesSeconds[p] / 60;
if (m <= 120) {
if (m < 120) {
opportunitiesPerMinute[d][p][m] += dps.getOpportunityCount(target);
}
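
The one-character fix above (`m < 120` instead of `m <= 120`) matters because the histogram has exactly 120 one-minute bins indexed 0 through 119; a travel time of exactly 120 minutes must be dropped, not written past the end of the array. A hedged sketch of the binning rule, with names simplified from the original:

```java
public class DensityBinSketch {

    static final int N_MINUTES = 120; // bins for minutes 0..119

    // Returns the bin index for a travel time, or -1 when the time falls at or
    // beyond 120 minutes and should not be recorded. Bin m covers travel times
    // from m minutes (inclusive) to m+1 minutes (exclusive).
    static int binForTravelTime (int travelTimeSeconds) {
        int m = travelTimeSeconds / 60;
        return (m < N_MINUTES) ? m : -1;
    }

    public static void main (String[] args) {
        System.out.println(binForTravelTime(0));    // 0
        System.out.println(binForTravelTime(7199)); // 119 (just under 120 minutes)
        System.out.println(binForTravelTime(7200)); // -1  (exactly 120 minutes, dropped)
    }
}
```

With the previous `<=` comparison, a 120-minute travel time would have indexed bin 120 in a 120-element array, an off-by-one against the documented range.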
}
8 changes: 4 additions & 4 deletions src/main/java/com/conveyal/r5/analyst/cluster/PathResult.java
@@ -38,7 +38,7 @@ public class PathResult {
* These results are returned to the backend over an HTTP API so we don't want to risk making them too huge.
* This could be set to a higher number in cases where you know the result return channel can handle the size.
*/
public static int maxDestinations = 5000;
public static int MAX_PATH_DESTINATIONS = 5_000;

private final int nDestinations;
/**
@@ -49,7 +49,7 @@ public class PathResult {
public final Multimap<RouteSequence, Iteration>[] iterationsForPathTemplates;
private final TransitLayer transitLayer;

public static String[] DATA_COLUMNS = new String[]{
public static final String[] DATA_COLUMNS = new String[]{
"routes",
"boardStops",
"alightStops",
@@ -70,8 +70,8 @@ public PathResult(AnalysisWorkerTask task, TransitLayer transitLayer) {
// In regional analyses, return paths to all destinations
nDestinations = task.nTargetsPerOrigin();
// This limitation reflects the initial design, for use with freeform pointset destinations
if (nDestinations > maxDestinations) {
throw new UnsupportedOperationException("Number of detailed path destinations exceeds limit of " + maxDestinations);
if (nDestinations > MAX_PATH_DESTINATIONS) {
throw new UnsupportedOperationException("Number of detailed path destinations exceeds limit of " + MAX_PATH_DESTINATIONS);
}
}
iterationsForPathTemplates = new Multimap[nDestinations];
@@ -69,11 +69,17 @@ public RegionalWorkResult(OneOriginResult result, RegionalTask task) {
// TODO checkTravelTimeInvariants, checkAccessibilityInvariants to verify that values are monotonically increasing
}

/** Constructor used when results for this origin are considered unusable due to an unhandled error. */
/**
* Constructor used when results for this origin are considered unusable due to an unhandled error. Besides the
* short-form exception, most result fields are left null. There is no information to communicate, and because
* errors are often produced faster than valid results, we don't want to flood the backend with unnecessarily
* voluminous error reports. The short-form exception message is used for a similar reason, to limit the total size
* of error messages.
*/
public RegionalWorkResult(Throwable t, RegionalTask task) {
this.jobId = task.jobId;
this.taskId = task.taskId;
this.error = ExceptionUtils.shortAndLongString(t);
this.error = ExceptionUtils.filterStackTrace(t);
}

}
12 changes: 7 additions & 5 deletions src/main/java/com/conveyal/r5/kryo/KryoNetworkSerializer.java
@@ -45,11 +45,13 @@ public abstract class KryoNetworkSerializer {
* the serialization format itself does not change. This will ensure newer workers will not load cached older files.
* We considered using an ISO date string as the version but that could get confusing when seen in filenames.
*
* History of Network Version (NV) changes:
* nv4 2023-11-02 WebMercatorGridPointSet now contains nested WebMercatorExtents
* nv3 2023-01-18 use Kryo 5 serialization format
* nv2 2022-04-05
* nv1 2021-04-30 stopped using r5 version string (which caused networks to be rebuilt for every new r5 version)
* History of Network Version (NV) changes (in production releases):
* nv3 since v7.0: switched to Kryo 5 serialization, WebMercatorGridPointSet now contains nested WebMercatorExtents
* nv2 since 2022-04-05
* nv1 since 2021-04-30: stopped rebuilding networks for every new r5 version, manually setting this version string
*
* When prototyping new features, use a unique identifier such as the branch or a commit ID, not sequential nvX ones.
* This avoids conflicts when multiple changes are combined in a single production release, or some are abandoned.
*/
public static final String NETWORK_FORMAT_VERSION = "nv3";
