diff --git a/build.gradle b/build.gradle index d93f0e90b..2302cd8f7 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,7 @@ java { toolchain { languageVersion.set(JavaLanguageVersion.of(21)) } + withSourcesJar() } jar { @@ -164,7 +165,7 @@ dependencies { } // Database driver. - implementation 'org.mongodb:mongo-java-driver:3.11.0' + implementation 'org.mongodb:mongodb-driver-legacy:5.2.0' // Legacy system for storing Java objects, this functionality is now provided by the MongoDB driver itself. implementation 'org.mongojack:mongojack:2.10.1' diff --git a/src/main/java/com/conveyal/analysis/components/BackendComponents.java b/src/main/java/com/conveyal/analysis/components/BackendComponents.java index 9a7270a1b..dafb1b57e 100644 --- a/src/main/java/com/conveyal/analysis/components/BackendComponents.java +++ b/src/main/java/com/conveyal/analysis/components/BackendComponents.java @@ -86,7 +86,7 @@ public List standardHttpControllers () { new GtfsController(gtfsCache), new BundleController(this), new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database), - new RegionalAnalysisController(broker, fileStorage), + new RegionalAnalysisController(broker, fileStorage, taskScheduler), new AggregationAreaController(fileStorage, database, taskScheduler), // This broker controller registers at least one handler at URL paths beginning with /internal, which // is exempted from authentication and authorization, but should be hidden from the world diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index 0500df8b4..5a45edf70 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -95,15 +95,14 @@ public interface Config { boolean testTaskRedelivery (); } - private Config config; + private final Config config; // Component Dependencies private final FileStorage fileStorage; private final EventBus eventBus; private final WorkerLauncher workerLauncher; - private final ListMultimap jobs = - MultimapBuilder.hashKeys().arrayListValues().build(); + private final ListMultimap jobs = MultimapBuilder.hashKeys().arrayListValues().build(); /** * The most tasks to deliver to a worker at a time. Workers may request less tasks than this, and the broker should @@ -111,27 +110,35 @@ public interface Config { * is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other * workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete. * - * The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic + * The value should eventually be tuned. The value of 16 is the value used by the previous sporadic * polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work. + * + * NOTE that as a side effect this limits the total throughput of each worker to: + * MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS tasks per second. + * It is entirely plausible for half or more of the origins in a job to be unconnected to any roadways (water, + * deserts etc.) In this case the system may need to burn through millions of origins, only checking that they + * aren't attached to anything in the selected scenario. Not doing so could double the run time of an analysis. 
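The throughput ceiling noted above follows directly from two constants. A rough worked example (the poll interval below is an assumed stand-in; the real value is the AnalysisWorker#POLL_INTERVAL_MIN_SECONDS constant referenced in the comment):

    // Illustrative values only; see Broker.MAX_TASKS_PER_WORKER and AnalysisWorker for the real constants.
    int maxTasksPerWorker = 40;       // MAX_TASKS_PER_WORKER after this change
    int pollIntervalSeconds = 2;      // assumption standing in for AnalysisWorker#POLL_INTERVAL_MIN_SECONDS
    double maxTasksPerSecond = (double) maxTasksPerWorker / pollIntervalSeconds;  // 20 tasks/second per worker
    // Even tasks that finish almost instantly (e.g. disconnected origins) cannot clear faster than this,
    // so one million unconnected origins would still occupy a single worker for about 50,000 seconds of polling.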
+ * It may be beneficial to assign origins to workers more randomly, or to introduce a mechanism to pre-scan for + * disconnected origins or at least concisely signal large blocks of them in worker responses. */ - public final int MAX_TASKS_PER_WORKER = 16; + public static final int MAX_TASKS_PER_WORKER = 40; /** * Used when auto-starting spot instances. Set to a smaller value to increase the number of * workers requested automatically */ - public final int TARGET_TASKS_PER_WORKER_TRANSIT = 800; - public final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000; + public static final int TARGET_TASKS_PER_WORKER_TRANSIT = 800; + public static final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000; /** * We want to request spot instances to "boost" regional analyses after a few regional task * results are received for a given workerCategory. Do so after receiving results for an * arbitrary task toward the beginning of the job */ - public final int AUTO_START_SPOT_INSTANCES_AT_TASK = 42; + public static final int AUTO_START_SPOT_INSTANCES_AT_TASK = 42; /** The maximum number of spot instances allowable in an automatic request */ - public final int MAX_WORKERS_PER_CATEGORY = 250; + public static final int MAX_WORKERS_PER_CATEGORY = 250; /** * How long to give workers to start up (in ms) before assuming that they have started (and @@ -139,15 +146,11 @@ public interface Config { */ public static final long WORKER_STARTUP_TIME = 60 * 60 * 1000; - /** Keeps track of all the workers that have contacted this broker recently asking for work. */ private WorkerCatalog workerCatalog = new WorkerCatalog(); - /** - * These objects piece together results received from workers into one regional analysis result - * file per job. - */ - private static Map resultAssemblers = new HashMap<>(); + /** These objects piece together results received from workers into one regional analysis result file per job. */ + private Map resultAssemblers = new HashMap<>(); /** * keep track of which graphs we have launched workers on and how long ago we launched them, so diff --git a/src/main/java/com/conveyal/analysis/controllers/AggregationAreaController.java b/src/main/java/com/conveyal/analysis/controllers/AggregationAreaController.java index 286fa5593..fae1637b6 100644 --- a/src/main/java/com/conveyal/analysis/controllers/AggregationAreaController.java +++ b/src/main/java/com/conveyal/analysis/controllers/AggregationAreaController.java @@ -10,6 +10,7 @@ import com.conveyal.analysis.persistence.AnalysisDB; import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; +import com.conveyal.file.UrlWithHumanName; import com.conveyal.r5.analyst.progress.Task; import com.fasterxml.jackson.databind.node.ObjectNode; import org.bson.conversions.Bson; @@ -27,6 +28,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.mongodb.client.model.Filters.and; import static com.mongodb.client.model.Filters.eq; +import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; /** * Stores vector aggregationAreas (used to define the region of a weighted average accessibility metric). @@ -98,10 +100,10 @@ private Collection getAggregationAreas (Request req, Response r } /** Returns a JSON-wrapped URL for the mask grid of the aggregation area whose id matches the path parameter. 
*/ - private ObjectNode getAggregationAreaGridUrl (Request req, Response res) { + private UrlWithHumanName getAggregationAreaGridUrl (Request req, Response res) { AggregationArea aggregationArea = aggregationAreaCollection.findPermittedByRequestParamId(req); - String url = fileStorage.getURL(aggregationArea.getStorageKey()); - return JsonUtil.objectNode().put("url", url); + res.type(APPLICATION_JSON.asString()); + return fileStorage.getJsonUrl(aggregationArea.getStorageKey(), aggregationArea.name, "grid"); } @Override diff --git a/src/main/java/com/conveyal/analysis/controllers/BundleController.java b/src/main/java/com/conveyal/analysis/controllers/BundleController.java index b7fc71cc5..b493cf618 100644 --- a/src/main/java/com/conveyal/analysis/controllers/BundleController.java +++ b/src/main/java/com/conveyal/analysis/controllers/BundleController.java @@ -13,14 +13,13 @@ import com.conveyal.file.FileUtils; import com.conveyal.gtfs.GTFSCache; import com.conveyal.gtfs.GTFSFeed; -import com.conveyal.gtfs.error.GTFSError; import com.conveyal.gtfs.error.GeneralError; import com.conveyal.gtfs.model.Stop; import com.conveyal.gtfs.validator.PostLoadValidator; import com.conveyal.osmlib.Node; import com.conveyal.osmlib.OSM; -import com.conveyal.r5.analyst.progress.ProgressInputStream; import com.conveyal.r5.analyst.cluster.TransportNetworkConfig; +import com.conveyal.r5.analyst.progress.ProgressInputStream; import com.conveyal.r5.analyst.progress.Task; import com.conveyal.r5.streets.OSMCache; import com.conveyal.r5.util.ExceptionUtils; @@ -81,6 +80,7 @@ public BundleController (BackendComponents components) { public void registerEndpoints (Service sparkService) { sparkService.path("/api/bundle", () -> { sparkService.get("", this::getBundles, toJson); + sparkService.get("/:_id/config", this::getBundleConfig, toJson); sparkService.get("/:_id", this::getBundle, toJson); sparkService.post("", this::create, toJson); sparkService.put("/:_id", this::update, toJson); @@ -110,7 +110,6 @@ private Bundle create (Request req, Response res) { try { bundle.name = files.get("bundleName").get(0).getString("UTF-8"); bundle.regionId = files.get("regionId").get(0).getString("UTF-8"); - if (files.get("osmId") != null) { bundle.osmId = files.get("osmId").get(0).getString("UTF-8"); Bundle bundleWithOsm = Persistence.bundles.find(QueryBuilder.start("osmId").is(bundle.osmId).get()).next(); @@ -118,7 +117,6 @@ private Bundle create (Request req, Response res) { throw AnalysisServerException.badRequest("Selected OSM does not exist."); } } - if (files.get("feedGroupId") != null) { bundle.feedGroupId = files.get("feedGroupId").get(0).getString("UTF-8"); Bundle bundleWithFeed = Persistence.bundles.find(QueryBuilder.start("feedGroupId").is(bundle.feedGroupId).get()).next(); @@ -135,6 +133,13 @@ private Bundle create (Request req, Response res) { bundle.feedsComplete = bundleWithFeed.feedsComplete; bundle.totalFeeds = bundleWithFeed.totalFeeds; } + if (files.get("config") != null) { + // Validation by deserializing into a model class instance. Unknown fields are ignored to + // allow sending config to custom or experimental workers with features unknown to the backend. + // The fields specifying OSM and GTFS IDs are not expected here. They will be ignored and overwritten. 
+ String configString = files.get("config").get(0).getString(); + bundle.config = JsonUtil.objectMapper.readValue(configString, TransportNetworkConfig.class); + } UserPermissions userPermissions = UserPermissions.from(req); bundle.accessGroup = userPermissions.accessGroup; bundle.createdBy = userPermissions.email; @@ -274,15 +279,19 @@ private Bundle create (Request req, Response res) { return bundle; } + /** SIDE EFFECTS: This method will change the field bundle.config before writing it. */ private void writeNetworkConfigToCache (Bundle bundle) throws IOException { - TransportNetworkConfig networkConfig = new TransportNetworkConfig(); - networkConfig.osmId = bundle.osmId; - networkConfig.gtfsIds = bundle.feeds.stream().map(f -> f.bundleScopedFeedId).collect(Collectors.toList()); - + // If the user specified additional network configuration options, they should already be in bundle.config. + // If no custom options were specified, we start with a fresh, empty instance. + if (bundle.config == null) { + bundle.config = new TransportNetworkConfig(); + } + // This will overwrite and override any inconsistent osm and gtfs IDs that were mistakenly supplied by the user. + bundle.config.osmId = bundle.osmId; + bundle.config.gtfsIds = bundle.feeds.stream().map(f -> f.bundleScopedFeedId).collect(Collectors.toList()); String configFileName = bundle._id + ".json"; File configFile = FileUtils.createScratchFile("json"); - JsonUtil.objectMapper.writeValue(configFile, networkConfig); - + JsonUtil.objectMapper.writeValue(configFile, bundle.config); FileStorageKey key = new FileStorageKey(BUNDLES, configFileName); fileStorage.moveIntoStorage(key, configFile); } @@ -312,6 +321,31 @@ private Bundle getBundle (Request req, Response res) { return bundle; } + /** + * There are two copies of the Bundle/Network config: one in the Bundle entry in the database and one in a JSON + * file (obtainable by the workers). This method always reads the one in the file, which has been around longer + * and is considered the definitive source of truth. The entry in the database is a newer addition and has only + * been around since September 2024. + */ + private TransportNetworkConfig getBundleConfig(Request request, Response res) { + // Unfortunately this mimics logic in TransportNetworkCache. Deduplicate in a static utility method? + String id = GTFSCache.cleanId(request.params("_id")); + FileStorageKey key = new FileStorageKey(BUNDLES, id, "json"); + File networkConfigFile = fileStorage.getFile(key); + if (!networkConfigFile.exists()) { + throw AnalysisServerException.notFound("Bundle configuration file could not be found."); + } + + // Unlike in the worker, we expect the backend to have a model field for every known network/bundle option. + // Therefore, use the default objectMapper that does not tolerate unknown fields. 
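The distinction drawn here between the backend's strict parsing and the workers' tolerant parsing comes down to a single Jackson setting. A minimal sketch using plain ObjectMapper instances (the project's actual mappers are configured elsewhere, e.g. in JsonUtil, and may differ in other respects):

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Strict (backend-style): unknown fields in the stored config cause deserialization to fail, Jackson's default.
    ObjectMapper strictMapper = new ObjectMapper();

    // Lenient (worker-style): unknown fields are silently ignored, so configs written for newer or
    // experimental workers still load.
    ObjectMapper lenientMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);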
+ try { + return JsonUtil.objectMapper.readValue(networkConfigFile, TransportNetworkConfig.class); + } catch (Exception exception) { + LOG.error("Exception deserializing stored network config", exception); + return null; + } + } + private Collection getBundles (Request req, Response res) { return Persistence.bundles.findPermittedForQuery(req); } diff --git a/src/main/java/com/conveyal/analysis/controllers/LocalFilesController.java b/src/main/java/com/conveyal/analysis/controllers/LocalFilesController.java index c92fbbb33..a5ef44f74 100644 --- a/src/main/java/com/conveyal/analysis/controllers/LocalFilesController.java +++ b/src/main/java/com/conveyal/analysis/controllers/LocalFilesController.java @@ -33,7 +33,9 @@ private Object getFile (Request req, Response res) throws Exception { FileStorageKey key = new FileStorageKey(category, filename); File file = fileStorage.getFile(key); FileStorageFormat format = FileStorageFormat.fromFilename(filename); - res.type(format.mimeType); + if (format != null) { + res.type(format.mimeType); + } // If the content-encoding is set to gzip, Spark automatically gzips the response. This double-gzips anything // that was already gzipped. Some of our files are already gzipped, and we rely on the the client browser to diff --git a/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java b/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java index b1afcfc05..c28363e0a 100644 --- a/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java +++ b/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java @@ -18,6 +18,7 @@ import com.conveyal.file.FileStorageFormat; import com.conveyal.file.FileStorageKey; import com.conveyal.file.FileUtils; +import com.conveyal.file.UrlWithHumanName; import com.conveyal.r5.analyst.FreeFormPointSet; import com.conveyal.r5.analyst.Grid; import com.conveyal.r5.analyst.PointSet; @@ -61,6 +62,7 @@ import static com.conveyal.file.FileCategory.GRIDS; import static com.conveyal.r5.analyst.WebMercatorExtents.parseZoom; import static com.conveyal.r5.analyst.progress.WorkProductType.OPPORTUNITY_DATASET; +import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; /** * Controller that handles fetching opportunity datasets (grids and other pointset formats). @@ -94,10 +96,6 @@ public OpportunityDatasetController ( /** Store upload status objects FIXME trivial Javadoc */ private final List uploadStatuses = new ArrayList<>(); - private ObjectNode getJsonUrl (FileStorageKey key) { - return JsonUtil.objectNode().put("url", fileStorage.getURL(key)); - } - private void addStatusAndRemoveOldStatuses(OpportunityDatasetUploadStatus status) { uploadStatuses.add(status); LocalDateTime now = LocalDateTime.now(); @@ -113,10 +111,11 @@ private Collection getRegionDatasets(Request req, Response r ); } - private Object getOpportunityDataset(Request req, Response res) { + private UrlWithHumanName getOpportunityDataset(Request req, Response res) { OpportunityDataset dataset = Persistence.opportunityDatasets.findByIdFromRequestIfPermitted(req); if (dataset.format == FileStorageFormat.GRID) { - return getJsonUrl(dataset.getStorageKey()); + res.type(APPLICATION_JSON.asString()); + return fileStorage.getJsonUrl(dataset.getStorageKey(), dataset.sourceName + "_" + dataset.name, "grid"); } else { // Currently the UI can only visualize grids, not other kinds of datasets (freeform points). 
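Handlers that now return UrlWithHumanName (the class is added later in this diff) are serialized by the toJson response transformer into a small JSON object rather than a bare URL string. A sketch of the expected response, with an invented dataset name and URL:

    // Hypothetical call; the dataset name and resulting URL are made up.
    UrlWithHumanName response = fileStorage.getJsonUrl(dataset.getStorageKey(), "Census 2020 total jobs", "grid");
    // Serializes roughly as:
    //   {"url": "https://storage.example.com/grids/region/total_jobs.grid", "humanName": "Census_2020_total_jobs.grid"}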
// We do generate a rasterized grid for each of the freeform pointsets we create, so ideally we'd redirect @@ -564,9 +563,10 @@ private List createGridsFromShapefile(List fileItems, * Respond to a request with a redirect to a downloadable file. * @param req should specify regionId, opportunityDatasetId, and an available download format (.tiff or .grid) */ - private Object downloadOpportunityDataset (Request req, Response res) throws IOException { + private UrlWithHumanName downloadOpportunityDataset (Request req, Response res) throws IOException { FileStorageFormat downloadFormat; String format = req.params("format"); + res.type(APPLICATION_JSON.asString()); try { downloadFormat = FileStorageFormat.valueOf(format.toUpperCase()); } catch (IllegalArgumentException iae) { @@ -576,38 +576,32 @@ private Object downloadOpportunityDataset (Request req, Response res) throws IOE String regionId = req.params("_id"); String gridKey = format; FileStorageKey storageKey = new FileStorageKey(GRIDS, String.format("%s/%s.grid", regionId, gridKey)); - return getJsonUrl(storageKey); + return fileStorage.getJsonUrl(storageKey, gridKey, "grid"); + } + if (FileStorageFormat.GRID.equals(downloadFormat)) { + return getOpportunityDataset(req, res); } - - if (FileStorageFormat.GRID.equals(downloadFormat)) return getOpportunityDataset(req, res); - final OpportunityDataset opportunityDataset = Persistence.opportunityDatasets.findByIdFromRequestIfPermitted(req); - FileStorageKey gridKey = opportunityDataset.getStorageKey(FileStorageFormat.GRID); FileStorageKey formatKey = opportunityDataset.getStorageKey(downloadFormat); - // if this grid is not on S3 in the requested format, try to get the .grid format if (!fileStorage.exists(gridKey)) { throw AnalysisServerException.notFound("Requested grid does not exist."); } - if (!fileStorage.exists(formatKey)) { // get the grid and convert it to the requested format File gridFile = fileStorage.getFile(gridKey); Grid grid = Grid.read(new GZIPInputStream(new FileInputStream(gridFile))); // closes input stream File localFile = FileUtils.createScratchFile(downloadFormat.toString()); FileOutputStream fos = new FileOutputStream(localFile); - if (FileStorageFormat.PNG.equals(downloadFormat)) { grid.writePng(fos); } else if (FileStorageFormat.GEOTIFF.equals(downloadFormat)) { grid.writeGeotiff(fos); } - fileStorage.moveIntoStorage(formatKey, localFile); } - - return getJsonUrl(formatKey); + return fileStorage.getJsonUrl(formatKey, opportunityDataset.sourceName + "_" + opportunityDataset.name, downloadFormat.extension); } /** diff --git a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java index 32336fd7c..82806ad66 100644 --- a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java +++ b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java @@ -3,24 +3,28 @@ import com.conveyal.analysis.AnalysisServerException; import com.conveyal.analysis.SelectingGridReducer; import com.conveyal.analysis.UserPermissions; +import com.conveyal.analysis.components.TaskScheduler; import com.conveyal.analysis.components.broker.Broker; import com.conveyal.analysis.components.broker.JobStatus; import com.conveyal.analysis.models.AnalysisRequest; +import com.conveyal.analysis.models.Model; import com.conveyal.analysis.models.OpportunityDataset; import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.persistence.Persistence; 
import com.conveyal.analysis.results.CsvResultType; +import com.conveyal.analysis.util.HttpStatus; import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; import com.conveyal.file.FileStorageFormat; import com.conveyal.file.FileStorageKey; import com.conveyal.file.FileUtils; +import com.conveyal.file.UrlWithHumanName; import com.conveyal.r5.analyst.FreeFormPointSet; import com.conveyal.r5.analyst.Grid; import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.PointSetCache; import com.conveyal.r5.analyst.cluster.RegionalTask; -import com.fasterxml.jackson.databind.JsonNode; +import com.conveyal.r5.analyst.progress.Task; import com.google.common.primitives.Ints; import com.mongodb.QueryBuilder; import gnu.trove.list.array.TIntArrayList; @@ -35,19 +39,35 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; import java.util.zip.GZIPOutputStream; import static com.conveyal.analysis.util.JsonUtil.toJson; import static com.conveyal.file.FileCategory.BUNDLES; import static com.conveyal.file.FileCategory.RESULTS; +import static com.conveyal.file.UrlWithHumanName.filenameCleanString; import static com.conveyal.r5.transit.TransportNetworkCache.getScenarioFilename; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; +import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML; +import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN; /** * Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching @@ -68,10 +88,12 @@ public class RegionalAnalysisController implements HttpController { private final Broker broker; private final FileStorage fileStorage; + private final TaskScheduler taskScheduler; - public RegionalAnalysisController (Broker broker, FileStorage fileStorage) { + public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) { this.broker = broker; this.fileStorage = fileStorage; + this.taskScheduler = taskScheduler; } private Collection getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) { @@ -157,27 +179,225 @@ private int getIntQueryParameter (Request req, String parameterName, int default } /** - * This used to extract a particular percentile of a regional analysis as a grid file. - * Now it just gets the single percentile that exists for any one analysis, either from the local buffer file - * for an analysis still in progress, or from S3 for a completed analysis. + * Associate a storage key with a human-readable name. + * Currently, this record type is only used within the RegionalAnalysisController class. */ - private Object getRegionalResults (Request req, Response res) throws IOException { + private record HumanKey(FileStorageKey storageKey, String humanName) { }; - // Get some path parameters out of the URL. 
- // The UUID of the regional analysis for which we want the output data + /** + * Get a regional analysis results raster for a single (percentile, cutoff, destination) combination, in one of + * several image file formats. This method was factored out for use from two different API endpoints, one for + * fetching a single grid, and another for fetching grids for all combinations of parameters at once. + * It returns the unique FileStorageKey for those results, associated with a non-unique human-readable name. + */ + private HumanKey getSingleCutoffGrid ( + RegionalAnalysis analysis, + OpportunityDataset destinations, + int cutoffMinutes, + int percentile, + FileStorageFormat fileFormat + ) throws IOException { + final String regionalAnalysisId = analysis._id; + final String destinationPointSetId = destinations._id; + // Selecting the zeroth cutoff still makes sense for older analyses that don't allow an array of N cutoffs. + int cutoffIndex = 0; + if (analysis.cutoffsMinutes != null) { + cutoffIndex = new TIntArrayList(analysis.cutoffsMinutes).indexOf(cutoffMinutes); + checkState(cutoffIndex >= 0); + } + LOG.info( + "Returning {} minute accessibility to pointset {} (percentile {}) for regional analysis {} in format {}.", + cutoffMinutes, destinationPointSetId, percentile, regionalAnalysisId, fileFormat + ); + // Analysis grids now have the percentile and cutoff in their S3 key, because there can be many of each. + // We do this even for results generated by older workers, so they will be re-extracted with the new name. + // These grids are reasonably small, we may be able to just send all cutoffs to the UI instead of selecting. + String singleCutoffKey = String.format( + "%s_%s_P%d_C%d.%s", + regionalAnalysisId, destinationPointSetId, percentile, cutoffMinutes, + fileFormat.extension.toLowerCase(Locale.ROOT) + ); + FileStorageKey singleCutoffFileStorageKey = new FileStorageKey(RESULTS, singleCutoffKey); + if (!fileStorage.exists(singleCutoffFileStorageKey)) { + // An accessibility grid for this particular cutoff has apparently never been extracted from the + // regional results file before. Extract one and save it for future reuse. Older regional analyses + // did not have arrays allowing multiple cutoffs, percentiles, or destination pointsets. The + // filenames of such regional accessibility results will not have a percentile or pointset ID. + // First try the newest form of regional results: multi-percentile, multi-destination-grid. + String multiCutoffKey = String.format("%s_%s_P%d.access", regionalAnalysisId, destinationPointSetId, percentile); + FileStorageKey multiCutoffFileStorageKey = new FileStorageKey(RESULTS, multiCutoffKey); + if (!fileStorage.exists(multiCutoffFileStorageKey)) { + LOG.warn("Falling back to older file name formats for regional results file: " + multiCutoffKey); + // Fall back to second-oldest form: multi-percentile, single destination grid. + multiCutoffKey = String.format("%s_P%d.access", regionalAnalysisId, percentile); + multiCutoffFileStorageKey = new FileStorageKey(RESULTS, multiCutoffKey); + if (fileStorage.exists(multiCutoffFileStorageKey)) { + checkArgument(analysis.destinationPointSetIds.length == 1); + } else { + // Fall back on oldest form of results, single-percentile, single-destination-grid. 
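The chain above probes progressively older naming conventions for stored regional results. Summarized as a sketch (the ID strings are hypothetical placeholders):

    String analysisId = "661f20a4c1d2e3f4a5b6c7d8";
    String destId = "6622aa11bb22cc33dd44ee55";
    String newest = String.format("%s_%s_P%d.access", analysisId, destId, 50); // multi-percentile, multi-destination-grid
    String older  = String.format("%s_P%d.access", analysisId, 50);            // multi-percentile, single destination grid
    String oldest = analysisId + ".access";                                    // single percentile, single destination grid
    // Whichever file is found, the extracted single-cutoff grid is cached back under:
    //   {analysisId}_{destinationPointSetId}_P{percentile}_C{cutoffMinutes}.{extension}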
+ multiCutoffKey = regionalAnalysisId + ".access"; + multiCutoffFileStorageKey = new FileStorageKey(RESULTS, multiCutoffKey); + if (fileStorage.exists(multiCutoffFileStorageKey)) { + checkArgument(analysis.travelTimePercentiles.length == 1); + checkArgument(analysis.destinationPointSetIds.length == 1); + } else { + throw AnalysisServerException.notFound("Cannot find original source regional analysis output."); + } + } + } + LOG.debug("Single-cutoff grid {} not found on S3, deriving it from {}.", singleCutoffKey, multiCutoffKey); + + InputStream multiCutoffInputStream = new FileInputStream(fileStorage.getFile(multiCutoffFileStorageKey)); + Grid grid = new SelectingGridReducer(cutoffIndex).compute(multiCutoffInputStream); + + File localFile = FileUtils.createScratchFile(fileFormat.toString()); + FileOutputStream fos = new FileOutputStream(localFile); + + switch (fileFormat) { + case GRID: + grid.write(new GZIPOutputStream(fos)); + break; + case PNG: + grid.writePng(fos); + break; + case GEOTIFF: + grid.writeGeotiff(fos); + break; + } + LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey); + fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile); + LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey); + } + String analysisHumanName = humanNameForEntity(analysis); + String destinationHumanName = humanNameForEntity(destinations); + String resultHumanFilename = filenameCleanString( + String.format("%s_%s_P%d_C%d", analysisHumanName, destinationHumanName, percentile, cutoffMinutes) + ) + "." + fileFormat.extension.toLowerCase(Locale.ROOT); + // Note that the returned human filename already contains the appropriate extension. + return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename); + } + + // Prevent multiple requests from creating the same files in parallel. + // This could potentially be integrated into FileStorage with enum return values or an additional boolean method. + private Set filesBeingPrepared = Collections.synchronizedSet(new HashSet<>()); + + private Object getAllRegionalResults (Request req, Response res) throws IOException { final String regionalAnalysisId = req.params("_id"); - // The response file format: PNG, TIFF, or GRID - final String fileFormatExtension = req.params("format"); + final UserPermissions userPermissions = UserPermissions.from(req); + final RegionalAnalysis analysis = getAnalysis(regionalAnalysisId, userPermissions); + if (analysis.cutoffsMinutes == null || analysis.travelTimePercentiles == null || analysis.destinationPointSetIds == null) { + throw AnalysisServerException.badRequest("Batch result download is not available for legacy regional results."); + } + if (analysis.request.originPointSetKey != null) { + throw AnalysisServerException.badRequest("Batch result download only available for gridded origins."); + } + FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip"); + if (fileStorage.exists(zippedResultsKey)) { + res.type(APPLICATION_JSON.asString()); + String analysisHumanName = humanNameForEntity(analysis); + return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip"); + } + if (filesBeingPrepared.contains(zippedResultsKey.path)) { + res.type(TEXT_PLAIN.asString()); + res.status(HttpStatus.ACCEPTED_202); + return "Geotiff zip is already being prepared in the background."; + } + // File did not exist. Create it in the background and ask caller to request it later. 
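Callers are expected to poll this endpoint until the archive exists. A hypothetical client-side loop (the endpoint path matches the route registered below; the host, timing, and omission of exception handling are assumptions of this sketch):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    HttpClient client = HttpClient.newHttpClient();
    URI uri = URI.create("https://analysis.example.com/api/regional/" + regionalAnalysisId + "/all");
    HttpResponse<String> response;
    do {
        response = client.send(HttpRequest.newBuilder(uri).GET().build(), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 202) Thread.sleep(10_000);  // zip is still being prepared in the background
    } while (response.statusCode() == 202);
    // A 200 response body is the UrlWithHumanName JSON; follow its "url" field to download the zip archive.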
+ filesBeingPrepared.add(zippedResultsKey.path); + Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name) + .forUser(userPermissions) + .withAction(progressListener -> { + int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length * + analysis.travelTimePercentiles.length * 2 + 1; + progressListener.beginTask("Creating and archiving geotiffs...", nSteps); + // Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination. + List humanKeys = new ArrayList<>(); + for (String destinationPointSetId : analysis.destinationPointSetIds) { + OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions); + for (int cutoffMinutes : analysis.cutoffsMinutes) { + for (int percentile : analysis.travelTimePercentiles) { + HumanKey gridKey = getSingleCutoffGrid( + analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF + ); + humanKeys.add(gridKey); + progressListener.increment(); + } + } + } + File tempZipFile = File.createTempFile("regional", ".zip"); + // Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition + // Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual + // entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion. + tempZipFile.delete(); + Map env = Map.of("create", "true"); + URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath()); + try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) { + for (HumanKey key : humanKeys) { + Path storagePath = fileStorage.getFile(key.storageKey).toPath(); + Path zipPath = zipFilesystem.getPath(key.humanName); + Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING); + progressListener.increment(); + } + } + fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile); + progressListener.increment(); + filesBeingPrepared.remove(zippedResultsKey.path); + }); + taskScheduler.enqueue(task); + res.type(TEXT_PLAIN.asString()); + res.status(HttpStatus.ACCEPTED_202); + return "Building geotiff zip in background."; + } + /** + * Given an Entity, make a human-readable name for the entity composed of its user-supplied name as well as + * the most rapidly changing digits of its ID to disambiguate in case multiple entities have the same name. + * It is also possible to find the exact entity in many web UI fields using this suffix of its ID. + */ + private static String humanNameForEntity (Model entity) { + // Most or all IDs encountered are MongoDB ObjectIDs. The first four and middle five bytes are slow-changing + // and would not disambiguate between data sets. Only the 3-byte counter at the end will be sure to change. + // See https://www.mongodb.com/docs/manual/reference/method/ObjectId/ + final String id = entity._id; + checkArgument(id.length() > 6, "ID had too few characters."); + String shortId = id.substring(id.length() - 6, id.length()); + String humanName = "%s_%s".formatted(filenameCleanString(entity.name), shortId); + return humanName; + } + + /** Fetch destination OpportunityDataset from database, followed by a check that it was present. 
*/ + private static OpportunityDataset getDestinations (String destinationPointSetId, UserPermissions userPermissions) { + OpportunityDataset opportunityDataset = + Persistence.opportunityDatasets.findByIdIfPermitted(destinationPointSetId, userPermissions); + checkNotNull(opportunityDataset, "Opportunity dataset could not be found in database."); + return opportunityDataset; + } + + /** Fetch RegionalAnalysis from database by ID, followed by a check that it was present and not deleted. */ + private static RegionalAnalysis getAnalysis (String analysisId, UserPermissions userPermissions) { RegionalAnalysis analysis = Persistence.regionalAnalyses.findPermitted( - QueryBuilder.start("_id").is(req.params("_id")).get(), + QueryBuilder.start("_id").is(analysisId).get(), DBProjection.exclude("request.scenario.modifications"), - UserPermissions.from(req) + userPermissions ).iterator().next(); - if (analysis == null || analysis.deleted) { throw AnalysisServerException.notFound("The specified regional analysis is unknown or has been deleted."); } + return analysis; + } + + /** Extract a particular percentile and cutoff of a regional analysis in one of several different raster formats. */ + private UrlWithHumanName getRegionalResults (Request req, Response res) throws IOException { + // It is possible that regional analysis is complete, but UI is trying to fetch gridded results when there + // aren't any (only CSV, because origins are freeform). How should we determine whether this analysis is + // expected to have no gridded results and cleanly return a 404? + final String regionalAnalysisId = req.params("_id"); + FileStorageFormat format = FileStorageFormat.valueOf(req.params("format").toUpperCase()); + if (!FileStorageFormat.GRID.equals(format) && !FileStorageFormat.PNG.equals(format) && !FileStorageFormat.GEOTIFF.equals(format)) { + throw AnalysisServerException.badRequest("Format \"" + format + "\" is invalid. Request format must be \"grid\", \"png\", or \"geotiff\"."); + } + final UserPermissions userPermissions = UserPermissions.from(req); + RegionalAnalysis analysis = getAnalysis(regionalAnalysisId, userPermissions); // Which channel to extract from results with multiple values per origin (for different travel time cutoffs) // and multiple output files per analysis (for different percentiles of travel time and/or different @@ -187,7 +407,6 @@ private Object getRegionalResults (Request req, Response res) throws IOException // are coming from deprecated fields, are not meaningful and will be overwritten below from query parameters. int percentile = analysis.travelTimePercentile; int cutoffMinutes = analysis.cutoffMinutes; - int cutoffIndex = 0; String destinationPointSetId = analysis.grid; // Handle newer regional analyses with multiple cutoffs in an array. 
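To make the naming scheme concrete, here is a worked example of humanNameForEntity above combined with filenameCleanString (added to UrlWithHumanName later in this diff); the entity name and ID are invented:

    // Hypothetical entity: name "Downtown: jobs + healthcare", _id "661f20a4c1d2e3f4a5b6c7d8" (a MongoDB ObjectId).
    // filenameCleanString collapses each run of non-word characters into a single underscore:
    //   "Downtown: jobs + healthcare"  ->  "Downtown_jobs_healthcare"
    // The last six characters of the ObjectId (its fast-changing counter bytes) disambiguate duplicate names:
    //   humanNameForEntity(entity)     ->  "Downtown_jobs_healthcare_b6c7d8"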
@@ -197,8 +416,7 @@ private Object getRegionalResults (Request req, Response res) throws IOException int nCutoffs = analysis.cutoffsMinutes.length; checkState(nCutoffs > 0, "Regional analysis has no cutoffs."); cutoffMinutes = getIntQueryParameter(req, "cutoff", analysis.cutoffsMinutes[nCutoffs / 2]); - cutoffIndex = new TIntArrayList(analysis.cutoffsMinutes).indexOf(cutoffMinutes); - checkState(cutoffIndex >= 0, + checkArgument(new TIntArrayList(analysis.cutoffsMinutes).contains(cutoffMinutes), "Travel time cutoff for this regional analysis must be taken from this list: (%s)", Ints.join(", ", analysis.cutoffsMinutes) ); @@ -228,89 +446,19 @@ private Object getRegionalResults (Request req, Response res) throws IOException "Destination gridId must be one of: %s", String.join(",", analysis.destinationPointSetIds)); } - // We started implementing the ability to retrieve and display partially completed analyses. // We eventually decided these should not be available here at the same endpoint as complete, immutable results. - if (broker.findJob(regionalAnalysisId) != null) { throw AnalysisServerException.notFound("Analysis is incomplete, no results file is available."); } - - // FIXME It is possible that regional analysis is complete, but UI is trying to fetch gridded results when there - // aren't any (only CSV, because origins are freeform). - // How can we determine whether this analysis is expected to have no gridded results and cleanly return a 404? - - // The analysis has already completed, results should be stored and retrieved from S3 via redirects. - LOG.debug("Returning {} minute accessibility to pointset {} (percentile {}) for regional analysis {}.", - cutoffMinutes, destinationPointSetId, percentile, regionalAnalysisId); - FileStorageFormat format = FileStorageFormat.valueOf(fileFormatExtension.toUpperCase()); - if (!FileStorageFormat.GRID.equals(format) && !FileStorageFormat.PNG.equals(format) && !FileStorageFormat.GEOTIFF.equals(format)) { - throw AnalysisServerException.badRequest("Format \"" + format + "\" is invalid. Request format must be \"grid\", \"png\", or \"tiff\"."); - } - - // Analysis grids now have the percentile and cutoff in their S3 key, because there can be many of each. - // We do this even for results generated by older workers, so they will be re-extracted with the new name. - // These grids are reasonably small, we may be able to just send all cutoffs to the UI instead of selecting. - String singleCutoffKey = - String.format("%s_%s_P%d_C%d.%s", regionalAnalysisId, destinationPointSetId, percentile, cutoffMinutes, fileFormatExtension); - - // A lot of overhead here - UI contacts backend, backend calls S3, backend responds to UI, UI contacts S3. - FileStorageKey singleCutoffFileStorageKey = new FileStorageKey(RESULTS, singleCutoffKey); - if (!fileStorage.exists(singleCutoffFileStorageKey)) { - // An accessibility grid for this particular cutoff has apparently never been extracted from the - // regional results file before. Extract one and save it for future reuse. Older regional analyses - // did not have arrays allowing multiple cutoffs, percentiles, or destination pointsets. The - // filenames of such regional accessibility results will not have a percentile or pointset ID. - // First try the newest form of regional results: multi-percentile, multi-destination-grid. 
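When no cutoff query parameter is supplied, the code above defaults to the middle entry of the analysis's cutoff array. A small worked example with assumed cutoff values:

    int[] cutoffsMinutes = {15, 30, 45, 60, 75};                   // hypothetical analysis cutoffs
    int defaultCutoff = cutoffsMinutes[cutoffsMinutes.length / 2]; // index 5 / 2 = 2, so 45 minutes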
- String multiCutoffKey = String.format("%s_%s_P%d.access", regionalAnalysisId, destinationPointSetId, percentile); - FileStorageKey multiCutoffFileStorageKey = new FileStorageKey(RESULTS, multiCutoffKey); - if (!fileStorage.exists(multiCutoffFileStorageKey)) { - LOG.warn("Falling back to older file name formats for regional results file: " + multiCutoffKey); - // Fall back to second-oldest form: multi-percentile, single destination grid. - multiCutoffKey = String.format("%s_P%d.access", regionalAnalysisId, percentile); - multiCutoffFileStorageKey = new FileStorageKey(RESULTS, multiCutoffKey); - if (fileStorage.exists(multiCutoffFileStorageKey)) { - checkArgument(analysis.destinationPointSetIds.length == 1); - } else { - // Fall back on oldest form of results, single-percentile, single-destination-grid. - multiCutoffKey = regionalAnalysisId + ".access"; - multiCutoffFileStorageKey = new FileStorageKey(RESULTS, multiCutoffKey); - if (fileStorage.exists(multiCutoffFileStorageKey)) { - checkArgument(analysis.travelTimePercentiles.length == 1); - checkArgument(analysis.destinationPointSetIds.length == 1); - } else { - throw AnalysisServerException.notFound("Cannot find original source regional analysis output."); - } - } - } - LOG.debug("Single-cutoff grid {} not found on S3, deriving it from {}.", singleCutoffKey, multiCutoffKey); - - InputStream multiCutoffInputStream = new FileInputStream(fileStorage.getFile(multiCutoffFileStorageKey)); - Grid grid = new SelectingGridReducer(cutoffIndex).compute(multiCutoffInputStream); - - File localFile = FileUtils.createScratchFile(format.toString()); - FileOutputStream fos = new FileOutputStream(localFile); - - switch (format) { - case GRID: - grid.write(new GZIPOutputStream(fos)); - break; - case PNG: - grid.writePng(fos); - break; - case GEOTIFF: - grid.writeGeotiff(fos); - break; - } - - fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile); - } - return JsonUtil.toJsonString( - JsonUtil.objectNode().put("url", fileStorage.getURL(singleCutoffFileStorageKey)) - ); + // Significant overhead here: UI contacts backend, backend calls S3, backend responds to UI, UI contacts S3. + OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions); + HumanKey gridKey = getSingleCutoffGrid(analysis, destinations, cutoffMinutes, percentile, format); + res.type(APPLICATION_JSON.asString()); + return fileStorage.getJsonUrl(gridKey.storageKey, gridKey.humanName); } - private String getCsvResults (Request req, Response res) { + private Object getCsvResults (Request req, Response res) { final String regionalAnalysisId = req.params("_id"); final CsvResultType resultType = CsvResultType.valueOf(req.params("resultType").toUpperCase()); // If the resultType parameter received on the API is unrecognized, valueOf throws IllegalArgumentException @@ -332,7 +480,10 @@ private String getCsvResults (Request req, Response res) { FileStorageKey fileStorageKey = new FileStorageKey(RESULTS, storageKey); - res.type("text/plain"); + // TODO handle JSON with human name on UI side + // res.type(APPLICATION_JSON.asString()); + // return fileStorage.getJsonUrl(fileStorageKey, analysis.name, resultType + ".csv"); + res.type(TEXT_HTML.asString()); return fileStorage.getURL(fileStorageKey); } @@ -526,17 +677,20 @@ private RegionalAnalysis updateRegionalAnalysis (Request request, Response respo * Return a JSON-wrapped URL for the file in FileStorage containing the JSON representation of the scenario for * the given regional analysis. 
*/ - private JsonNode getScenarioJsonUrl (Request request, Response response) { - RegionalAnalysis regionalAnalysis = Persistence.regionalAnalyses - .findByIdIfPermitted(request.params("_id"), UserPermissions.from(request)); + private UrlWithHumanName getScenarioJsonUrl (Request request, Response response) { + RegionalAnalysis regionalAnalysis = Persistence.regionalAnalyses.findByIdIfPermitted( + request.params("_id"), + DBProjection.exclude("request.scenario.modifications"), + UserPermissions.from(request) + ); // In the persisted objects, regionalAnalysis.scenarioId seems to be null. Get it from the embedded request. final String networkId = regionalAnalysis.bundleId; final String scenarioId = regionalAnalysis.request.scenarioId; checkNotNull(networkId, "RegionalAnalysis did not contain a network ID."); checkNotNull(scenarioId, "RegionalAnalysis did not contain an embedded request with scenario ID."); - String scenarioUrl = fileStorage.getURL( - new FileStorageKey(BUNDLES, getScenarioFilename(regionalAnalysis.bundleId, scenarioId))); - return JsonUtil.objectNode().put("url", scenarioUrl); + FileStorageKey scenarioKey = new FileStorageKey(BUNDLES, getScenarioFilename(regionalAnalysis.bundleId, scenarioId)); + response.type(APPLICATION_JSON.asString()); + return fileStorage.getJsonUrl(scenarioKey, regionalAnalysis.name, "scenario.json"); } @Override @@ -546,11 +700,11 @@ public void registerEndpoints (spark.Service sparkService) { sparkService.get("/:regionId/regional/running", this::getRunningAnalyses, toJson); }); sparkService.path("/api/regional", () -> { - // For grids, no transformer is supplied: render raw bytes or input stream rather than transforming to JSON. sparkService.get("/:_id", this::getRegionalAnalysis); - sparkService.get("/:_id/grid/:format", this::getRegionalResults); + sparkService.get("/:_id/all", this::getAllRegionalResults, toJson); + sparkService.get("/:_id/grid/:format", this::getRegionalResults, toJson); sparkService.get("/:_id/csv/:resultType", this::getCsvResults); - sparkService.get("/:_id/scenarioJsonUrl", this::getScenarioJsonUrl); + sparkService.get("/:_id/scenarioJsonUrl", this::getScenarioJsonUrl, toJson); sparkService.delete("/:_id", this::deleteRegionalAnalysis, toJson); sparkService.post("", this::createRegionalAnalysis, toJson); sparkService.put("/:_id", this::updateRegionalAnalysis, toJson); diff --git a/src/main/java/com/conveyal/analysis/models/AnalysisRequest.java b/src/main/java/com/conveyal/analysis/models/AnalysisRequest.java index fbc02859b..6f51d2933 100644 --- a/src/main/java/com/conveyal/analysis/models/AnalysisRequest.java +++ b/src/main/java/com/conveyal/analysis/models/AnalysisRequest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -176,6 +177,14 @@ public class AnalysisRequest { */ public int dualAccessibilityThreshold = 0; + /** + * Freeform (untyped) flags for enabling experimental, undocumented, or arcane behavior in backend or workers. + * This should be used to replace all previous special behavior flags that were embedded inside analysis names etc. + */ + public Set flags; + + /** Control the details of CSV regional analysis output, including whether to output IDs, names, or both. */ + public CsvResultOptions csvResultOptions = new CsvResultOptions(); /** * Create the R5 `Scenario` from this request. 
@@ -283,6 +292,8 @@ public void populateTask (AnalysisWorkerTask task, UserPermissions userPermissio task.includeTemporalDensity = includeTemporalDensity; task.dualAccessibilityThreshold = dualAccessibilityThreshold; + task.flags = flags; + task.csvResultOptions = csvResultOptions; } private EnumSet getEnumSetFromString (String s) { diff --git a/src/main/java/com/conveyal/analysis/models/Bundle.java b/src/main/java/com/conveyal/analysis/models/Bundle.java index 912f066b4..4d5bef3e6 100644 --- a/src/main/java/com/conveyal/analysis/models/Bundle.java +++ b/src/main/java/com/conveyal/analysis/models/Bundle.java @@ -5,6 +5,7 @@ import com.conveyal.gtfs.error.GTFSError; import com.conveyal.gtfs.model.FeedInfo; import com.conveyal.gtfs.validator.model.Priority; +import com.conveyal.r5.analyst.cluster.TransportNetworkConfig; import com.fasterxml.jackson.annotation.JsonIgnore; import java.time.LocalDate; @@ -47,6 +48,11 @@ public class Bundle extends Model implements Cloneable { public int feedsComplete; public int totalFeeds; + // The definitive TransportNetworkConfig is a JSON file stored alongside the feeds in file storage. It is + // duplicated here to record any additional user-specified options that were supplied when the bundle was created. + // It may contain redundant copies of information stored in the outer level Bundle such as OSM and GTFS feed IDs. + public TransportNetworkConfig config; + public static String bundleScopeFeedId (String feedId, String feedGroupId) { return String.format("%s_%s", feedId, feedGroupId); } diff --git a/src/main/java/com/conveyal/analysis/models/CsvResultOptions.java b/src/main/java/com/conveyal/analysis/models/CsvResultOptions.java new file mode 100644 index 000000000..e925e5ff3 --- /dev/null +++ b/src/main/java/com/conveyal/analysis/models/CsvResultOptions.java @@ -0,0 +1,17 @@ +package com.conveyal.analysis.models; + +import com.conveyal.r5.transit.TransitLayer.EntityRepresentation; + +import static com.conveyal.r5.transit.TransitLayer.EntityRepresentation.ID_ONLY; + +/** + * API model type included in analysis requests to control details of CSV regional analysis output. + * This type is shared between AnalysisRequest (Frontend -> Broker) and AnalysisWorkerTask (Broker -> Workers). + * There is precedent for nested compound types shared across those top level request types (see DecayFunction). + */ +public class CsvResultOptions { + public EntityRepresentation routeRepresentation = ID_ONLY; + public EntityRepresentation stopRepresentation = ID_ONLY; + // Only feed ID representation is allowed to be null (no feed IDs at all, the default). + public EntityRepresentation feedRepresentation = null; +} diff --git a/src/main/java/com/conveyal/analysis/persistence/MongoMap.java b/src/main/java/com/conveyal/analysis/persistence/MongoMap.java index 6001a82d2..4f34353d1 100644 --- a/src/main/java/com/conveyal/analysis/persistence/MongoMap.java +++ b/src/main/java/com/conveyal/analysis/persistence/MongoMap.java @@ -43,12 +43,11 @@ public int size() { return (int) wrappedCollection.getCount(); } - public V findByIdFromRequestIfPermitted(Request request) { - return findByIdIfPermitted(request.params("_id"), UserPermissions.from(request)); - } - - public V findByIdIfPermitted(String id, UserPermissions userPermissions) { - V result = wrappedCollection.findOneById(id); + /** + * `fields` is nullable. 
+ */ + public V findByIdIfPermitted(String id, DBObject fields, UserPermissions userPermissions) { + V result = wrappedCollection.findOneById(id, fields); if (result == null) { throw AnalysisServerException.notFound(String.format( @@ -61,6 +60,14 @@ public V findByIdIfPermitted(String id, UserPermissions userPermissions) { } } + public V findByIdIfPermitted(String id, UserPermissions userPermissions) { + return findByIdIfPermitted(id, null, userPermissions); + } + + public V findByIdFromRequestIfPermitted(Request request) { + return findByIdIfPermitted(request.params("_id"), UserPermissions.from(request)); + } + public V get(String key) { return wrappedCollection.findOneById(key); } diff --git a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java b/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java index df289c9fe..8bcf94d26 100644 --- a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java @@ -61,6 +61,7 @@ protected synchronized void finish (String fileName) throws IOException { // There's probably a more elegant way to do this with NIO and without closing the buffer. // That would be Files.copy(File.toPath(),X) or ByteStreams.copy. + // Perhaps better: we could wrap the output buffer in a gzip output stream and zip as we write out. InputStream is = new BufferedInputStream(new FileInputStream(bufferFile)); OutputStream os = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(gzippedResultFile))); ByteStreams.copy(is, os); diff --git a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java index ca0cf09c5..1bd9ff9ac 100644 --- a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java @@ -100,6 +100,8 @@ public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception // CsvWriter is not threadsafe and multiple threads may call this, so after values are generated, // the actual writing is synchronized (TODO confirm) // Is result row generation slow enough to bother synchronizing only the following block? + // This first dimension check is specific to each subclass. The check in the loop below is more general, + // applying to all subclasses (after the subclass-specific rowValues method may have added some columns). checkDimension(workResult); Iterable rows = rowValues(workResult); synchronized (this) { diff --git a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java index 0dadb4337..6a7c9ffc7 100644 --- a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java @@ -42,6 +42,21 @@ public Iterable rowValues (RegionalWorkResult workResult) { return rows; } + // Around 2024-04 we wanted to expand the number of CSV columns and needed to update the dimension checks below. + // The number of columns is checked twice, once in this specific CsvResultWriter implementation and once in the + // abstract superclass. + // We don't want to introduce a column count check with tolerance that is applied separately to each row, because + // this will not catch a whole class of problems where the worker instances are not producing a consistent number + // of columns across origins. 
+ // We do ideally want to allow experimental workers that add an unknown number of columns, but they should add those + // columns to every row. This requires some kind of negotiated, flexible protocol between the backend and workers. + // Or some system where the first worker response received sets expectations and all other responses must match. + // We thought this through and decided it was too big a change to introduce immediately. + // So we only accept one specific quantity of CSV columns, but fail with a very specific message when we see a + // number of CSV columns that we recognize as coming from an obsolete worker version. Breaking backward + // compatibility is acceptable here because CSV paths are still considered an experimental feature. + // Ideally this very case-specific check and error message will be removed when some more general system is added. + @Override protected void checkDimension (RegionalWorkResult workResult) { // Path CSV output only supports a single freeform pointset for now. @@ -53,6 +68,11 @@ protected void checkDimension (RegionalWorkResult workResult) { for (ArrayList oneDestination : workResult.pathResult) { // Number of distinct paths per destination is variable, don't validate it. for (String[] iterationDetails : oneDestination) { + if (iterationDetails.length == 10) { + throw new IllegalArgumentException( + "Please use worker version newer than v7.1. CSV columns in path results have changed." + ); + } checkDimension(workResult, "columns", iterationDetails.length, PathResult.DATA_COLUMNS.length); } } diff --git a/src/main/java/com/conveyal/file/FileStorage.java b/src/main/java/com/conveyal/file/FileStorage.java index 52de21316..f9bd687cc 100644 --- a/src/main/java/com/conveyal/file/FileStorage.java +++ b/src/main/java/com/conveyal/file/FileStorage.java @@ -94,4 +94,15 @@ default InputStream getInputStream (FileCategory fileCategory, String fileName) } } + default UrlWithHumanName getJsonUrl (FileStorageKey key, String rawHumanName, String humanExtension) { + String url = this.getURL(key); + return UrlWithHumanName.fromCleanedName(url, rawHumanName, humanExtension); + } + + /** This assumes the humanFileName is already a complete filename (cleaned and truncated with any extension). */ + default UrlWithHumanName getJsonUrl (FileStorageKey key, String humanFileName) { + String url = this.getURL(key); + return new UrlWithHumanName(url, humanFileName); + } + } diff --git a/src/main/java/com/conveyal/file/FileStorageFormat.java b/src/main/java/com/conveyal/file/FileStorageFormat.java index c33569de9..e3b6e0fe0 100644 --- a/src/main/java/com/conveyal/file/FileStorageFormat.java +++ b/src/main/java/com/conveyal/file/FileStorageFormat.java @@ -1,5 +1,7 @@ package com.conveyal.file; +import java.util.Locale; + /** * An enumeration of all the file types we handle as uploads, derived internal data, or work products. * Really this should be a union of several enumerated types (upload/internal/product) but Java does not allow this. 
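The two getJsonUrl default methods added above differ only in whether the caller has already produced a finished filename. A usage sketch (the keys and raw name are hypothetical; the second call mirrors the usage in RegionalAnalysisController earlier in this diff):

    // Raw name plus extension: the name is cleaned and ".zip" is appended by fromCleanedName.
    UrlWithHumanName zipUrl = fileStorage.getJsonUrl(zippedResultsKey, "Central district analysis", "zip");
    // Pre-built filename that is already cleaned, truncated, and carries its extension; passed through unchanged.
    UrlWithHumanName gridUrl = fileStorage.getJsonUrl(gridKey.storageKey(), gridKey.humanName());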
@@ -37,7 +39,12 @@ public enum FileStorageFormat { } public static FileStorageFormat fromFilename (String filename) { - String extension = filename.substring(filename.lastIndexOf(".") + 1); - return FileStorageFormat.valueOf(extension.toUpperCase()); + String extension = filename.substring(filename.lastIndexOf(".") + 1).toLowerCase(Locale.ROOT); + for (FileStorageFormat format : FileStorageFormat.values()) { + if (format.extension.equals(extension)) { + return format; + } + } + return null; } } diff --git a/src/main/java/com/conveyal/file/UrlWithHumanName.java b/src/main/java/com/conveyal/file/UrlWithHumanName.java new file mode 100644 index 000000000..f7a49c933 --- /dev/null +++ b/src/main/java/com/conveyal/file/UrlWithHumanName.java @@ -0,0 +1,43 @@ +package com.conveyal.file; + +/** + * Combines a url for downloading a file, which might include a globally unique but human-annoying UUID, with a + * suggested human-readable name for that file when saved by an end user. The humanName may not be globally unique, + * so is only appropriate for cases where it doesn't need to be machine discoverable using a UUID. The humanName can + * be used as the download attribute of an HTML link, or as the attachment name in a content-disposition header. + * Instances of this record are intended to be serialized to JSON as an HTTP API response. + */ +public class UrlWithHumanName { + public final String url; + public final String humanName; + + public UrlWithHumanName (String url, String humanName) { + this.url = url; + this.humanName = humanName; + } + + private static final int TRUNCATE_FILENAME_CHARS = 220; + + /** + * Given an arbitrary string, make it safe for use in a friendly human-readable filename. This can yield non-unique + * strings and is intended for files downloaded by end users that do not need to be machine-discoverable by unique + * IDs. A length of up to 255 characters will work with most filesystems and within ZIP files. In all names we + * generate, the end of the name more uniquely identifies it (contains a fragment of a hex object ID or contains + * the distinguishing factors such as cutoff and percentile for files within a ZIP archive). Therefore, we truncate + * to a suffix rather than a prefix when the name is too long. We keep the length somewhat under 255 in case some + * other short suffix needs to be appended before use as a filename. + * Note that this will strip dot characters out of the string, so any dot and extension must be suffixed later. + */ + public static String filenameCleanString (String original) { + String ret = original.replaceAll("\\W+", "_"); + if (ret.length() > TRUNCATE_FILENAME_CHARS) { + ret = ret.substring(ret.length() - TRUNCATE_FILENAME_CHARS, ret.length()); + } + return ret; + } + + public static UrlWithHumanName fromCleanedName (String url, String rawHumanName, String humanExtension) { + String humanName = UrlWithHumanName.filenameCleanString(rawHumanName) + "." + humanExtension; + return new UrlWithHumanName(url, humanName); + } +} diff --git a/src/main/java/com/conveyal/r5/analyst/LinkageCache.java b/src/main/java/com/conveyal/r5/analyst/LinkageCache.java index 0bc7b7738..da006156e 100644 --- a/src/main/java/com/conveyal/r5/analyst/LinkageCache.java +++ b/src/main/java/com/conveyal/r5/analyst/LinkageCache.java @@ -16,35 +16,34 @@ import java.util.concurrent.ExecutionException; /** - * Retains linkages between PointSets and the StreetLayers for specific StreetModes. 
- * This used to be embedded in the PointSets themselves, now there should be one instance per TransportNetwork. + * Retains linkages between PointSets and the StreetLayers for specific StreetModes, including egress distance tables. + * LinkageCaches used to be associated with individual PointSets, but now there is a single cache per TransportNetwork. * There could instead be one instance per AnalystWorker or per JVM (static), but this would cause the mappings * including PointSets, StreetLayers, and Linkages (which hold references to the TransportNetwork) to stick around - * even when we try to garbage collect a TransportNetwork. This is less of an issue now that we don't plan to have - * workers migrate between TransportNetworks. + * even when we try to garbage collect a TransportNetwork. In cloud operation, this problem would not necessarily arise + * in practice since workers are permanently associated with a single base TransportNetwork. */ public class LinkageCache { private static final Logger LOG = LoggerFactory.getLogger(LinkageCache.class); /** - * Maximum number of street network linkages to cache per PointSet. This is a crude way of limiting memory - * consumption, and should eventually be replaced with a WeighingCache. Since every Scenario including the - * baseline has its own StreetLayer instance now, this means we can hold walk, bike, and car linkages (with - * distance tables) for 2 scenarios plus the baseline at once. - * FIXME this used to be per-PointSet, now it's one single limit per TransportNetwork. + * Maximum number of street network linkages and associated egress tables to retain in this LinkageCache. + * This is a crude way of limiting memory consumption, and would ideally be replaced with a WeighingCache. + * However, the memory consumption of a particular linkage is difficult to quantify, as the bulk of the data + * is distance tables, and multiple linkages may share a large number of references to reused distance tables. + * Since every Scenario including the baseline has its own StreetLayer instance, we could for example hold linkages + * (with associated distance tables) for walk, bike, and car egress for 2 scenarios plus the baseline at once. */ public static int LINKAGE_CACHE_SIZE = 9; /** - * When this PointSet is connected to the street network, the resulting data are cached in this Map to speed up - * later reuse. Different linkages are produced for different street networks and for different on-street modes - * of travel. At first we were careful to key this cache on the StreetNetwork itself (rather than the - * TransportNetwork or Scenario) to ensure that linkages were re-used for multiple scenarios that have the same - * street network. However, selectively re-linking to the street network is now usually fast, and - * StreetNetworks must be copied for every scenario due to references to their containing TransportNetwork. + * For a particular TransportNetwork, a different linkage is produced for each unique combination of destination + * points, StreetLayer, and on-street mode of travel (see details of Key). A distinct StreetLayer instance exists + * for each scenario even when its contents remain unchanged by the scenario, because the StreetLayer references + * the enclosing TransportNetwork for the scenario. * Note that this cache will be serialized with the PointSet, but serializing a Guava cache only serializes the - * cache instance and its settings, not the contents of the cache. We consider this sane behavior. 
+ * cache instance and its settings, not the contents of the cache. This is the intended behavior. */ protected transient LoadingCache linkageCache; @@ -59,24 +58,24 @@ public class LinkageCache { /** * The logic for lazy-loading linkages into the cache. * - // FIXME FIXME clean up these notes on sub-linkages. - // We know that pointSet is a WebMercatorGridPointSet, but if it's a new one we want to replicate its - // linkages based on the base scenarioNetwork.gridPointSet's linkages. We don't know if it's new, so such - // logic has to happen in the loop below over all streetModes, where we fetch and build the egress cost - // tables. We already know for sure this is a scenarioNetwork. - // So if we see a linkage for scenarioNetwork.gridPointSet, we need to add another linkage. - // When this mapping exists: - // (scenarioNetwork.gridPointSet, StreetLayer, StreetMode) -> linkageFor(scenarioNetwork.gridPointSet) - // We need to generate this mapping: - // (pointSet, StreetLayer, StreetMode) -> new LinkedPointSet(linkageFor(scenarioNetwork.gridPointSet), pointSet); - // Note that: ((WebMercatorGridPointSet)pointSet).base == scenarioNetwork.gridPointSet - // I'm afraid BaseLinkage means two different things here: we can subset bigger linkages that already - // exist, or we can redo subsets of linkages of the same size them them when applying scenarios. - // Yes: in one situation, the PointSet objects are identical when making the new linkage, but the - // streetLayer differs. In the other situation, the PointSet objects are different but the other aspects - // are the same. Again this is the difference between a PointSet and its linkage. We should call them - // PointSetLinkages instead of LinkedPointSets because they do not subclass PointSet. - // basePointSet vs. baseStreetLayer vs. baseLinkage. + * FIXME clean up these notes on sub-linkages, some of which may be obsolete. + * We know that pointSet is a WebMercatorGridPointSet, but if it's a new one we want to replicate its + * linkages based on the base scenarioNetwork.gridPointSet's linkages. We don't know if it's new, so such + * logic has to happen in the loop below over all streetModes, where we fetch and build the egress cost + * tables. We already know for sure this is a scenarioNetwork. + * So if we see a linkage for scenarioNetwork.gridPointSet, we need to add another linkage. + * When this mapping exists: + * (scenarioNetwork.gridPointSet, StreetLayer, StreetMode) -> linkageFor(scenarioNetwork.gridPointSet) + * We need to generate this mapping: + * (pointSet, StreetLayer, StreetMode) -> new LinkedPointSet(linkageFor(scenarioNetwork.gridPointSet), pointSet); + * Note that: ((WebMercatorGridPointSet)pointSet).base == scenarioNetwork.gridPointSet + * I'm afraid BaseLinkage means two different things here: we can subset bigger linkages that already + * exist, or we can redo subsets of linkages of the same size when applying scenarios. + * Yes: in one situation, the PointSet objects are identical when making the new linkage, but the + * streetLayer differs. In the other situation, the PointSet objects are different but the other aspects + * are the same. Again this is the difference between a PointSet and its linkage. We should call them + * PointSetLinkages instead of LinkedPointSets because they do not subclass PointSet. + * basePointSet vs. baseStreetLayer vs. baseLinkage. 
*/ private class LinkageCacheLoader extends CacheLoader implements Serializable { @Override diff --git a/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java b/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java index 943ae7a8e..e3683fd36 100644 --- a/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java +++ b/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java @@ -88,8 +88,8 @@ public LoaderState preloadData (AnalysisWorkerTask task) { /** * A blocking way to ensure the network and all linkages and precomputed tables are prepared in advance of routing. - * Note that this does not perform any blocking or locking of its own - any synchronization will be that of the - * underlying caches (synchronized methods on TransportNetworkCache or LinkedPointSet). It also bypasses the + * Note that this does not perform any blocking or locking of its own. Any synchronization or turn-taking will be + * that of the underlying caches (TransportNetworkCache or LinkageCache). It also bypasses the * AsyncLoader locking that would usually allow only one buildValue operation at a time. All threads that call with * similar tasks will make interleaved calls to setProgress (with superficial map synchronization). Other than * causing a value to briefly revert from PRESENT to BUILDING this doesn't seem deeply problematic. @@ -110,11 +110,13 @@ protected TransportNetwork buildValue(Key key) { // Get the set of points to which we are measuring travel time. Any smaller sub-grids created here will // reference the scenarioNetwork's built-in full-extent pointset, so can reuse its linkage. - // TODO handle multiple destination grids. + // FIXME handle multiple destination grids. if (key.destinationGridExtents == null) { // Special (and ideally temporary) case for regional freeform destinations, where there is no grid to link. // The null destinationGridExtents are created by the WebMercatorExtents#forPointsets else clause. + // FIXME there is no grid to link, but there are points and egress tables to make! + // see com.conveyal.r5.analyst.cluster.AnalysisWorkerTask.loadAndValidateDestinationPointSets return scenarioNetwork; } diff --git a/src/main/java/com/conveyal/r5/analyst/TravelTimeComputer.java b/src/main/java/com/conveyal/r5/analyst/TravelTimeComputer.java index 6af5dbcf7..0237a636e 100644 --- a/src/main/java/com/conveyal/r5/analyst/TravelTimeComputer.java +++ b/src/main/java/com/conveyal/r5/analyst/TravelTimeComputer.java @@ -167,14 +167,16 @@ public OneOriginResult computeTravelTimes() { // The generalized cost calculations currently increment time and weight by the same amount. sr.quantityToMinimize = StreetRouter.State.RoutingVariable.DURATION_SECONDS; sr.route(); - // Change to walking in order to reach transit stops in pedestrian-only areas like train stations. - // This implies you are dropped off or have a very easy parking spot for your vehicle. - // This kind of multi-stage search should also be used when building egress distance cost tables. - if (accessMode != StreetMode.WALK) { - sr.keepRoutingOnFoot(); - } if (request.hasTransit()) { + // Change to walking in order to reach transit stops in pedestrian-only areas like train stations. + // This implies you are dropped off or have a very easy parking spot for your vehicle. + // This kind of multi-stage search should also be used when building egress distance cost tables. + // Note that this can take up to twice as long as the initial car/bike search. 
Do it only when the + // walking is necessary, and when the radius of the car/bike search is limited, as for transit access. + if (accessMode != StreetMode.WALK) { + sr.keepRoutingOnFoot(); + } // Find access times to transit stops, keeping the minimum across all access street modes. // Note that getReachedStops() returns the routing variable units, not necessarily seconds. // TODO add logic here if linkedStops are specified in pickupDelay? diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java index 2add61702..d8e89dfd9 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java @@ -458,10 +458,15 @@ protected void handleOneRegionalTask (RegionalTask task) throws Throwable { } // Pull all necessary inputs into cache in a blocking fashion, unlike single-point tasks where prep is async. - // Avoids auto-shutdown while preloading. Must be done after loading destination pointsets to establish extents. - // Note we're completely bypassing the async loader here and relying on the older nested LoadingCaches. - // If those are ever removed, the async loader will need a synchronous mode with per-path blocking (kind of - // reinventing the wheel of LoadingCache) or we'll need to make preparation for regional tasks async. + // This is because single point tasks return fast to report progress, while regional tasks currently do not. + // Worker auto-shutdown time should remain very high during this blocking preload step. Destination point sets + // must already be loaded to establish extents before the preloading step begins. Note that we're still using + // the NetworkPreloader which is an AsyncLoader, but calling a method that intentionally skips all the async or + // background proccessing machinery. Usually, N RegionalTasks all try to preload at once, and all block on + // this method. Redundant slow calculation is not prevented by the AsyncLoader class itself, but by the other + // LoadingCaches behind it. Specifically, the TransportNetworkCache and LinkageCache enforce turn-taking and + // prevent redundant work. If those are ever removed, we would need either async regional task preparation, or + // a synchronous mode with per-key blocking on AsyncLoader (kind of reinventing the wheel of LoadingCache). TransportNetwork transportNetwork = networkPreloader.synchronousPreload(task); // If we are generating a static site, there must be a single metadata file for an entire batch of results. 
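Since the comment above contrasts the blocking regional preload with the asynchronous path used for single point tasks, a small hedged sketch of the two call patterns may help. The wrapper class is hypothetical; only the preloadData and synchronousPreload calls are taken from this diff.

import com.conveyal.r5.analyst.NetworkPreloader;
import com.conveyal.r5.analyst.cluster.AnalysisWorkerTask;
import com.conveyal.r5.transit.TransportNetwork;

/** Illustrative only: the two preload paths on NetworkPreloader. */
class PreloadPathsExample {
    static TransportNetwork preloadForRegionalTask (NetworkPreloader networkPreloader, AnalysisWorkerTask task) {
        // Single point handlers use the asynchronous path, returning immediately so progress can be reported:
        // var loaderState = networkPreloader.preloadData(task);
        // Regional handlers instead block right here until the network, linkages, and egress tables are ready.
        return networkPreloader.synchronousPreload(task);
    }
}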
diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorkerTask.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorkerTask.java index 30266f171..2698905fa 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorkerTask.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorkerTask.java @@ -1,5 +1,6 @@ package com.conveyal.r5.analyst.cluster; +import com.conveyal.analysis.models.CsvResultOptions; import com.conveyal.r5.analyst.FreeFormPointSet; import com.conveyal.r5.analyst.Grid; import com.conveyal.r5.analyst.GridTransformWrapper; @@ -15,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.util.Arrays; +import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -177,6 +179,15 @@ public abstract class AnalysisWorkerTask extends ProfileRequest { */ public ChaosParameters injectFault; + /** + * Freeform (untyped) flags for enabling experimental, undocumented, or arcane worker behavior. + * This should be used to replace all previous special behavior flags that were embedded inside analysis names etc. + */ + public Set flags; + + /** Control the details of CSV regional analysis output, including whether to output IDs, names, or both. */ + public CsvResultOptions csvResultOptions; + /** * Is this a single point or regional request? Needed to encode types in JSON serialization. Can that type field be * added automatically with a serializer annotation instead of by defining a getter method and two dummy methods? diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/PathResult.java b/src/main/java/com/conveyal/r5/analyst/cluster/PathResult.java index c2366ac1a..43e617920 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/PathResult.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/PathResult.java @@ -1,5 +1,6 @@ package com.conveyal.r5.analyst.cluster; +import com.conveyal.analysis.models.CsvResultOptions; import com.conveyal.r5.analyst.StreetTimesAndModes; import com.conveyal.r5.transit.TransitLayer; import com.conveyal.r5.transit.path.Path; @@ -47,19 +48,24 @@ public class PathResult { * With additional changes, patterns could be collapsed further to route combinations or modes. 
*/ public final Multimap[] iterationsForPathTemplates; + private final TransitLayer transitLayer; + private final CsvResultOptions csvOptions; + public static final String[] DATA_COLUMNS = new String[]{ "routes", "boardStops", "alightStops", + "feedIds", "rideTimes", "accessTime", "egressTime", "transferTime", "waitTimes", "totalTime", - "nIterations" + "nIterations", + "group" }; public PathResult(AnalysisWorkerTask task, TransitLayer transitLayer) { @@ -76,6 +82,7 @@ public PathResult(AnalysisWorkerTask task, TransitLayer transitLayer) { } iterationsForPathTemplates = new Multimap[nDestinations]; this.transitLayer = transitLayer; + this.csvOptions = task.csvResultOptions; } /** @@ -108,7 +115,7 @@ public ArrayList[] summarizeIterations(Stat stat) { int nIterations = iterations.size(); checkState(nIterations > 0, "A path was stored without any iterations"); String waits = null, transfer = null, totalTime = null; - String[] path = routeSequence.detailsWithGtfsIds(transitLayer); + String[] path = routeSequence.detailsWithGtfsIds(transitLayer, csvOptions); double targetValue; IntStream totalWaits = iterations.stream().mapToInt(i -> i.waitTimes.sum()); if (stat == Stat.MINIMUM) { @@ -135,7 +142,10 @@ public ArrayList[] summarizeIterations(Stat stat) { score = thisScore; } } - String[] row = ArrayUtils.addAll(path, transfer, waits, totalTime, String.valueOf(nIterations)); + String group = ""; // Reserved for future use + String[] row = ArrayUtils.addAll( + path, transfer, waits, totalTime, String.valueOf(nIterations), group + ); checkState(row.length == DATA_COLUMNS.length); summary[d].add(row); } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java index 0a444f9aa..839bdde91 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java @@ -74,7 +74,7 @@ public Type getType() { */ @Override public WebMercatorExtents getWebMercatorExtents() { - if (makeTauiSite) { + if (makeTauiSite || this.hasFlag("CROP_DESTINATIONS")) { return WebMercatorExtents.forTask(this); } else { return WebMercatorExtents.forPointsets(this.destinationPointSets); @@ -112,4 +112,8 @@ public int nTargetsPerOrigin () { } } + public boolean hasFlag (String flag) { + return this.flags != null && this.flags.contains(flag); + } + } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java b/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java index d9e6789a8..d253e4b6b 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java @@ -26,14 +26,14 @@ * scenarios from the backend instead of from S3. * * TODO merge this with TransportNetworkCache#resolveScenario into a single multi-level mem/disk/s3 cache. - * Note that this cache is going to just grow indefinitely in size as a worker receives many iterations of the same - * scenario - that could be a memory leak. Again multi level caching could releive those worries. - * It's debatable whether we should be hanging on to scenarios passed with single point requests becuase they may never - * be used again. + * This cache grows in size without bound as a worker receives many iterations of the same scenario. + * This is technically a sort of memory leak for long-lived workers. Multi-level caching could relieve those worries. 
+ * However, this cache stores only the Scenarios and Modifications, not any large egress tables or linkages. + * + * It's debatable whether we should be hanging on to scenarios passed with single point requests, + * because they may never be used again. * Should we just always require a single point task to be sent to the cluster before a regional? * That would not ensure the scenario was present on all workers though. - * - * Created by abyrd on 2018-10-29 */ public class ScenarioCache { @@ -44,7 +44,7 @@ public class ScenarioCache { public synchronized void storeScenario (Scenario scenario) { Scenario existingScenario = scenariosById.put(scenario.id, scenario); if (existingScenario != null) { - LOG.debug("Scenario cache already contained a this scenario."); + LOG.debug("Scenario cache already contained this scenario."); } } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/TransportNetworkConfig.java b/src/main/java/com/conveyal/r5/analyst/cluster/TransportNetworkConfig.java index 012e3204a..387742634 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/TransportNetworkConfig.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/TransportNetworkConfig.java @@ -2,12 +2,8 @@ import com.conveyal.r5.analyst.fare.InRoutingFareCalculator; import com.conveyal.r5.analyst.scenario.Modification; -import com.conveyal.r5.analyst.scenario.RasterCost; -import com.conveyal.r5.analyst.scenario.ShapefileLts; import com.conveyal.r5.profile.StreetMode; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.List; import java.util.Set; @@ -54,4 +50,11 @@ public class TransportNetworkConfig { */ public Set buildGridsForModes; + /** + * Specifies which "labeler" to use when setting traversal mode permissions from OSM tags. For now, only + * implemented with "sidewalk" to use the SidewalkTraversalPermissionLayer. This should eventually be cleaned up + * (specifying different labelers, using enums). + */ + public String traversalPermissionLabeler; + } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java index 1fcd17a6e..9b6d54632 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java @@ -39,7 +39,6 @@ public class WorkerStatus { public String workerVersion; public String workerId; public Set networks = new HashSet<>(); - public Set scenarios = new HashSet<>(); public double secondsSinceLastPoll; public Map tasksPerMinuteByJobId; @JsonUnwrapped(prefix = "ec2") @@ -86,7 +85,6 @@ public WorkerStatus (AnalysisWorker worker) { // networks = worker.networkPreloader.transportNetworkCache.getLoadedNetworkIds(); // For now we report a single network, even before it's loaded. 
networks = Sets.newHashSet(worker.networkId); - scenarios = worker.networkPreloader.transportNetworkCache.getAppliedScenarios(); ec2 = worker.ec2info; OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); diff --git a/src/main/java/com/conveyal/r5/analyst/progress/Task.java b/src/main/java/com/conveyal/r5/analyst/progress/Task.java index 49990af7a..a072d234d 100644 --- a/src/main/java/com/conveyal/r5/analyst/progress/Task.java +++ b/src/main/java/com/conveyal/r5/analyst/progress/Task.java @@ -162,7 +162,7 @@ protected void bubbleUpProgress() { } /** - * Check that all necesary fields have been set before enqueueing for execution, and check any invariants. + * Check that all necessary fields have been set before enqueueing for execution, and check any invariants. */ public void validate () { if (this.user == null) { diff --git a/src/main/java/com/conveyal/r5/labeling/SidewalkTraversalPermissionLabeler.java b/src/main/java/com/conveyal/r5/labeling/SidewalkTraversalPermissionLabeler.java new file mode 100644 index 000000000..34d2c8465 --- /dev/null +++ b/src/main/java/com/conveyal/r5/labeling/SidewalkTraversalPermissionLabeler.java @@ -0,0 +1,35 @@ +package com.conveyal.r5.labeling; + +import com.conveyal.osmlib.Way; +import com.conveyal.r5.streets.EdgeStore; + +/** + * Traversal permission labeler that restricts walking on most driving ways (useful for networks with complete + * sidewalks). Also includes permissions for the United States (see USTraversalPermissionLabeler). + */ +public class SidewalkTraversalPermissionLabeler extends TraversalPermissionLabeler { + static { + addPermissions("pedestrian", "bicycle=yes"); + addPermissions("bridleway", "bicycle=yes;foot=yes"); //horse=yes but we don't support horse + addPermissions("cycleway", "bicycle=yes;foot=yes"); + addPermissions("trunk|primary|secondary|tertiary|unclassified|residential|living_street|road|service|track", + "access=yes"); + } + + @Override + public RoadPermission getPermissions(Way way) { + RoadPermission rp = super.getPermissions(way); + if (rp.forward.contains(EdgeStore.EdgeFlag.ALLOWS_CAR) || + rp.forward.contains(EdgeStore.EdgeFlag.NO_THRU_TRAFFIC_CAR) || + rp.backward.contains(EdgeStore.EdgeFlag.ALLOWS_CAR) || + rp.backward.contains(EdgeStore.EdgeFlag.NO_THRU_TRAFFIC_CAR) + ) { + rp.forward.remove(EdgeStore.EdgeFlag.ALLOWS_PEDESTRIAN); + rp.forward.remove(EdgeStore.EdgeFlag.NO_THRU_TRAFFIC_PEDESTRIAN); + rp.backward.remove(EdgeStore.EdgeFlag.ALLOWS_PEDESTRIAN); + rp.backward.remove(EdgeStore.EdgeFlag.NO_THRU_TRAFFIC_PEDESTRIAN); + } + return rp; + } + +} diff --git a/src/main/java/com/conveyal/r5/streets/StreetLayer.java b/src/main/java/com/conveyal/r5/streets/StreetLayer.java index 3d781b41b..4e31481ee 100644 --- a/src/main/java/com/conveyal/r5/streets/StreetLayer.java +++ b/src/main/java/com/conveyal/r5/streets/StreetLayer.java @@ -6,12 +6,14 @@ import com.conveyal.osmlib.OSMEntity; import com.conveyal.osmlib.Relation; import com.conveyal.osmlib.Way; +import com.conveyal.r5.analyst.cluster.TransportNetworkConfig; import com.conveyal.r5.analyst.scenario.PickupWaitTimes; import com.conveyal.r5.api.util.BikeRentalStation; import com.conveyal.r5.api.util.ParkRideParking; import com.conveyal.r5.common.GeometryUtils; import com.conveyal.r5.labeling.LevelOfTrafficStressLabeler; import com.conveyal.r5.labeling.RoadPermission; +import com.conveyal.r5.labeling.SidewalkTraversalPermissionLabeler; import com.conveyal.r5.labeling.SpeedLabeler; import com.conveyal.r5.labeling.StreetClass; 
import com.conveyal.r5.labeling.TraversalPermissionLabeler; @@ -132,9 +134,9 @@ public class StreetLayer implements Serializable, Cloneable { public TIntObjectMap parkRideLocationsMap; // TODO these are only needed when building the network, should we really be keeping them here in the layer? - // We should instead have a network builder that holds references to this transient state. - // TODO don't hardwire to US - private transient TraversalPermissionLabeler permissionLabeler = new USTraversalPermissionLabeler(); + // We should instead have a network builder that holds references to this transient state. Note initial + // approach of specifying a TraversalPermissionLabeler in TransportNetworkConfig. + private transient TraversalPermissionLabeler permissionLabeler; private transient LevelOfTrafficStressLabeler stressLabeler = new LevelOfTrafficStressLabeler(); private transient TypeOfEdgeLabeler typeOfEdgeLabeler = new TypeOfEdgeLabeler(); private transient SpeedLabeler speedLabeler; @@ -207,6 +209,22 @@ public class StreetLayer implements Serializable, Cloneable { public StreetLayer() { speedLabeler = new SpeedLabeler(SpeedConfig.defaultConfig()); + permissionLabeler = new USTraversalPermissionLabeler(); + } + + public StreetLayer(TransportNetworkConfig config) { + this(); + if (config != null) { + permissionLabeler = switch (config.traversalPermissionLabeler) { + case "sidewalk" -> new SidewalkTraversalPermissionLabeler(); + case null -> new USTraversalPermissionLabeler(); + default -> throw new IllegalArgumentException( + "Unknown traversal permission labeler: " + config.traversalPermissionLabeler + ); + }; + } else { + permissionLabeler = new USTraversalPermissionLabeler(); + } } /** Load street layer from an OSM-lib OSM DB */ diff --git a/src/main/java/com/conveyal/r5/streets/StreetRouter.java b/src/main/java/com/conveyal/r5/streets/StreetRouter.java index fa3750e35..2fa4b24e5 100644 --- a/src/main/java/com/conveyal/r5/streets/StreetRouter.java +++ b/src/main/java/com/conveyal/r5/streets/StreetRouter.java @@ -742,7 +742,7 @@ public int getTravelTimeToVertex (int vertexIndex) { * fragments from the vertices at either end of the edge up to the destination split point. * If no states can be produced return null. * - * Note that this is only used by the point to point street router, not by LinkedPointSets (which have equivalent + * NOTE that this is ONLY USED BY the point to point street router, NOT BY LinkedPointSets (which have equivalent * logic in their eval method). The PointSet implementation only needs to produce times, not States. But ideally * some common logic can be factored out. */ diff --git a/src/main/java/com/conveyal/r5/transit/TransitLayer.java b/src/main/java/com/conveyal/r5/transit/TransitLayer.java index 871491ff2..f978d5a5e 100644 --- a/src/main/java/com/conveyal/r5/transit/TransitLayer.java +++ b/src/main/java/com/conveyal/r5/transit/TransitLayer.java @@ -54,6 +54,9 @@ import java.util.stream.IntStream; import java.util.stream.StreamSupport; +import static com.conveyal.r5.transit.TransitLayer.EntityRepresentation.ID_ONLY; +import static com.conveyal.r5.transit.TransitLayer.EntityRepresentation.NAME_ONLY; + /** * A key simplifying factor is that we don't handle overnight trips. This is fine for analysis at usual times of day. @@ -815,31 +818,66 @@ public TIntSet findStopsInGeometry (Geometry geometry) { return stops; } + public enum EntityRepresentation { + ID_ONLY, NAME_ONLY, NAME_AND_ID + } + /** * For the given pattern index, returns the GTFS routeId. 
If includeName is true, the returned string will * also include a route_short_name or route_long_name (if they are not null). */ - public String routeString(int routeIndex, boolean includeName) { + public String routeString (int routeIndex, EntityRepresentation nameOrId) { RouteInfo routeInfo = routes.get(routeIndex); - String route = routeInfo.route_id; - if (includeName) { - if (routeInfo.route_short_name != null) { - route += " (" + routeInfo.route_short_name + ")"; - } else if (routeInfo.route_long_name != null){ - route += " (" + routeInfo.route_long_name + ")"; + String name = routeInfo.route_short_name; + String id = routeInfo.route_id; + // If we might actually use the name, check some fallbacks. + if (nameOrId != ID_ONLY) { + if (name == null) { + name = routeInfo.route_long_name; + } + if (name == null) { + name = routeInfo.route_id; } } - return route; + return switch (nameOrId) { + case NAME_ONLY -> name; + case NAME_AND_ID -> name + " (" + id + ")"; + default -> id; + }; } /** * For the given stop index, returns the GTFS stopId (stripped of R5's feedId prefix) and, if includeName is true, * stopName. */ - public String stopString(int stopIndex, boolean includeName) { - // TODO use a compact feed index, instead of splitting to remove feedIds - String stop = stopIdForIndex.get(stopIndex) == null ? "[new]" : stopIdForIndex.get(stopIndex).split(":")[1]; - if (includeName) stop += " (" + stopNames.get(stopIndex) + ")"; - return stop; + public String stopString(int stopIndex, EntityRepresentation nameOrId) { + String stopId = stopIdForIndex.get(stopIndex); + String stopName = stopNames.get(stopIndex); + // I'd trust the JVM JIT to optimize out these assignments on different code paths, but not the split call. + if (nameOrId != NAME_ONLY) { + if (stopId == null) { + stopId = "[new]"; + } else { + // TODO use a compact feed ID instead of splitting to remove feedIds (or put feedId into another CSV field) + stopId = stopId.split(":")[1]; + } + } + if (nameOrId != ID_ONLY) { + if (stopName == null) { + stopName = "[new]"; + } + } + return switch (nameOrId) { + case NAME_ONLY -> stopName; + case NAME_AND_ID -> stopName + " (" + stopId + ")"; + default -> stopId; + }; + } + + /** + * For a supplied stopIndex in the transit layer, return the feed id (which we prepend to the GTFS stop id). + */ + public String feedFromStop(int stopIndex) { + return stopIdForIndex.get(stopIndex) == null ? 
"[new]" : stopIdForIndex.get(stopIndex).split(":")[0]; } } diff --git a/src/main/java/com/conveyal/r5/transit/TransportNetwork.java b/src/main/java/com/conveyal/r5/transit/TransportNetwork.java index 3e3cb3720..4d351eedf 100644 --- a/src/main/java/com/conveyal/r5/transit/TransportNetwork.java +++ b/src/main/java/com/conveyal/r5/transit/TransportNetwork.java @@ -4,9 +4,11 @@ import com.conveyal.osmlib.OSM; import com.conveyal.r5.analyst.LinkageCache; import com.conveyal.r5.analyst.WebMercatorGridPointSet; +import com.conveyal.r5.analyst.cluster.TransportNetworkConfig; import com.conveyal.r5.analyst.error.TaskError; import com.conveyal.r5.analyst.fare.InRoutingFareCalculator; import com.conveyal.r5.analyst.scenario.Scenario; +import com.conveyal.r5.common.JsonUtilities; import com.conveyal.r5.kryo.KryoNetworkSerializer; import com.conveyal.r5.profile.StreetMode; import com.conveyal.r5.streets.StreetLayer; @@ -53,14 +55,10 @@ public class TransportNetwork implements Serializable { public TransitLayer transitLayer; /** - * This stores any number of lightweight scenario networks built upon the current base network. - * FIXME that sounds like a memory leak, should be a WeighingCache or at least size-limited. - * A single network cache at the top level could store base networks and scenarios since they all have globally - * unique IDs. A hierarchical cache does have the advantage of evicting all the scenarios with the associated - * base network, which keeps the references in the scenarios from holding on to the base network. But considering - * that we have never started evicting networks (other than for a "cache" of one element) this might be getting - * ahead of ourselves. + * This field is no longer used. It has been moved to TransportNetworkCache, but this one remains for now, to + * avoid any inadvertent incompatibilities with serialized network files or serialization library settings. */ + @Deprecated public transient Map scenarios = new HashMap<>(); /** @@ -81,8 +79,6 @@ public class TransportNetwork implements Serializable { */ public String scenarioId = null; - public static final String BUILDER_CONFIG_FILENAME = "build-config.json"; - public InRoutingFareCalculator fareCalculator; /** Non-fatal warnings encountered when applying the scenario, null on a base network */ @@ -100,7 +96,9 @@ public void rebuildTransientIndexes() { streetLayer.indexStreets(); transitLayer.rebuildTransientIndexes(); } - + public static TransportNetwork fromFiles (String osmSourceFile, List gtfsSourceFiles) { + return fromFiles(osmSourceFile, gtfsSourceFiles, null); + } /** * OSM PBF files are fragments of a single global database with a single namespace. Therefore it is valid to load * more than one PBF file into a single OSM storage object. However they might be from different points in time, so @@ -114,9 +112,11 @@ public void rebuildTransientIndexes() { * NOTE the feedId of the gtfs feeds loaded here will be the ones declared by the feeds or based on their filenames. * This method makes no effort to impose the more unique feed IDs created by the Analysis backend. */ + public static TransportNetwork fromFiles ( String osmSourceFile, - List gtfsSourceFiles + List gtfsSourceFiles, + String configFile ) throws DuplicateFeedException { // Load OSM data into MapDB to pass into network builder. 
OSM osm = new OSM(osmSourceFile + ".mapdb"); @@ -124,21 +124,37 @@ public static TransportNetwork fromFiles ( osm.readFromFile(osmSourceFile); // Supply feeds with a stream so they do not sit open in memory while other feeds are being processed. Stream feeds = gtfsSourceFiles.stream().map(GTFSFeed::readOnlyTempFileFromGtfs); - return fromInputs(osm, feeds); + if (configFile == null) { + return fromInputs(osm, feeds); + } else { + try { + // Use lenient mapper to mimic behavior in objectFromRequestBody. + TransportNetworkConfig config = JsonUtilities.lenientObjectMapper.readValue(configFile, + TransportNetworkConfig.class); + return fromInputs(osm, feeds, config); + } catch (IOException e) { + throw new RuntimeException("Error reading TransportNetworkConfig. Does it contain new unrecognized fields?", e); + } + } + } + + public static TransportNetwork fromInputs (OSM osm, Stream gtfsFeeds) { + return fromInputs(osm, gtfsFeeds, null); } /** - * This is the core method for building a street and transit network. It takes osm-lib and gtfs-lib objects as - * parameters. It is wrapped in various other methods that create those OSM and GTFS objects from filenames, input - * directories etc. The supplied OSM object must have intersections already detected. - * The GTFS feeds are supplied as a stream so that they can be loaded one by one on demand. + * This is the method for building a street and transit network locally (as opposed to + * TransportNetworkCache#buildNetworkfromConfig, which is used in cluster builds). This method takes osm-lib, + * gtfs-lib, and config objects as parameters. It is wrapped in various other methods that create those OSM and + * GTFS objects from filenames, input directories etc. The supplied OSM object must have intersections already + * detected. The GTFS feeds are supplied as a stream so that they can be loaded one by one on demand. */ - public static TransportNetwork fromInputs (OSM osm, Stream gtfsFeeds) { + public static TransportNetwork fromInputs (OSM osm, Stream gtfsFeeds, TransportNetworkConfig config) { // Create a transport network to hold the street and transit layers TransportNetwork transportNetwork = new TransportNetwork(); // Make street layer from OSM data in MapDB - StreetLayer streetLayer = new StreetLayer(); + StreetLayer streetLayer = new StreetLayer(config); transportNetwork.streetLayer = streetLayer; streetLayer.parentNetwork = transportNetwork; streetLayer.loadFromOsm(osm); @@ -180,14 +196,16 @@ public static TransportNetwork fromInputs (OSM osm, Stream gtfsFeeds) } /** - * Scan a directory detecting all the files that are network inputs, then build a network from those files. + * Scan a directory detecting all the files that are network inputs, then build a network from those files. This + * method is used in the PointToPointRouterServer, not the cluster-based analysis backend. * - * NOTE the feedId of the gtfs feeds laoded here will be the ones declared by the feeds or based on their filenames. + * NOTE the feedId of the gtfs feeds loaded here will be the ones declared by the feeds or based on their filenames. * This method makes no effort to impose the more unique feed IDs created by the Analysis backend. 
*/ public static TransportNetwork fromDirectory (File directory) throws DuplicateFeedException { File osmFile = null; List gtfsFiles = new ArrayList<>(); + File configFile = null; for (File file : directory.listFiles()) { switch (InputFileType.forFile(file)) { case GTFS: @@ -202,6 +220,9 @@ public static TransportNetwork fromDirectory (File directory) throws DuplicateFe LOG.warn("Can only load one OSM file at a time."); } break; + case CONFIG: + LOG.info("Found config file {}", file); + configFile = file; case DEM: LOG.warn("DEM file '{}' not yet supported.", file); break; @@ -213,7 +234,11 @@ public static TransportNetwork fromDirectory (File directory) throws DuplicateFe LOG.error("An OSM PBF file is required to build a network."); return null; } else { - return fromFiles(osmFile.getAbsolutePath(), gtfsFiles); + if (configFile == null) { + return fromFiles(osmFile.getAbsolutePath(), gtfsFiles); + } else { + return fromFiles(osmFile.getAbsolutePath(), gtfsFiles, configFile.getAbsolutePath()); + } } } @@ -259,6 +284,7 @@ public static InputFileType forFile(File file) { if (name.endsWith(".pbf") || name.endsWith(".vex")) return OSM; if (name.endsWith(".tif") || name.endsWith(".tiff")) return DEM; // Digital elevation model (elevation raster) if (name.endsWith("network.dat")) return OUTPUT; + if (name.endsWith(".json")) return CONFIG; return OTHER; } } diff --git a/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java b/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java index b7f1f1d8f..c67fe9d24 100644 --- a/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java +++ b/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java @@ -25,17 +25,9 @@ import javax.annotation.Nonnull; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; import static com.conveyal.file.FileCategory.BUNDLES; import static com.conveyal.file.FileCategory.DATASOURCES; @@ -53,10 +45,20 @@ public class TransportNetworkCache implements Component { private static final Logger LOG = LoggerFactory.getLogger(TransportNetworkCache.class); /** Cache size is currently limited to one, i.e. the worker holds on to only one network at a time. */ - private static final int DEFAULT_CACHE_SIZE = 1; + private static final int MAX_CACHED_NETWORKS = 1; + + /** + * It might seem sufficient to hold only two scenarios (for single point scenario comparison). But in certain cases + * (e.g. the regional task queue is bigger than the size of each queued regional job) we might end up working on + * a mix of tasks from N different scenarios. Note also that scenarios hold references to their base networks, so + * caching multiple scenario networks can theoretically keep just as many TransportNetworks in memory. + * But in practice, in non-local (cloud) operation a given worker instance is locked to a single network for its + * entire lifespan. + */ + public static final int MAX_CACHED_SCENARIO_NETWORKS = 10; // TODO change all other caches from Guava to Caffeine caches. This one is already a Caffeine cache. 
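The worker-side caches declared just below, and the thread-safety notes further on, rely on the fact that a Caffeine LoadingCache computes each key at most once while concurrent callers for that key wait. A minimal standalone sketch of that turn-taking behavior follows; the class is not part of this patch, and the loader body is a placeholder for the slow network build or deserialization step.

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import com.conveyal.r5.transit.TransportNetwork;

/** Illustrative only: per-key turn-taking in a Caffeine LoadingCache. */
class TurnTakingExample {
    private final LoadingCache<String, TransportNetwork> networks = Caffeine.newBuilder()
            .maximumSize(1) // hold a single base network, as the worker-side cache does
            .build(this::buildNetworkSlowly);

    TransportNetwork get (String networkId) {
        // If many regional task threads request the same id at once, the loader runs only once;
        // the other callers wait for that single computation and then share its result.
        return networks.get(networkId);
    }

    private TransportNetwork buildNetworkSlowly (String networkId) {
        // Placeholder only: the real loader would build the network from inputs or read a serialized copy.
        throw new UnsupportedOperationException("placeholder loader");
    }
}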
- private final LoadingCache cache; + private final LoadingCache networkCache; private final FileStorage fileStorage; private final GTFSCache gtfsCache; @@ -64,15 +66,39 @@ public class TransportNetworkCache implements Component { /** * A table of already seen scenarios, avoiding downloading them repeatedly from S3 and allowing us to replace - * scenarios with only their IDs, and reverse that replacement later. + * scenarios with only their IDs, and reverse that replacement later. Note that this caches the Scenario objects + * themselves, not the TransportNetworks built from those Scenarios. */ private final ScenarioCache scenarioCache = new ScenarioCache(); + /** + * This record type is used for the private, encapsulated cache of TransportNetworks for different scenarios. + * Scenario IDs are unique so we could look up these networks by scenario ID alone. However the cache values need + * to be derived entirely from the cache keys. We need some way to look up the base network so we include its ID. + */ + private record BaseAndScenarioId (String baseNetworkId, String scenarioId) { } + + /** + * This stores a number of lightweight scenario networks built upon the current base network. + * Each scenario TransportNetwork has its own LinkageCache, containing LinkedPointSets that each have their own + * EgressCostTable. In practice this can exhaust memory, e.g. after using bicycle egress for about 50 scenarios. + * The previous hierarchical arrangement of caches has the advantage of evicting all the scenarios with the + * associated base network, which keeps the references in the scenarios from holding on to the base network. + * But considering that we have never started evicting networks (other than for a "cache" of one element) this + * eviction can be handled in other ways. + */ + private LoadingCache scenarioNetworkCache; + /** Create a transport network cache. If source bucket is null, will work offline. */ public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMCache osmCache) { this.osmCache = osmCache; this.gtfsCache = gtfsCache; - this.cache = createCache(DEFAULT_CACHE_SIZE); + this.networkCache = Caffeine.newBuilder() + .maximumSize(MAX_CACHED_NETWORKS) + .build(this::loadNetwork); + this.scenarioNetworkCache = Caffeine.newBuilder() + .maximumSize(MAX_CACHED_SCENARIO_NETWORKS) + .build(this::loadScenario); this.fileStorage = fileStorage; } @@ -80,10 +106,9 @@ public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMC * Find a transport network by ID, building or loading as needed from pre-existing OSM, GTFS, MapDB, or Kryo files. * This should never return null. If a TransportNetwork can't be built or loaded, an exception will be thrown. */ - public synchronized @Nonnull - TransportNetwork getNetwork (String networkId) throws TransportNetworkException { + public TransportNetwork getNetwork (String networkId) throws TransportNetworkException { try { - return cache.get(networkId); + return networkCache.get(networkId); } catch (Exception e) { throw new TransportNetworkException("Could not load TransportNetwork into cache. ", e); } @@ -107,43 +132,35 @@ public void rememberScenario (Scenario scenario) { * base graphs). Therefore we can look up cached scenario networks based solely on their scenarioId rather than a * compound key of (networkId, scenarioId). * - * The fact that scenario networks are cached means that PointSet linkages will be automatically reused. 
+ * Reusing scenario networks automatically leads to reuse of the associated PointSet linkages and egress tables. * TODO it seems to me that this method should just take a Scenario as its second parameter, and that resolving * the scenario against caches on S3 or local disk should be pulled out into a separate function. * The problem is that then you resolve the scenario every time, even when the ID is enough to look up the already * built network. So we need to pass the whole task in here, so either the ID or full scenario are visible. * - * Thread safety notes: This entire method is synchronized so access by multiple threads will be sequential. - * The first thread will have a chance to build and store the requested scenario before any others see it. - * This means each new scenario will be applied one after the other. This is probably OK as long as building egress - * tables is already parallelized. + * Thread safety: getNetwork and getNetworkForScenario are threadsafe caches, so access to the same key by multiple + * threads will occur sequentially without repeatedly or simultaneously performing the same loading actions. + * Javadoc on the Caffeine LoadingCache indicates that it will throw exceptions when the cache loader method throws + * them, without establishing a mapping in the cache. So exceptions occurring during scenario application are + * expected to bubble up unimpeded. */ - public synchronized TransportNetwork getNetworkForScenario (String networkId, String scenarioId) { - // If the networkId is different than previous calls, a new network will be loaded. Its transient nested map - // of scenarios will be empty at first. This ensures it's initialized if null. - // FIXME apparently this can't happen - the field is transient and initialized in TransportNetwork. - TransportNetwork baseNetwork = this.getNetwork(networkId); - if (baseNetwork.scenarios == null) { - baseNetwork.scenarios = new HashMap<>(); - } + public TransportNetwork getNetworkForScenario (String networkId, String scenarioId) { + TransportNetwork scenarioNetwork = scenarioNetworkCache.get(new BaseAndScenarioId(networkId, scenarioId)); + return scenarioNetwork; + } - TransportNetwork scenarioNetwork = baseNetwork.scenarios.get(scenarioId); - if (scenarioNetwork == null) { - // The network for this scenario was not found in the cache. Create that scenario network and cache it. - LOG.debug("Applying scenario to base network..."); - // Fetch the full scenario if an ID was specified. - Scenario scenario = resolveScenario(networkId, scenarioId); - // Apply any scenario modifications to the network before use, performing protective copies where necessary. - // We used to prepend a filter to the scenario, removing trips that are not running during the search time window. - // However, because we are caching transportNetworks with scenarios already applied to them, we can’t use - // the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always - // apply scenarios every time. - scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork); - LOG.debug("Done applying scenario. Caching the resulting network."); - baseNetwork.scenarios.put(scenario.id, scenarioNetwork); - } else { - LOG.debug("Reusing cached TransportNetwork for scenario {}.", scenarioId); - } + private TransportNetwork loadScenario (BaseAndScenarioId ids) { + TransportNetwork baseNetwork = this.getNetwork(ids.baseNetworkId()); + LOG.debug("Scenario TransportNetwork not found. 
Applying scenario to base network and caching it."); + // Fetch the full scenario if an ID was specified. + Scenario scenario = resolveScenario(ids.baseNetworkId(), ids.scenarioId()); + // Apply any scenario modifications to the network before use, performing protective copies where necessary. + // We used to prepend a filter to the scenario, removing trips that are not running during the search time window. + // However, because we are caching transportNetworks with scenarios already applied to them, we can’t use + // the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always + // apply scenarios every time. + TransportNetwork scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork); + LOG.debug("Done applying scenario. Caching the resulting network."); return scenarioNetwork; } @@ -168,6 +185,8 @@ private TransportNetworkConfig loadNetworkConfig (String networkId) { File configFile = fileStorage.getFile(configFileKey); try { // Use lenient mapper to mimic behavior in objectFromRequestBody. + // A single network configuration file might be used across several worker versions. Unknown field names + // may be present for other worker versions unknown to this one. So we can't strictly validate field names. return JsonUtilities.lenientObjectMapper.readValue(configFile, TransportNetworkConfig.class); } catch (IOException e) { throw new RuntimeException("Error reading TransportNetworkConfig. Does it contain new unrecognized fields?", e); @@ -183,9 +202,8 @@ private TransportNetworkConfig loadNetworkConfig (String networkId) { TransportNetworkConfig networkConfig = loadNetworkConfig(networkId); if (networkConfig == null) { // The switch to use JSON manifests instead of zips occurred in 32a1aebe in July 2016. - // Over six years have passed, buildNetworkFromBundleZip is deprecated and could probably be removed. - LOG.warn("No network config (aka manifest) found. Assuming old-format network inputs bundle stored as a single ZIP file."); - network = buildNetworkFromBundleZip(networkId); + // buildNetworkFromBundleZip was deprecated for years then removed in 2024. + throw new RuntimeException("No network config (aka manifest) found."); } else { network = buildNetworkFromConfig(networkConfig); } @@ -218,70 +236,19 @@ private TransportNetworkConfig loadNetworkConfig (String networkId) { return network; } - /** Build a transport network given a network ID, using a zip of all bundle files in S3. */ - @Deprecated - private TransportNetwork buildNetworkFromBundleZip (String networkId) { - // The location of the inputs that will be used to build this graph - File dataDirectory = FileUtils.createScratchDirectory(); - FileStorageKey zipKey = new FileStorageKey(BUNDLES, networkId + ".zip"); - File zipFile = fileStorage.getFile(zipKey); - - try { - ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFile)); - ZipEntry entry; - while ((entry = zis.getNextEntry()) != null) { - File entryDestination = new File(dataDirectory, entry.getName()); - if (!entryDestination.toPath().normalize().startsWith(dataDirectory.toPath())) { - throw new Exception("Bad zip entry"); - } - - // Are both these mkdirs calls necessary? - entryDestination.getParentFile().mkdirs(); - if (entry.isDirectory()) - entryDestination.mkdirs(); - else { - OutputStream entryFileOut = new FileOutputStream(entryDestination); - zis.transferTo(entryFileOut); - entryFileOut.close(); - } - } - zis.close(); - } catch (Exception e) { - // TODO delete cache dir which is probably corrupted. 
- LOG.warn("Error retrieving transportation network input files", e); - return null; - } - - // Now we have a local copy of these graph inputs. Make a graph out of them. - TransportNetwork network; - try { - network = TransportNetwork.fromDirectory(dataDirectory); - } catch (DuplicateFeedException e) { - LOG.error("Duplicate feeds in transport network {}", networkId, e); - throw new RuntimeException(e); - } - - // Set the ID on the network and its layers to allow caching linkages and analysis results. - network.scenarioId = networkId; - - return network; - } - /** * Build a network from a JSON TransportNetworkConfig in file storage. * This describes the locations of files used to create a bundle, as well as options applied at network build time. * It contains the unique IDs of the GTFS feeds and OSM extract. */ private TransportNetwork buildNetworkFromConfig (TransportNetworkConfig config) { - // FIXME duplicate code. All internal building logic should be encapsulated in a method like - // TransportNetwork.build(osm, gtfs1, gtfs2...) - // We currently have multiple copies of it, in buildNetworkFromConfig and buildNetworkFromBundleZip so you've - // got to remember to do certain things like set the network ID of the network in multiple places in the code. - // Maybe we should just completely deprecate bundle ZIPs and remove those code paths. + // FIXME All internal building logic should be encapsulated in a method like TransportNetwork.build(osm, + // gtfs1, gtfs2...) (see various methods in TransportNetwork). TransportNetwork network = new TransportNetwork(); - network.streetLayer = new StreetLayer(); + network.streetLayer = new StreetLayer(config); + network.streetLayer.loadFromOsm(osmCache.get(config.osmId)); network.streetLayer.parentNetwork = network; @@ -355,12 +322,6 @@ private String getNetworkConfigFilename (String networkId) { return GTFSCache.cleanId(networkId) + ".json"; } - private LoadingCache createCache(int size) { - return Caffeine.newBuilder() - .maximumSize(size) - .build(this::loadNetwork); - } - /** * CacheLoader method, which should only be called by the LoadingCache. * Return the graph for the given unique identifier. Load pre-built serialized networks from local or remote @@ -392,28 +353,6 @@ private LoadingCache createCache(int size) { } } - /** - * This will eventually be used in WorkerStatus to report to the backend all loaded networks, to give it hints about - * what kind of tasks the worker is ready to work on immediately. This is made more complicated by the fact that - * workers are started up with no networks loaded, but with the intent for them to work on a particular job. So - * currently the workers just report which network they were started up for, and this method is not used. - * - * In the future, workers should just report an empty set of loaded networks, and the back end should strategically - * send them tasks when they come on line to assign them to networks as needed. But this will require a new - * mechanism to fairly allocate the workers to jobs. - */ - public Set getLoadedNetworkIds() { - return cache.asMap().keySet(); - } - - public Set getAppliedScenarios() { - return cache.asMap().values().stream() - .filter(network -> network.scenarios != null) - .map(network -> network.scenarios.keySet()) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - } - /** * Given a network and scenario ID, retrieve that scenario from the local disk cache (falling back on S3). 
*/ diff --git a/src/main/java/com/conveyal/r5/transit/TripPattern.java b/src/main/java/com/conveyal/r5/transit/TripPattern.java index 7c7e08224..522fa8907 100644 --- a/src/main/java/com/conveyal/r5/transit/TripPattern.java +++ b/src/main/java/com/conveyal/r5/transit/TripPattern.java @@ -34,6 +34,7 @@ public class TripPattern implements Serializable, Cloneable { * ID in this transport network the ID would depend on the order of application of scenarios, and because this ID is * used to map results back to the original network. * TODO This concept of an "original" transport network may be obsolete, this field doesn't seem to be used anywhere. + * These are set to sequential integers: the index of the pattern in the TransitLayer's list of patterns. */ public int originalId; diff --git a/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java b/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java index 6ed2eb73c..455e1fab1 100644 --- a/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java +++ b/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java @@ -1,6 +1,8 @@ package com.conveyal.r5.transit.path; +import com.conveyal.analysis.models.CsvResultOptions; import com.conveyal.r5.transit.TransitLayer; +import com.conveyal.r5.transit.TransitLayer.EntityRepresentation; import gnu.trove.list.TIntList; import gnu.trove.list.array.TIntArrayList; @@ -9,6 +11,8 @@ import java.util.Objects; import java.util.StringJoiner; +import static com.conveyal.r5.transit.TransitLayer.EntityRepresentation.NAME_AND_ID; + /** A door-to-door path that includes the routes ridden between stops */ public class RouteSequence { @@ -27,25 +31,39 @@ public RouteSequence(PatternSequence patternSequence, TransitLayer transitLayer) } } - /** Returns details summarizing this route sequence, using GTFS ids stored in the supplied transitLayer. */ - public String[] detailsWithGtfsIds(TransitLayer transitLayer){ - StringJoiner routeIds = new StringJoiner("|"); - StringJoiner boardStopIds = new StringJoiner("|"); - StringJoiner alightStopIds = new StringJoiner("|"); - StringJoiner rideTimes = new StringJoiner("|"); + /** + * Returns details summarizing this route sequence, using GTFS ids stored in the supplied transitLayer. + * @param csvOptions indicates whether names or IDs should be returned for certain fields. + * @return array of pipe-concatenated strings, with the route, board stop, alight stop, ride time, and feed for + * each transit leg, as well as the access and egress time. + * + * If csvOptions.feedRepresentation is not null, the feed values will be R5-generated UUID for boarding stop of + * each leg. We are grabbing the feed ID from the stop rather than the route (which might seem like a better + * representative of the leg) because stops happen to have a readily available feed ID. 
+ */ + public String[] detailsWithGtfsIds (TransitLayer transitLayer, CsvResultOptions csvOptions){ + StringJoiner routeJoiner = new StringJoiner("|"); + StringJoiner boardStopJoiner = new StringJoiner("|"); + StringJoiner alightStopJoiner = new StringJoiner("|"); + StringJoiner feedJoiner = new StringJoiner("|"); + StringJoiner rideTimeJoiner = new StringJoiner("|"); for (int i = 0; i < routes.size(); i++) { - routeIds.add(transitLayer.routeString(routes.get(i), false)); - boardStopIds.add(transitLayer.stopString(stopSequence.boardStops.get(i), false)); - alightStopIds.add(transitLayer.stopString(stopSequence.alightStops.get(i), false)); - rideTimes.add(String.format("%.1f", stopSequence.rideTimesSeconds.get(i) / 60f)); + routeJoiner.add(transitLayer.routeString(routes.get(i), csvOptions.routeRepresentation)); + boardStopJoiner.add(transitLayer.stopString(stopSequence.boardStops.get(i), csvOptions.stopRepresentation)); + alightStopJoiner.add(transitLayer.stopString(stopSequence.alightStops.get(i), csvOptions.stopRepresentation)); + if (csvOptions.feedRepresentation != null) { + feedJoiner.add(transitLayer.feedFromStop(stopSequence.boardStops.get(i))); + } + rideTimeJoiner.add(String.format("%.1f", stopSequence.rideTimesSeconds.get(i) / 60f)); } String accessTime = stopSequence.access == null ? null : String.format("%.1f", stopSequence.access.time / 60f); String egressTime = stopSequence.egress == null ? null : String.format("%.1f", stopSequence.egress.time / 60f); return new String[]{ - routeIds.toString(), - boardStopIds.toString(), - alightStopIds.toString(), - rideTimes.toString(), + routeJoiner.toString(), + boardStopJoiner.toString(), + alightStopJoiner.toString(), + feedJoiner.toString(), + rideTimeJoiner.toString(), accessTime, egressTime }; @@ -55,9 +73,9 @@ public String[] detailsWithGtfsIds(TransitLayer transitLayer){ public Collection transitLegs(TransitLayer transitLayer) { Collection transitLegs = new ArrayList<>(); for (int i = 0; i < routes.size(); i++) { - String routeString = transitLayer.routeString(routes.get(i), true); - String boardStop = transitLayer.stopString(stopSequence.boardStops.get(i), true); - String alightStop = transitLayer.stopString(stopSequence.alightStops.get(i), true); + String routeString = transitLayer.routeString(routes.get(i), NAME_AND_ID); + String boardStop = transitLayer.stopString(stopSequence.boardStops.get(i), NAME_AND_ID); + String alightStop = transitLayer.stopString(stopSequence.alightStops.get(i), NAME_AND_ID); transitLegs.add(new TransitLeg(routeString, stopSequence.rideTimesSeconds.get(i), boardStop, alightStop)); } return transitLegs;
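To show how the new CsvResultOptions plumbing fits together end to end, here is a hedged sketch. CsvResultOptions itself (in com.conveyal.analysis.models) is not shown in this diff, so the no-arg constructor and the EntityRepresentation type of its feedRepresentation field are assumptions; the three field names, the enum constants, and the detailsWithGtfsIds signature are taken from the hunks above.

import com.conveyal.analysis.models.CsvResultOptions;
import com.conveyal.r5.transit.TransitLayer;
import com.conveyal.r5.transit.path.RouteSequence;

import static com.conveyal.r5.transit.TransitLayer.EntityRepresentation.ID_ONLY;
import static com.conveyal.r5.transit.TransitLayer.EntityRepresentation.NAME_AND_ID;

/** Illustrative only: choosing how routes, stops, and feeds are rendered in CSV path output. */
class CsvPathDetailExample {
    static String[] legDetails (RouteSequence sequence, TransitLayer transitLayer) {
        CsvResultOptions csvOptions = new CsvResultOptions(); // no-arg constructor assumed
        csvOptions.routeRepresentation = NAME_AND_ID; // short or long route name followed by the route_id
        csvOptions.stopRepresentation = ID_ONLY;      // bare GTFS stop ids with the feed prefix stripped
        csvOptions.feedRepresentation = ID_ONLY;      // non-null, so the per-leg feed values are populated
        // Returned in order: pipe-joined routes, boardStops, alightStops, feedIds, and rideTimes,
        // followed by accessTime and egressTime.
        return sequence.detailsWithGtfsIds(transitLayer, csvOptions);
    }
}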