
Commit 428c689
Merge branch 'dev' into lts-plus-locked-schedules
ansoncfit committed Dec 2, 2024
2 parents 5c0bec4 + 9ba72c3
Showing 37 changed files with 834 additions and 410 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -14,6 +14,7 @@ java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
withSourcesJar()
}

jar {
@@ -164,7 +165,7 @@ dependencies {
}

// Database driver.
implementation 'org.mongodb:mongo-java-driver:3.11.0'
implementation 'org.mongodb:mongodb-driver-legacy:5.2.0'

// Legacy system for storing Java objects, this functionality is now provided by the MongoDB driver itself.
implementation 'org.mongojack:mongojack:2.10.1'
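The dependency change above swaps the retired mongo-java-driver artifact for mongodb-driver-legacy, which keeps the pre-4.x MongoClient/DB/DBCollection API surface compiling against the modern 5.x driver core. A minimal sketch of the call style this preserves, assuming a local server and hypothetical database and collection names:

    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.MongoClient;

    public class LegacyDriverSketch {
        public static void main (String[] args) {
            // The legacy artifact keeps this pre-4.x entry point working on the 5.2.0 core.
            MongoClient client = new MongoClient("localhost", 27017);
            DB db = client.getDB("analysis");                   // hypothetical database name
            DBCollection bundles = db.getCollection("bundles"); // hypothetical collection name
            // Old-style query documents still work, so mongojack and QueryBuilder call
            // sites like those in BundleController below need no rewrite.
            long count = bundles.count(new BasicDBObject("accessGroup", "example"));
            System.out.println("Matching bundles: " + count);
            client.close();
        }
    }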
@@ -86,7 +86,7 @@ public List<HttpController> standardHttpControllers () {
new GtfsController(gtfsCache),
new BundleController(this),
new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
new RegionalAnalysisController(broker, fileStorage),
new RegionalAnalysisController(broker, fileStorage, taskScheduler),
new AggregationAreaController(fileStorage, database, taskScheduler),
// This broker controller registers at least one handler at URL paths beginning with /internal, which
// is exempted from authentication and authorization, but should be hidden from the world
33 changes: 18 additions & 15 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -95,59 +95,62 @@ public interface Config {
boolean testTaskRedelivery ();
}

private Config config;
private final Config config;

// Component Dependencies
private final FileStorage fileStorage;
private final EventBus eventBus;
private final WorkerLauncher workerLauncher;

private final ListMultimap<WorkerCategory, Job> jobs =
MultimapBuilder.hashKeys().arrayListValues().build();
private final ListMultimap<WorkerCategory, Job> jobs = MultimapBuilder.hashKeys().arrayListValues().build();

/**
* The maximum number of tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
* never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB. If this value
* is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
* workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete.
*
* The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
* The value should eventually be tuned. The value of 16 is the value used by the previous sporadic
* polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work.
*
* NOTE that as a side effect this limits the total throughput of each worker to:
* MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS tasks per second.
* It is entirely plausible for half or more of the origins in a job to be unconnected to any roadways (water,
* deserts etc.) In this case the system may need to burn through millions of origins, only checking that they
* aren't attached to anything in the selected scenario. Not doing so could double the run time of an analysis.
* It may be beneficial to assign origins to workers more randomly, or to introduce a mechanism to pre-scan for
* disconnected origins or at least concisely signal large blocks of them in worker responses.
*/
public final int MAX_TASKS_PER_WORKER = 16;
public static final int MAX_TASKS_PER_WORKER = 40;
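The javadoc above caps per-worker throughput at MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS. The poll interval is not part of this diff, so assuming a hypothetical 15-second minimum interval, raising the limit from 16 to 40 moves the ceiling as follows:

    public class ThroughputCeilingSketch {
        public static void main (String[] args) {
            int pollIntervalSeconds = 15; // assumed; see AnalysisWorker#POLL_INTERVAL_MIN_SECONDS
            for (int maxTasksPerWorker : new int[] {16, 40}) { // old and new limits
                double tasksPerSecond = (double) maxTasksPerWorker / pollIntervalSeconds;
                // Prints roughly 1.07 tasks/second for 16 and 2.67 tasks/second for 40.
                System.out.printf("%d tasks/poll -> %.2f tasks/second%n", maxTasksPerWorker, tasksPerSecond);
            }
        }
    }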

/**
* Used when auto-starting spot instances. Set to a smaller value to increase the number of
* workers requested automatically.
*/
public final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
public final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000;
public static final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
public static final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000;

/**
* We want to request spot instances to "boost" regional analyses after a few regional task
* results are received for a given workerCategory. Do so after receiving results for an
* arbitrary task toward the beginning of the job.
*/
public final int AUTO_START_SPOT_INSTANCES_AT_TASK = 42;
public static final int AUTO_START_SPOT_INSTANCES_AT_TASK = 42;

/** The maximum number of spot instances allowable in an automatic request */
public final int MAX_WORKERS_PER_CATEGORY = 250;
public static final int MAX_WORKERS_PER_CATEGORY = 250;
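Taken together, these constants suggest a sizing rule of roughly one worker per TARGET_TASKS_PER_WORKER tasks, capped at MAX_WORKERS_PER_CATEGORY. The actual request logic lives elsewhere in Broker and is not part of this diff, so the following is only a sketch consistent with the constant names:

    public class WorkerSizingSketch {
        static final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
        static final int MAX_WORKERS_PER_CATEGORY = 250;

        /** Hypothetical sizing rule implied by the constant names; not the actual Broker code. */
        static int workersToRequest (int remainingTasks) {
            int target = remainingTasks / TARGET_TASKS_PER_WORKER_TRANSIT;
            return Math.max(1, Math.min(target, MAX_WORKERS_PER_CATEGORY));
        }

        public static void main (String[] args) {
            // A 100,000-task transit job would ask for 125 workers, well under the cap of 250.
            System.out.println(workersToRequest(100_000));
        }
    }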

/**
* How long to give workers to start up (in ms) before assuming that they have started (and
* starting more on a given graph if they haven't).
*/
public static final long WORKER_STARTUP_TIME = 60 * 60 * 1000;


/** Keeps track of all the workers that have contacted this broker recently asking for work. */
private WorkerCatalog workerCatalog = new WorkerCatalog();

/**
* These objects piece together results received from workers into one regional analysis result
* file per job.
*/
private static Map<String, MultiOriginAssembler> resultAssemblers = new HashMap<>();
/** These objects piece together results received from workers into one regional analysis result file per job. */
private Map<String, MultiOriginAssembler> resultAssemblers = new HashMap<>();

/**
* keep track of which graphs we have launched workers on and how long ago we launched them, so
@@ -10,6 +10,7 @@
import com.conveyal.analysis.persistence.AnalysisDB;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
import com.conveyal.file.UrlWithHumanName;
import com.conveyal.r5.analyst.progress.Task;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.bson.conversions.Bson;
@@ -27,6 +28,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;

/**
* Stores vector aggregationAreas (used to define the region of a weighted average accessibility metric).
@@ -98,10 +100,10 @@ private Collection<AggregationArea> getAggregationAreas (Request req, Response res) {
}

/** Returns a JSON-wrapped URL for the mask grid of the aggregation area whose id matches the path parameter. */
private ObjectNode getAggregationAreaGridUrl (Request req, Response res) {
private UrlWithHumanName getAggregationAreaGridUrl (Request req, Response res) {
AggregationArea aggregationArea = aggregationAreaCollection.findPermittedByRequestParamId(req);
String url = fileStorage.getURL(aggregationArea.getStorageKey());
return JsonUtil.objectNode().put("url", url);
res.type(APPLICATION_JSON.asString());
return fileStorage.getJsonUrl(aggregationArea.getStorageKey(), aggregationArea.name, "grid");
}

@Override
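Several controllers in this commit stop hand-building ObjectNode responses and instead return UrlWithHumanName, setting the JSON content type explicitly on the response. The class itself is not included in the diff; a plausible minimal shape, inferred only from the fileStorage.getJsonUrl(key, name, extension) call sites, might be:

    // Hypothetical reconstruction of com.conveyal.file.UrlWithHumanName, for illustration only.
    public class UrlWithHumanName {
        public final String url;       // download URL resolved by FileStorage
        public final String humanName; // e.g. "<entity name>.grid", usable as a save-as filename

        public UrlWithHumanName (String url, String humanName) {
            this.url = url;
            this.humanName = humanName;
        }
    }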
@@ -13,14 +13,13 @@
import com.conveyal.file.FileUtils;
import com.conveyal.gtfs.GTFSCache;
import com.conveyal.gtfs.GTFSFeed;
import com.conveyal.gtfs.error.GTFSError;
import com.conveyal.gtfs.error.GeneralError;
import com.conveyal.gtfs.model.Stop;
import com.conveyal.gtfs.validator.PostLoadValidator;
import com.conveyal.osmlib.Node;
import com.conveyal.osmlib.OSM;
import com.conveyal.r5.analyst.progress.ProgressInputStream;
import com.conveyal.r5.analyst.cluster.TransportNetworkConfig;
import com.conveyal.r5.analyst.progress.ProgressInputStream;
import com.conveyal.r5.analyst.progress.Task;
import com.conveyal.r5.streets.OSMCache;
import com.conveyal.r5.util.ExceptionUtils;
@@ -81,6 +80,7 @@ public BundleController (BackendComponents components) {
public void registerEndpoints (Service sparkService) {
sparkService.path("/api/bundle", () -> {
sparkService.get("", this::getBundles, toJson);
sparkService.get("/:_id/config", this::getBundleConfig, toJson);
sparkService.get("/:_id", this::getBundle, toJson);
sparkService.post("", this::create, toJson);
sparkService.put("/:_id", this::update, toJson);
@@ -110,15 +110,13 @@ private Bundle create (Request req, Response res) {
try {
bundle.name = files.get("bundleName").get(0).getString("UTF-8");
bundle.regionId = files.get("regionId").get(0).getString("UTF-8");

if (files.get("osmId") != null) {
bundle.osmId = files.get("osmId").get(0).getString("UTF-8");
Bundle bundleWithOsm = Persistence.bundles.find(QueryBuilder.start("osmId").is(bundle.osmId).get()).next();
if (bundleWithOsm == null) {
throw AnalysisServerException.badRequest("Selected OSM does not exist.");
}
}

if (files.get("feedGroupId") != null) {
bundle.feedGroupId = files.get("feedGroupId").get(0).getString("UTF-8");
Bundle bundleWithFeed = Persistence.bundles.find(QueryBuilder.start("feedGroupId").is(bundle.feedGroupId).get()).next();
@@ -135,6 +133,13 @@ private Bundle create (Request req, Response res) {
bundle.feedsComplete = bundleWithFeed.feedsComplete;
bundle.totalFeeds = bundleWithFeed.totalFeeds;
}
if (files.get("config") != null) {
// Validation by deserializing into a model class instance. Unknown fields are ignored to
// allow sending config to custom or experimental workers with features unknown to the backend.
// The fields specifying OSM and GTFS IDs are not expected here. They will be ignored and overwritten.
String configString = files.get("config").get(0).getString();
bundle.config = JsonUtil.objectMapper.readValue(configString, TransportNetworkConfig.class);
}
UserPermissions userPermissions = UserPermissions.from(req);
bundle.accessGroup = userPermissions.accessGroup;
bundle.createdBy = userPermissions.email;
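The comment in the config branch above says unknown fields are ignored so a bundle can carry options aimed at custom or experimental workers. The configuration of JsonUtil.objectMapper is not visible in this diff; with Jackson, the lenient and strict behaviors described in this file would be set up roughly like this:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class MapperStrictnessSketch {
        // Lenient: silently skips fields the backend's model classes don't declare.
        static final ObjectMapper LENIENT = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // Strict (Jackson's default): unknown fields raise an exception. This is the behavior
        // getBundleConfig below relies on when reading a stored TransportNetworkConfig.
        static final ObjectMapper STRICT = new ObjectMapper();
    }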
@@ -274,15 +279,19 @@ private Bundle create (Request req, Response res) {
return bundle;
}

/** SIDE EFFECTS: This method will change the field bundle.config before writing it. */
private void writeNetworkConfigToCache (Bundle bundle) throws IOException {
TransportNetworkConfig networkConfig = new TransportNetworkConfig();
networkConfig.osmId = bundle.osmId;
networkConfig.gtfsIds = bundle.feeds.stream().map(f -> f.bundleScopedFeedId).collect(Collectors.toList());

// If the user specified additional network configuration options, they should already be in bundle.config.
// If no custom options were specified, we start with a fresh, empty instance.
if (bundle.config == null) {
bundle.config = new TransportNetworkConfig();
}
// This will overwrite and override any inconsistent osm and gtfs IDs that were mistakenly supplied by the user.
bundle.config.osmId = bundle.osmId;
bundle.config.gtfsIds = bundle.feeds.stream().map(f -> f.bundleScopedFeedId).collect(Collectors.toList());
String configFileName = bundle._id + ".json";
File configFile = FileUtils.createScratchFile("json");
JsonUtil.objectMapper.writeValue(configFile, networkConfig);

JsonUtil.objectMapper.writeValue(configFile, bundle.config);
FileStorageKey key = new FileStorageKey(BUNDLES, configFileName);
fileStorage.moveIntoStorage(key, configFile);
}
@@ -312,6 +321,31 @@ private Bundle getBundle (Request req, Response res) {
return bundle;
}

/**
* There are two copies of the Bundle/Network config: one in the Bundle entry in the database and one in a JSON
* file (obtainable by the workers). This method always reads the one in the file, which has been around longer
* and is considered the definitive source of truth. The entry in the database is a newer addition and has only
* been around since September 2024.
*/
private TransportNetworkConfig getBundleConfig(Request request, Response res) {
// Unfortunately this mimics logic in TransportNetworkCache. Deduplicate in a static utility method?
String id = GTFSCache.cleanId(request.params("_id"));
FileStorageKey key = new FileStorageKey(BUNDLES, id, "json");
File networkConfigFile = fileStorage.getFile(key);
if (!networkConfigFile.exists()) {
throw AnalysisServerException.notFound("Bundle configuration file could not be found.");
}

// Unlike in the worker, we expect the backend to have a model field for every known network/bundle option.
// Therefore, use the default objectMapper that does not tolerate unknown fields.
try {
return JsonUtil.objectMapper.readValue(networkConfigFile, TransportNetworkConfig.class);
} catch (Exception exception) {
LOG.error("Exception deserializing stored network config", exception);
return null;
}
}

private Collection<Bundle> getBundles (Request req, Response res) {
return Persistence.bundles.findPermittedForQuery(req);
}
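One way to exercise the new GET /api/bundle/:_id/config endpoint registered above is Java's built-in HTTP client. The host, port, and bundle id below are hypothetical, and any authentication headers a real deployment requires are omitted:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class BundleConfigRequestSketch {
        public static void main (String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:7070/api/bundle/abc123/config")) // hypothetical
                    .header("Accept", "application/json")
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            // Expect the stored TransportNetworkConfig JSON, or a 404 if no config file exists.
            System.out.println(response.statusCode() + "\n" + response.body());
        }
    }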
@@ -33,7 +33,9 @@ private Object getFile (Request req, Response res) throws Exception {
FileStorageKey key = new FileStorageKey(category, filename);
File file = fileStorage.getFile(key);
FileStorageFormat format = FileStorageFormat.fromFilename(filename);
res.type(format.mimeType);
if (format != null) {
res.type(format.mimeType);
}

// If the content-encoding is set to gzip, Spark automatically gzips the response. This double-gzips anything
// that was already gzipped. Some of our files are already gzipped, and we rely on the client browser to
@@ -18,6 +18,7 @@
import com.conveyal.file.FileStorageFormat;
import com.conveyal.file.FileStorageKey;
import com.conveyal.file.FileUtils;
import com.conveyal.file.UrlWithHumanName;
import com.conveyal.r5.analyst.FreeFormPointSet;
import com.conveyal.r5.analyst.Grid;
import com.conveyal.r5.analyst.PointSet;
@@ -61,6 +62,7 @@
import static com.conveyal.file.FileCategory.GRIDS;
import static com.conveyal.r5.analyst.WebMercatorExtents.parseZoom;
import static com.conveyal.r5.analyst.progress.WorkProductType.OPPORTUNITY_DATASET;
import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;

/**
* Controller that handles fetching opportunity datasets (grids and other pointset formats).
@@ -94,10 +96,6 @@ public OpportunityDatasetController (
/** Store upload status objects FIXME trivial Javadoc */
private final List<OpportunityDatasetUploadStatus> uploadStatuses = new ArrayList<>();

private ObjectNode getJsonUrl (FileStorageKey key) {
return JsonUtil.objectNode().put("url", fileStorage.getURL(key));
}

private void addStatusAndRemoveOldStatuses(OpportunityDatasetUploadStatus status) {
uploadStatuses.add(status);
LocalDateTime now = LocalDateTime.now();
@@ -113,10 +111,11 @@ private Collection<OpportunityDataset> getRegionDatasets(Request req, Response res) {
);
}

private Object getOpportunityDataset(Request req, Response res) {
private UrlWithHumanName getOpportunityDataset(Request req, Response res) {
OpportunityDataset dataset = Persistence.opportunityDatasets.findByIdFromRequestIfPermitted(req);
if (dataset.format == FileStorageFormat.GRID) {
return getJsonUrl(dataset.getStorageKey());
res.type(APPLICATION_JSON.asString());
return fileStorage.getJsonUrl(dataset.getStorageKey(), dataset.sourceName + "_" + dataset.name, "grid");
} else {
// Currently the UI can only visualize grids, not other kinds of datasets (freeform points).
// We do generate a rasterized grid for each of the freeform pointsets we create, so ideally we'd redirect
@@ -564,9 +563,10 @@ private List<Grid> createGridsFromShapefile(List<FileItem> fileItems,
* Respond to a request with a redirect to a downloadable file.
* @param req should specify regionId, opportunityDatasetId, and an available download format (.tiff or .grid)
*/
private Object downloadOpportunityDataset (Request req, Response res) throws IOException {
private UrlWithHumanName downloadOpportunityDataset (Request req, Response res) throws IOException {
FileStorageFormat downloadFormat;
String format = req.params("format");
res.type(APPLICATION_JSON.asString());
try {
downloadFormat = FileStorageFormat.valueOf(format.toUpperCase());
} catch (IllegalArgumentException iae) {
@@ -576,38 +576,32 @@ private Object downloadOpportunityDataset (Request req, Response res) throws IOException {
String regionId = req.params("_id");
String gridKey = format;
FileStorageKey storageKey = new FileStorageKey(GRIDS, String.format("%s/%s.grid", regionId, gridKey));
return getJsonUrl(storageKey);
return fileStorage.getJsonUrl(storageKey, gridKey, "grid");
}
if (FileStorageFormat.GRID.equals(downloadFormat)) {
return getOpportunityDataset(req, res);
}

if (FileStorageFormat.GRID.equals(downloadFormat)) return getOpportunityDataset(req, res);

final OpportunityDataset opportunityDataset = Persistence.opportunityDatasets.findByIdFromRequestIfPermitted(req);

FileStorageKey gridKey = opportunityDataset.getStorageKey(FileStorageFormat.GRID);
FileStorageKey formatKey = opportunityDataset.getStorageKey(downloadFormat);

// if this grid is not on S3 in the requested format, try to get the .grid format
if (!fileStorage.exists(gridKey)) {
throw AnalysisServerException.notFound("Requested grid does not exist.");
}

if (!fileStorage.exists(formatKey)) {
// get the grid and convert it to the requested format
File gridFile = fileStorage.getFile(gridKey);
Grid grid = Grid.read(new GZIPInputStream(new FileInputStream(gridFile))); // closes input stream
File localFile = FileUtils.createScratchFile(downloadFormat.toString());
FileOutputStream fos = new FileOutputStream(localFile);

if (FileStorageFormat.PNG.equals(downloadFormat)) {
grid.writePng(fos);
} else if (FileStorageFormat.GEOTIFF.equals(downloadFormat)) {
grid.writeGeotiff(fos);
}

fileStorage.moveIntoStorage(formatKey, localFile);
}

return getJsonUrl(formatKey);
return fileStorage.getJsonUrl(formatKey, opportunityDataset.sourceName + "_" + opportunityDataset.name, downloadFormat.extension);
}

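The download path above converts the canonical .grid file on demand and then caches the result in file storage. A condensed sketch of just the conversion step, using only the Grid methods that appear in this diff (local file paths are hypothetical):

    import com.conveyal.r5.analyst.Grid;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.util.zip.GZIPInputStream;

    public class GridConversionSketch {
        public static void main (String[] args) throws Exception {
            File gridFile = new File("dataset.grid"); // hypothetical local path
            // Grid.read closes the input stream it is handed, per the comment in the controller.
            Grid grid = Grid.read(new GZIPInputStream(new FileInputStream(gridFile)));
            try (FileOutputStream fos = new FileOutputStream(new File("dataset.tiff"))) {
                grid.writeGeotiff(fos); // or grid.writePng(fos) for a PNG rendering
            }
        }
    }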
