diff --git a/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java b/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java index 4de8e3098..ec59b15b8 100644 --- a/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java +++ b/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java @@ -34,7 +34,7 @@ public LocalBackendComponents () { authentication = new LocalAuthentication(); // TODO add nested LocalWorkerComponents here, to reuse some components, and pass it into the LocalWorkerLauncher? workerLauncher = new LocalWorkerLauncher(config, fileStorage, gtfsCache, osmCache); - broker = new Broker(config, fileStorage, eventBus, workerLauncher); + broker = new Broker(config, eventBus, workerLauncher); censusExtractor = new SeamlessCensusGridExtractor(config); // Instantiate the HttpControllers last, when all the components except the HttpApi are already created. List httpControllers = standardHttpControllers(); diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index d8895e6e3..45f0b0535 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -6,17 +6,11 @@ import com.conveyal.analysis.components.eventbus.EventBus; import com.conveyal.analysis.components.eventbus.RegionalAnalysisEvent; import com.conveyal.analysis.components.eventbus.WorkerEvent; -import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.results.MultiOriginAssembler; -import com.conveyal.analysis.util.JsonUtil; -import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageKey; -import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.WorkerCategory; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import com.conveyal.r5.analyst.cluster.WorkerStatus; -import com.conveyal.r5.analyst.scenario.Scenario; import com.conveyal.r5.util.ExceptionUtils; import com.google.common.collect.ListMultimap; import com.google.common.collect.MultimapBuilder; @@ -27,8 +21,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -42,7 +34,6 @@ import static com.conveyal.analysis.components.eventbus.WorkerEvent.Action.REQUESTED; import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.REGIONAL; import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.SINGLE_POINT; -import static com.conveyal.file.FileCategory.BUNDLES; import static com.google.common.base.Preconditions.checkNotNull; /** @@ -93,7 +84,6 @@ public interface Config { private Config config; // Component Dependencies - private final FileStorage fileStorage; private final EventBus eventBus; private final WorkerLauncher workerLauncher; @@ -143,9 +133,8 @@ public interface Config { public TObjectLongMap recentlyRequestedWorkers = TCollections.synchronizedMap(new TObjectLongHashMap<>()); - public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) { + public Broker (Config config, EventBus eventBus, WorkerLauncher workerLauncher) { this.config = config; - this.fileStorage = fileStorage; this.eventBus = eventBus; this.workerLauncher = workerLauncher; } @@ -154,26 +143,16 @@ public Broker (Config config, FileStorage fileStorage, EventBus eventBus, Worker * Enqueue a set of tasks for a regional analysis. * Only a single task is passed in, which the broker will expand into all the individual tasks for a regional job. */ - public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAnalysis) { - - // Make a copy of the regional task inside the RegionalAnalysis, replacing the scenario with a scenario ID. - RegionalTask templateTask = templateTaskFromRegionalAnalysis(regionalAnalysis); - - LOG.info("Enqueuing tasks for job {} using template task.", templateTask.jobId); - if (findJob(templateTask.jobId) != null) { - LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId); - throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId); + public synchronized void enqueueTasksForRegionalJob (Job job, MultiOriginAssembler assembler) { + LOG.info("Enqueuing tasks for job {} using template task.", job.jobId); + if (findJob(job.jobId) != null) { + LOG.error("Someone tried to enqueue job {} but it already exists.", job.jobId); + throw new RuntimeException("Enqueued duplicate job " + job.jobId); } - WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis); - Job job = new Job(templateTask, workerTags); jobs.put(job.workerCategory, job); // Register the regional job so results received from multiple workers can be assembled into one file. - // TODO encapsulate MultiOriginAssemblers in a new Component - // Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler. - // That is not catastrophic, but the user may need to recognize and delete the stalled regional job. - MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage); - resultAssemblers.put(templateTask.jobId, assembler); + resultAssemblers.put(job.jobId, assembler); if (config.testTaskRedelivery()) { // This is a fake job for testing, don't confuse the worker startup code below with null graph ID. @@ -181,56 +160,12 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn } if (workerCatalog.noWorkersAvailable(job.workerCategory, config.offline())) { - createOnDemandWorkerInCategory(job.workerCategory, workerTags); + createOnDemandWorkerInCategory(job.workerCategory, job.workerTags); } else { // Workers exist in this category, clear out any record that we're waiting for one to start up. recentlyRequestedWorkers.remove(job.workerCategory); } - eventBus.send(new RegionalAnalysisEvent(templateTask.jobId, STARTED).forUser(workerTags.user, workerTags.group)); - } - - /** - * The single RegionalTask object represents a lot of individual accessibility tasks at many different origin - * points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to - * workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID - * to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task, - * as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on - * its ID only. We protectively clone this task because we're going to null out its scenario field, and don't - * want to affect the original object which contains all the scenario details. - * TODO Why is all this detail added after the Persistence call? - * We don't want to store all the details added below in Mongo? - */ - private RegionalTask templateTaskFromRegionalAnalysis (RegionalAnalysis regionalAnalysis) { - RegionalTask templateTask = regionalAnalysis.request.clone(); - // First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers. - Scenario scenario = templateTask.scenario; - templateTask.scenarioId = scenario.id; - // Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON. - templateTask.scenario = null; - String fileName = String.format("%s_%s.json", regionalAnalysis.bundleId, scenario.id); - FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName); - try { - File localScenario = FileUtils.createScratchFile("json"); - JsonUtil.objectMapper.writeValue(localScenario, scenario); - // FIXME this is using a network service in a method called from a synchronized broker method. - // Move file into storage before entering the synchronized block. - fileStorage.moveIntoStorage(fileStorageKey, localScenario); - } catch (IOException e) { - LOG.error("Error storing scenario for retrieval by workers.", e); - } - // Fill in all the fields in the template task that will remain the same across all tasks in a job. - // I am not sure why we are re-setting all these fields, it seems like they are already set when the task is - // initialized by AnalysisRequest.populateTask. But we'd want to thoroughly check that assumption before - // eliminating or moving these lines. - templateTask.jobId = regionalAnalysis._id; - templateTask.graphId = regionalAnalysis.bundleId; - templateTask.workerVersion = regionalAnalysis.workerVersion; - templateTask.height = regionalAnalysis.height; - templateTask.width = regionalAnalysis.width; - templateTask.north = regionalAnalysis.north; - templateTask.west = regionalAnalysis.west; - templateTask.zoom = regionalAnalysis.zoom; - return templateTask; + eventBus.send(new RegionalAnalysisEvent(job.jobId, STARTED).forUser(job.workerTags.user, job.workerTags.group)); } /** diff --git a/src/main/java/com/conveyal/analysis/components/broker/Job.java b/src/main/java/com/conveyal/analysis/components/broker/Job.java index fa2c2ca66..b3be77c70 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Job.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Job.java @@ -13,7 +13,6 @@ import java.util.Set; import static com.conveyal.r5.common.Util.notNullOrEmpty; -import static com.google.common.base.Preconditions.checkNotNull; /** * A Job is a collection of tasks that represent all the origins in a regional analysis. All the @@ -61,13 +60,13 @@ public class Job { * The number of remaining tasks can be derived from the deliveredTasks BitSet, but as an * optimization we keep a separate counter to avoid constantly scanning over that whole bitset. */ - protected int nTasksCompleted; + protected int nTasksCompleted = 0; /** * The total number of task deliveries that have occurred. A task may be counted more than * once if it is redelivered. */ - protected int nTasksDelivered; + protected int nTasksDelivered = 0; /** Every task in this job will be based on this template task, but have its origin coordinates changed. */ public final RegionalTask templateTask; @@ -128,23 +127,31 @@ private RegionalTask makeOneTask (int taskNumber) { */ public final Set errors = new HashSet(); - public Job (RegionalTask templateTask, WorkerTags workerTags) { - this.jobId = templateTask.jobId; - this.templateTask = templateTask; - this.workerCategory = new WorkerCategory(templateTask.graphId, templateTask.workerVersion); - this.nTasksCompleted = 0; - this.nextTaskToDeliver = 0; - - if (templateTask.originPointSetKey != null) { - checkNotNull(templateTask.originPointSet); - this.nTasksTotal = templateTask.originPointSet.featureCount(); - } else { - this.nTasksTotal = templateTask.width * templateTask.height; - } - - this.completedTasks = new BitSet(nTasksTotal); + public Job (RegionalTask task, WorkerTags workerTags) { + templateTask = templateTaskFromRegionalTask(task); + jobId = task.jobId; + workerCategory = new WorkerCategory(task.graphId, task.workerVersion); + nTasksTotal = task.getTasksTotal(); + completedTasks = new BitSet(nTasksTotal); this.workerTags = workerTags; + } + /** + * The single RegionalTask object represents a lot of individual accessibility tasks at many different origin + * points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to + * workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID + * to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task, + * as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on + * its ID only. We protectively clone this task because we're going to null out its scenario field, and don't + * want to affect the original object which contains all the scenario details. + */ + private static RegionalTask templateTaskFromRegionalTask(RegionalTask task) { + RegionalTask templateTask = task.clone(); + // First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers. + templateTask.scenarioId = templateTask.scenario.id; + // Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON. + templateTask.scenario = null; + return templateTask; } public boolean markTaskCompleted(int taskId) { diff --git a/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java b/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java index 3e6b53326..ddbd4760a 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java +++ b/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java @@ -3,11 +3,13 @@ import com.conveyal.analysis.components.BackendComponents; import com.conveyal.analysis.components.LocalBackendComponents; import com.conveyal.analysis.models.RegionalAnalysis; +import com.conveyal.analysis.results.MultiOriginAssembler; import com.conveyal.r5.analyst.cluster.RegionalTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.UUID; /** @@ -66,7 +68,9 @@ private static void sendFakeJob(Broker broker) { templateTask.scenarioId = "FAKE"; RegionalAnalysis regionalAnalysis = new RegionalAnalysis(); regionalAnalysis.request = templateTask; - broker.enqueueTasksForRegionalJob(regionalAnalysis); + var job = new Job(templateTask, WorkerTags.fromRegionalAnalysis(regionalAnalysis)); + var assembler = new MultiOriginAssembler(job, new ArrayList<>()); + broker.enqueueTasksForRegionalJob(job, assembler); } public static String compactUUID() { diff --git a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java index 32336fd7c..fde0cc225 100644 --- a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java +++ b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java @@ -4,12 +4,20 @@ import com.conveyal.analysis.SelectingGridReducer; import com.conveyal.analysis.UserPermissions; import com.conveyal.analysis.components.broker.Broker; +import com.conveyal.analysis.components.broker.Job; import com.conveyal.analysis.components.broker.JobStatus; +import com.conveyal.analysis.components.broker.WorkerTags; import com.conveyal.analysis.models.AnalysisRequest; import com.conveyal.analysis.models.OpportunityDataset; import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.persistence.Persistence; +import com.conveyal.analysis.results.AccessCsvResultWriter; import com.conveyal.analysis.results.CsvResultType; +import com.conveyal.analysis.results.GridResultWriter; +import com.conveyal.analysis.results.MultiOriginAssembler; +import com.conveyal.analysis.results.PathCsvResultWriter; +import com.conveyal.analysis.results.RegionalResultWriter; +import com.conveyal.analysis.results.TimeCsvResultWriter; import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; import com.conveyal.file.FileStorageFormat; @@ -20,6 +28,7 @@ import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.PointSetCache; import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.r5.analyst.scenario.Scenario; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.primitives.Ints; import com.mongodb.QueryBuilder; @@ -44,6 +53,7 @@ import static com.conveyal.analysis.util.JsonUtil.toJson; import static com.conveyal.file.FileCategory.BUNDLES; import static com.conveyal.file.FileCategory.RESULTS; +import static com.conveyal.r5.common.Util.notNullOrEmpty; import static com.conveyal.r5.transit.TransportNetworkCache.getScenarioFilename; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -506,17 +516,64 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro // This assigns it creation/update time stamps and an ID, which is needed to name any output CSV files. regionalAnalysis = Persistence.regionalAnalyses.create(regionalAnalysis); + // Create the regional job + var regionalJob = new Job(task, WorkerTags.fromRegionalAnalysis(regionalAnalysis)); + + // Create the result writers. Store their result file paths in the database. + var resultWriters = new ArrayList(); + if (!task.makeTauiSite) { + if (task.recordAccessibility) { + if (task.originPointSet != null) { + var accessWriter = new AccessCsvResultWriter(task, fileStorage); + resultWriters.add(accessWriter); + regionalAnalysis.resultStorage.put(accessWriter.resultType(), accessWriter.getFileName()); + } else { + resultWriters.addAll(GridResultWriter.createWritersFromTask(regionalAnalysis, task, fileStorage)); + } + } + + if (task.recordTimes) { + var timesWriter = new TimeCsvResultWriter(task, fileStorage); + resultWriters.add(timesWriter); + regionalAnalysis.resultStorage.put(timesWriter.resultType(), timesWriter.getFileName()); + } + + if (task.includePathResults) { + var pathsWriter = new PathCsvResultWriter(task, fileStorage); + resultWriters.add(pathsWriter); + regionalAnalysis.resultStorage.put(pathsWriter.resultType(), pathsWriter.getFileName()); + } + checkArgument(notNullOrEmpty(resultWriters), "A regional analysis should always create at least one grid or CSV file."); + } + + // Create the multi-origin assembler with the writers. + var assembler = new MultiOriginAssembler(regionalJob, resultWriters); + + // Stored scenario is needed by workers. Must be done ahead of enqueueing the job. + storeScenarioJson(task.graphId, task.scenario); + // Register the regional job with the broker, which will distribute individual tasks to workers and track progress. - broker.enqueueTasksForRegionalJob(regionalAnalysis); + broker.enqueueTasksForRegionalJob(regionalJob, assembler); // Flush to the database any information added to the RegionalAnalysis object when it was enqueued. - // This includes the paths of any CSV files that will be produced by this analysis. - // TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock() or put(). + // This includes the paths of any CSV files that will be produced by this analysis. The regional analysis was + // created in this method and therefore we can bypass the nonce / permission checking. Persistence.regionalAnalyses.modifiyWithoutUpdatingLock(regionalAnalysis); return regionalAnalysis; } + /** + * Store the regional analysis scenario as JSON for retrieval by the workers. + */ + private void storeScenarioJson(String graphId, Scenario scenario) throws IOException { + String fileName = getScenarioFilename(graphId, scenario.id); + FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName); + File localScenario = FileUtils.createScratchFile("json"); + JsonUtil.objectMapper.writeValue(localScenario, scenario); + fileStorage.moveIntoStorage(fileStorageKey, localScenario); + } + private RegionalAnalysis updateRegionalAnalysis (Request request, Response response) throws IOException { RegionalAnalysis regionalAnalysis = JsonUtil.objectMapper.readValue(request.body(), RegionalAnalysis.class); return Persistence.regionalAnalyses.updateByUserIfPermitted(regionalAnalysis, UserPermissions.from(request)); diff --git a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java index e208ac827..118350a44 100644 --- a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java @@ -4,13 +4,12 @@ import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; import java.util.ArrayList; import java.util.List; public class AccessCsvResultWriter extends CsvResultWriter { - public AccessCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { + public AccessCsvResultWriter (RegionalTask task, FileStorage fileStorage) { super(task, fileStorage); } @@ -49,13 +48,14 @@ public Iterable rowValues (RegionalWorkResult workResult) { for (int p = 0; p < task.percentiles.length; p++) { int[] cutoffsForPercentile = percentilesForDestPointset[p]; for (int c = 0; c < task.cutoffsMinutes.length; c++) { + int accessibilityValue = cutoffsForPercentile[c]; // Ideally we'd output the pointset IDs (rather than keys) which we have in the RegionalAnalysis rows.add(new String[] { originId, task.destinationPointSetKeys[d], Integer.toString(task.percentiles[p]), Integer.toString(task.cutoffsMinutes[c]), - Integer.toString(workResult.accessibilityValues[d][p][c]) + Integer.toString(accessibilityValue) }); } } diff --git a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java b/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java deleted file mode 100644 index df289c9fe..000000000 --- a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.conveyal.analysis.results; - -import com.conveyal.file.FileCategory; -import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageKey; -import com.conveyal.file.FileUtils; -import com.google.common.io.ByteStreams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.zip.GZIPOutputStream; - -import static com.conveyal.file.FileCategory.RESULTS; -import static com.conveyal.r5.common.Util.human; - -/** - * This is an abstract base class for writing regional analysis results into a file for long term - * storage. It provides reuseable logic for creating local buffer files and uploading them to long - * term cloud storage once the regional analysis is complete. Concrete subclasses handle writing CSV - * or proprietary binary grid files, depending on the type of regional analysis. - */ -public abstract class BaseResultWriter { - - private static final Logger LOG = LoggerFactory.getLogger(BaseResultWriter.class); - - private final FileStorage fileStorage; - - protected File bufferFile; - - public BaseResultWriter (FileStorage fileStorage) { - this.fileStorage = fileStorage; - } - - // Can this be merged into the constructor? - protected void prepare (String jobId) { - try { - bufferFile = File.createTempFile(jobId + "_", ".results"); - // On unexpected server shutdown, these files should be deleted. - // We could attempt to recover from shutdowns but that will take a lot of changes and persisted data. - bufferFile.deleteOnExit(); - } catch (IOException e) { - LOG.error("Exception while creating buffer file for multi-origin assembler: " + e.toString()); - } - } - - /** - * Gzip the access grid and store it. - */ - protected synchronized void finish (String fileName) throws IOException { - LOG.info("Compressing {} and moving into file storage.", fileName); - FileStorageKey fileStorageKey = new FileStorageKey(RESULTS, fileName); - File gzippedResultFile = FileUtils.createScratchFile(); - - // There's probably a more elegant way to do this with NIO and without closing the buffer. - // That would be Files.copy(File.toPath(),X) or ByteStreams.copy. - InputStream is = new BufferedInputStream(new FileInputStream(bufferFile)); - OutputStream os = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(gzippedResultFile))); - ByteStreams.copy(is, os); - is.close(); - os.close(); - - LOG.info("GZIP compression reduced analysis results {} from {} to {} ({}x compression)", - fileName, - human(bufferFile.length(), "B"), - human(gzippedResultFile.length(), "B"), - (double) bufferFile.length() / gzippedResultFile.length() - ); - - fileStorage.moveIntoStorage(fileStorageKey, gzippedResultFile); - bufferFile.delete(); - } - - /** - * Close all buffers and temporary files. - */ - abstract void terminate () throws Exception; - -} diff --git a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java index e07abc2af..501ac5306 100644 --- a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java @@ -1,6 +1,9 @@ package com.conveyal.analysis.results; +import com.conveyal.file.FileCategory; import com.conveyal.file.FileStorage; +import com.conveyal.file.FileStorageKey; +import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import com.csvreader.CsvWriter; @@ -9,6 +12,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedWriter; +import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -19,12 +23,12 @@ * Subclasses are used to record origin/destination "skim" matrices, accessibility indicators for non-gridded * ("freeform") origin point sets, and cataloging paths between pairs of origins and destinations. */ -public abstract class CsvResultWriter extends BaseResultWriter implements RegionalResultWriter { +public abstract class CsvResultWriter implements RegionalResultWriter { private static final Logger LOG = LoggerFactory.getLogger(CsvResultWriter.class); - - public final String fileName; + private final File bufferFile = FileUtils.createScratchFile("csv"); private final CsvWriter csvWriter; + private final FileStorage fileStorage; private int nDataColumns; /** @@ -51,19 +55,31 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region /** * Construct a writer to record incoming results in a CSV file, with header row consisting of * "origin", "destination", and the supplied indicator. - * FIXME it's strange we're manually passing injectable components into objects not wired up at application construction. */ - CsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { - super(fileStorage); - super.prepare(task.jobId); - this.fileName = task.jobId + "_" + resultType() +".csv"; - BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile)); - csvWriter = new CsvWriter(bufferedWriter, ','); - setDataColumns(columnHeaders()); + CsvResultWriter (RegionalTask task, FileStorage fileStorage) { + this.fileStorage = fileStorage; + String[] columns = columnHeaders(); + csvWriter = getBufferedCsvWriter(columns); + this.nDataColumns = columns.length; this.task = task; LOG.info("Created CSV file to hold {} results for regional job {}", resultType(), task.jobId); } + public String getFileName() { + return task.jobId + "_" + resultType() + ".csv"; + } + + private CsvWriter getBufferedCsvWriter(String[] columnHeaders) { + try { + BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile)); + var writer = new CsvWriter(bufferedWriter, ','); + writer.writeRecord(columnHeaders); + return writer; + } catch (IOException ioException) { + throw new RuntimeException(ioException); + } + } + /** * Writes a header row containing the supplied data columns. */ @@ -74,14 +90,16 @@ protected void setDataColumns(String... columns) throws IOException { /** * Gzip the csv file and move it into permanent file storage such as AWS S3. - * Note: stored file will undergo gzip compression in super.finish(), but be stored with a .csv extension. + * Note: stored file will undergo gzip compression but be stored with a .csv extension. * When this file is downloaded from the UI, the browser will decompress, yielding a logically named .csv file. * Downloads through another channel (e.g. aws s3 cp), will need to be decompressed manually. */ @Override public synchronized void finish () throws IOException { csvWriter.close(); - super.finish(this.fileName); + var gzippedFile = FileUtils.gzipFile(bufferFile); + fileStorage.moveIntoStorage(new FileStorageKey(FileCategory.RESULTS, getFileName()), gzippedFile); + bufferFile.delete(); } /** diff --git a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java index 88b1a8c08..72719c8e3 100644 --- a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java @@ -1,16 +1,24 @@ package com.conveyal.analysis.results; +import com.conveyal.analysis.models.RegionalAnalysis; +import com.conveyal.file.FileCategory; import com.conveyal.file.FileStorage; +import com.conveyal.file.FileStorageKey; +import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.LittleEndianIntOutputStream; import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; import static com.conveyal.r5.common.Util.human; @@ -37,11 +45,13 @@ *
  • (repeated 4-byte int) values of each pixel in row-major order: axis order (row, column, channel).
  • * */ -public class GridResultWriter extends BaseResultWriter { +public class GridResultWriter implements RegionalResultWriter { private static final Logger LOG = LoggerFactory.getLogger(GridResultWriter.class); - private RandomAccessFile randomAccessFile; + private final File bufferFile = FileUtils.createScratchFile("grid"); + private final FileStorage fileStorage; + private final RandomAccessFile randomAccessFile; /** The version of the access grids we produce */ private static final int ACCESS_GRID_VERSION = 0; @@ -59,13 +69,44 @@ public class GridResultWriter extends BaseResultWriter { */ private final int channels; + private final int percentileIndex; + private final int destinationIndex; + private final String gridFileName; + /** - * Construct an writer for a single regional analysis result grid, using the proprietary + * We create one GridResultWriter for each destination pointset and percentile. + * Each of those output files contains data for all specified travel time cutoffs at each origin. + */ + public static List createWritersFromTask(RegionalAnalysis regionalAnalysis, RegionalTask task, FileStorage fileStorage) { + int nPercentiles = task.percentiles.length; + int nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length; + // Create one grid writer per percentile and destination pointset. + var gridWriters = new ArrayList(); + for (int destinationIndex = 0; destinationIndex < nDestinationPointSets; destinationIndex++) { + for (int percentileIndex = 0; percentileIndex < nPercentiles; percentileIndex++) { + String destinationPointSetId = regionalAnalysis.destinationPointSetIds[destinationIndex]; + gridWriters.add(new GridResultWriter( + task, + fileStorage, + percentileIndex, + destinationIndex, + destinationPointSetId + )); + } + } + return gridWriters; + } + + /** + * Construct a writer for a single regional analysis result grid, using the proprietary * Conveyal grid format. This also creates the on-disk scratch buffer into which the results * from the workers will be accumulated. */ - GridResultWriter (RegionalTask task, FileStorage fileStorage) { - super(fileStorage); + GridResultWriter (RegionalTask task, FileStorage fileStorage, int percentileIndex, int destinationIndex, String destinationPointSetId) { + this.fileStorage = fileStorage; + this.gridFileName = String.format("%s_%s_P%d.access", task.jobId, destinationPointSetId, task.percentiles[percentileIndex]); + this.percentileIndex = percentileIndex; + this.destinationIndex = destinationIndex; int width = task.width; int height = task.height; this.channels = task.cutoffsMinutes.length; @@ -75,7 +116,6 @@ public class GridResultWriter extends BaseResultWriter { height, channels ); - super.prepare(task.jobId); try { // Write the access grid file header to the temporary file. @@ -113,9 +153,11 @@ public class GridResultWriter extends BaseResultWriter { /** Gzip the access grid and upload it to file storage (such as AWS S3). */ @Override - protected synchronized void finish (String fileName) throws IOException { - super.finish(fileName); + public synchronized void finish () throws IOException { randomAccessFile.close(); + var gzippedFile = FileUtils.gzipFile(bufferFile); + fileStorage.moveIntoStorage(new FileStorageKey(FileCategory.RESULTS, gridFileName), gzippedFile); + bufferFile.delete(); } /** @@ -130,18 +172,25 @@ private static byte[] intToLittleEndianByteArray (int i) { return byteBuffer.array(); } + @Override + public void writeOneWorkResult(RegionalWorkResult workResult) throws Exception { + // Drop work results for this particular origin into a little-endian output file. + int[][] percentilesForGrid = workResult.accessibilityValues[destinationIndex]; + int[] cutoffsForPercentile = percentilesForGrid[percentileIndex]; + writeOneOrigin(workResult.taskId, cutoffsForPercentile); + } + /** * Write all channels at once to the proper subregion of the buffer for this origin. The origins we receive have 2d * coordinates. Flatten them to compute file offsets and for the origin checklist. */ - synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOException { + private void writeOneOrigin (int taskNumber, int[] values) throws IOException { if (values.length != channels) { throw new IllegalArgumentException("Number of channels to be written does not match this writer."); } long offset = HEADER_LENGTH_BYTES + (taskNumber * channels * Integer.BYTES); // RandomAccessFile is not threadsafe and multiple threads may call this, so synchronize. - // TODO why is the method also synchronized then? - synchronized (this) { + synchronized (randomAccessFile) { randomAccessFile.seek(offset); // FIXME should this be delta-coded? The Selecting grid reducer seems to expect it to be. int lastValue = 0; @@ -154,7 +203,7 @@ synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOExcepti } @Override - synchronized void terminate () throws IOException { + public synchronized void terminate () throws IOException { randomAccessFile.close(); bufferFile.delete(); } diff --git a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java b/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java deleted file mode 100644 index 5f4d90f8a..000000000 --- a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.conveyal.analysis.results; - -import com.conveyal.analysis.models.RegionalAnalysis; -import com.conveyal.file.FileStorage; -import com.conveyal.r5.analyst.cluster.RegionalTask; -import com.conveyal.r5.analyst.cluster.RegionalWorkResult; - -/** - * Adapts our collection of grid writers (one for each destination pointset and percentile) to give them the - * same interface as our CSV writers, so CSV and Grids can be processed similarly in MultiOriginAssembler. - */ -public class MultiGridResultWriter implements RegionalResultWriter { - - private final RegionalAnalysis regionalAnalysis; - - private final RegionalTask task; - - /** - * We create one GridResultWriter for each destination pointset and percentile. - * Each of those output files contains data for all travel time cutoffs at each origin. - */ - private final GridResultWriter[][] accessibilityGridWriters; - - /** The number of different percentiles for which we're calculating accessibility on the workers. */ - private final int nPercentiles; - - /** The number of destination pointsets to which we're calculating accessibility */ - private final int nDestinationPointSets; - - /** Constructor */ - public MultiGridResultWriter ( - RegionalAnalysis regionalAnalysis, RegionalTask task, FileStorage fileStorage - ) { - // We are storing the regional analysis just to get its pointset IDs (not keys) and its own ID. - this.regionalAnalysis = regionalAnalysis; - this.task = task; - this.nPercentiles = task.percentiles.length; - this.nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length; - // Create one grid writer per percentile and destination pointset. - accessibilityGridWriters = new GridResultWriter[nDestinationPointSets][nPercentiles]; - for (int d = 0; d < nDestinationPointSets; d++) { - for (int p = 0; p < nPercentiles; p++) { - accessibilityGridWriters[d][p] = new GridResultWriter(task, fileStorage); - } - } - } - - @Override - public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception { - // Drop work results for this particular origin into a little-endian output file. - // TODO more efficient way to write little-endian integers - // TODO check monotonic increasing invariants here rather than in worker. - // Infer x and y cell indexes based on the template task - int taskNumber = workResult.taskId; - for (int d = 0; d < workResult.accessibilityValues.length; d++) { - int[][] percentilesForGrid = workResult.accessibilityValues[d]; - for (int p = 0; p < nPercentiles; p++) { - int[] cutoffsForPercentile = percentilesForGrid[p]; - GridResultWriter gridWriter = accessibilityGridWriters[d][p]; - gridWriter.writeOneOrigin(taskNumber, cutoffsForPercentile); - } - } - } - - @Override - public void terminate () throws Exception { - for (GridResultWriter[] writers : accessibilityGridWriters) { - for (GridResultWriter writer : writers) { - writer.terminate(); - } - } - } - - @Override - public void finish () throws Exception { - for (int d = 0; d < nDestinationPointSets; d++) { - for (int p = 0; p < nPercentiles; p++) { - int percentile = task.percentiles[p]; - String destinationPointSetId = regionalAnalysis.destinationPointSetIds[d]; - // TODO verify that regionalAnalysis._id is the same as job.jobId - String gridFileName = - String.format("%s_%s_P%d.access", regionalAnalysis._id, destinationPointSetId, percentile); - accessibilityGridWriters[d][p].finish(gridFileName); - } - } - } - -} diff --git a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java index 3bbd5915f..bdf2b07b2 100644 --- a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java +++ b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java @@ -2,23 +2,15 @@ import com.conveyal.analysis.AnalysisServerException; import com.conveyal.analysis.components.broker.Job; -import com.conveyal.analysis.models.RegionalAnalysis; -import com.conveyal.analysis.persistence.Persistence; -import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageFormat; import com.conveyal.r5.analyst.PointSet; +import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import com.conveyal.r5.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import static com.conveyal.r5.common.Util.notNullOrEmpty; -import static com.google.common.base.Preconditions.checkArgument; - /** * This assembles regional results arriving from workers into one or more files per regional analysis on * the backend. This is not a singleton component: one MultiOriginAssembler instance is created per currently active @@ -32,13 +24,6 @@ public class MultiOriginAssembler { private static final int MAX_FREEFORM_DESTINATIONS = 4_000_000; - /** - * The regional analysis for which this object is assembling results. - * We retain the whole object rather than just its ID so we'll have the full details, e.g. destination point set - * IDs and scenario, things that are stripped out of the template task sent to the workers. - */ - private final RegionalAnalysis regionalAnalysis; - /** * The object representing the progress of the regional analysis as tracked by the broker. * It may appear job.templateTask has all the information needed, making the regionalAnalysis field @@ -48,7 +33,7 @@ public class MultiOriginAssembler { public final Job job; // One writer per CSV/Grids we're outputting - private List resultWriters = new ArrayList<>(); + private final List resultWriters; /** * The number of distinct origin points for which we've received at least one result. If for @@ -68,95 +53,30 @@ public class MultiOriginAssembler { */ private final BitSet originsReceived; - /** - * Total number of origin points for which we're expecting results. Note that the total - * number of results received could be higher in the event of an overzealous task redelivery. - */ - public final int nOriginsTotal; - /** * Constructor. This sets up one or more ResultWriters depending on whether we're writing gridded or non-gridded * cumulative opportunities accessibility, or origin-destination travel times. - * TODO do not pass the FileStorage component down into this non-component and the ResultWriter non-component, - * clarify design concepts on this point (e.g. only components should know other components exist). - * Rather than pushing the component all the way down to the leaf function call, we return the finished - * file up to an umbrella location where a single reference to the file storage can be used to - * store all of them. */ - public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileStorage fileStorage) { - try { - this.regionalAnalysis = regionalAnalysis; - this.job = job; - this.nOriginsTotal = job.nTasksTotal; - this.originsReceived = new BitSet(job.nTasksTotal); - // Check that origin and destination sets are not too big for generating CSV files. - if (!job.templateTask.makeTauiSite && - job.templateTask.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension) - ) { - // This requires us to have already loaded this destination pointset instance into the transient field. - PointSet destinationPointSet = job.templateTask.destinationPointSets[0]; - if ((job.templateTask.recordTimes || job.templateTask.includePathResults) && !job.templateTask.oneToOne) { - if (nOriginsTotal * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS || - destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS - ) { - throw new AnalysisServerException(String.format( - "Freeform requests limited to %d destinations and %d origin-destination pairs.", - MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS - )); - } - } - } - - if (job.templateTask.recordAccessibility) { - if (job.templateTask.originPointSet != null) { - resultWriters.add(new AccessCsvResultWriter(job.templateTask, fileStorage)); - } else { - resultWriters.add(new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage)); - } - } - - if (job.templateTask.recordTimes) { - resultWriters.add(new TimeCsvResultWriter(job.templateTask, fileStorage)); - } - - if (job.templateTask.includePathResults) { - resultWriters.add(new PathCsvResultWriter(job.templateTask, fileStorage)); - } - - checkArgument(job.templateTask.makeTauiSite || notNullOrEmpty(resultWriters), - "A non-Taui regional analysis should always create at least one grid or CSV file."); - - // Record the paths of any CSV files that will be produced by this analysis. - // The caller must flush the RegionalAnalysis back out to the database to retain this information. - // We avoid database access here in constructors, especially when called in synchronized methods. - for (RegionalResultWriter writer : resultWriters) { - // FIXME instanceof+cast is ugly, do this some other way or even record the Grids - if (writer instanceof CsvResultWriter) { - CsvResultWriter csvWriter = (CsvResultWriter) writer; - regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName); - } - } - } catch (Exception e) { - throw new RuntimeException("Exception while creating multi-origin assembler: " + ExceptionUtils.stackTraceString(e)); - } + public MultiOriginAssembler (Job job, List resultWriters) { + this.job = job; + this.resultWriters = resultWriters; + this.originsReceived = new BitSet(job.nTasksTotal); } /** - * Gzip the output files and persist them to cloud storage. + * Check that origin and destination sets are not too big for generating CSV files. */ - private synchronized void finish() { - LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId); - try { - for (RegionalResultWriter writer : resultWriters) { - writer.finish(); + public static void ensureOdPairsUnderLimit(RegionalTask task, PointSet destinationPointSet) { + // This requires us to have already loaded this destination pointset instance into the transient field. + if ((task.recordTimes || task.includePathResults) && !task.oneToOne) { + if (task.getTasksTotal() * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS || + destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS + ) { + throw new AnalysisServerException(String.format( + "Freeform requests limited to %d destinations and %d origin-destination pairs.", + MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS + )); } - regionalAnalysis.complete = true; - // Write updated regionalAnalysis object back out to database, to mark it complete and record locations - // of any CSV files generated. Use method that updates lock/timestamp, otherwise updates are not seen in UI. - // TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock(). - Persistence.regionalAnalyses.put(regionalAnalysis); - } catch (Exception e) { - LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e); } } @@ -178,8 +98,17 @@ public synchronized void handleMessage (RegionalWorkResult workResult) throws Ex originsReceived.set(workResult.taskId); nComplete += 1; } - if (nComplete == nOriginsTotal) { - finish(); + + // If finished, run finish on all the result writers. + if (nComplete == job.nTasksTotal) { + LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId); + try { + for (RegionalResultWriter writer : resultWriters) { + writer.finish(); + } + } catch (Exception e) { + LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e); + } } } diff --git a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java index 0dadb4337..3022befe3 100644 --- a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java @@ -6,7 +6,6 @@ import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import org.apache.commons.lang3.ArrayUtils; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -14,7 +13,7 @@ public class PathCsvResultWriter extends CsvResultWriter { - public PathCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { + public PathCsvResultWriter (RegionalTask task, FileStorage fileStorage) { super(task, fileStorage); } diff --git a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java index 8380a5bea..a765d951e 100644 --- a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java @@ -2,12 +2,10 @@ import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; - /** * Common interface for classes that write regional results out to CSV or Grids on the backend. */ -interface RegionalResultWriter { +public interface RegionalResultWriter { void writeOneWorkResult (RegionalWorkResult workResult) throws Exception; diff --git a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java index 144da7713..90d4f8f5e 100644 --- a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java @@ -5,16 +5,14 @@ import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; public class TimeCsvResultWriter extends CsvResultWriter { - public TimeCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { + public TimeCsvResultWriter (RegionalTask task, FileStorage fileStorage) { super(task, fileStorage); } diff --git a/src/main/java/com/conveyal/file/FileUtils.java b/src/main/java/com/conveyal/file/FileUtils.java index f2eb94842..8b24570f5 100644 --- a/src/main/java/com/conveyal/file/FileUtils.java +++ b/src/main/java/com/conveyal/file/FileUtils.java @@ -12,6 +12,7 @@ import java.io.RandomAccessFile; import java.nio.file.Files; import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; public abstract class FileUtils { /** @@ -86,6 +87,20 @@ public static void transferFromFileTo(File file, OutputStream os) { } } + /** + * GZIP a File and return the new File descriptor. + */ + public static File gzipFile(File file) { + try { + var gzippedFile = createScratchFile(); + var gzippedOs = new GZIPOutputStream(getOutputStream(gzippedFile)); + transferFromFileTo(file, gzippedOs); + return gzippedFile; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Get an BufferedInputStream for a file. Read bytes from the underlying file stream without causing a system call * for each byte read. diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java index c972057f8..11010ffbd 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java @@ -2,6 +2,7 @@ import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.WebMercatorExtents; +import com.google.common.base.Preconditions; /** * Represents a task to be performed as part of a regional analysis. @@ -105,4 +106,13 @@ public int nTargetsPerOrigin () { } } + public int getTasksTotal() { + if (originPointSetKey != null) { + Preconditions.checkNotNull(originPointSet); + return originPointSet.featureCount(); + } else { + return width * height; + } + } + }