Merge branch 'dev' into jdk17
abyrd authored Oct 19, 2023
2 parents 46a0434 + 4321da8 commit ff7252f
Showing 38 changed files with 1,148 additions and 132 deletions.
76 changes: 48 additions & 28 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -48,36 +48,41 @@
 /**
  * This class distributes the tasks making up regional jobs to workers.
  * <p>
- * It should aim to draw tasks fairly from all organizations, and fairly from all jobs within each
- * organization, while attempting to respect the transport network affinity of each worker, giving
- * the worker tasks that require the same network it has been using recently.
+ * It respects the declared transport network affinity of each worker, giving the worker tasks that
+ * relate to the same network it has been using recently, and is therefore already loaded in memory.
  * <p>
- * Previously workers long-polled for work, holding lots of connections open. Now they short-poll
- * and sleep for a while if there's no work. This is simpler and allows us to work withing much more
- * standard HTTP frameworks.
+ * In our initial design, workers long-polled for work, holding lots of connections open. This was
+ * soon revised to short-poll and sleep for a while when there's no work. This was found to be
+ * simpler, allowing use of standard HTTP frameworks instead of custom networking code.
  * <p>
- * The fact that workers continuously re-poll for work every 10-30 seconds serves as a signal to the
- * broker that they are still alive and waiting. This also allows the broker to maintain a catalog
- * of active workers.
+ * Issue conveyal/r5#596 arose because we'd only poll when a worker was running low on tasks or
+ * idling. A worker could disappear for a long time, leaving the backend to assume it had shut down
+ * or crashed. Workers were revised to poll more frequently even when they were busy and didn't
+ * need any new tasks to work on, providing a signal to the broker that they are still alive and
+ * functioning. This allows the broker to maintain a more accurate catalog of active workers.
  * <p>
- * Because (at least currently) two organizations never share the same graph, we can get by with
- * pulling tasks cyclically or randomly from all the jobs, and actively shape the number of workers
- * with affinity for each graph by forcing some of them to accept tasks on graphs other than the one
- * they have declared affinity for.
+ * Most methods on this class are synchronized because they can be called from many HTTP handler
+ * threads at once (when many workers are polling at once). We should occasionally evaluate whether
+ * synchronizing all methods to make this threadsafe is a performance issue. If so, fine-grained
+ * locking may be advantageous, but as a rule it is much harder to design, test, and maintain.
  * <p>
- * This could be thought of as "affinity homeostasis". We will constantly keep track of the ideal
+ * Workers were originally intended to migrate from one network to another to handle subsequent jobs
+ * without waiting for more cloud compute instances to start up. In practice we currently assign
+ * each worker a single network, but the balance of workers assigned to each network and the reuse
+ * of workers could in principle be made more sophisticated. The remainder of the comments below
+ * provide context for how this could be refined or improved.
+ *
+ * Because (at least currently) two organizations never share the same graph, we could get by with
+ * pulling tasks cyclically or randomly from all the jobs, and could actively shape the number of
+ * workers with affinity for each graph by forcing some of them to accept tasks on graphs other than
+ * the one they have declared affinity for. If the pool of workers was allowed to grow very large,
+ * we could aim to draw tasks fairly from all organizations, and fairly from all jobs within each
+ * organization.
+ * <p>
+ * We have described this approach as "affinity homeostasis": constantly keep track of the ideal
  * proportion of workers by graph (based on active jobs), and the true proportion of consumers by
  * graph (based on incoming polling), then we can decide when a worker's graph affinity should be
- * ignored and what it should be forced to.
- * <p>
- * It may also be helpful to mark jobs every time they are skipped in the LRU queue. Each time a job
- * is serviced, it is taken out of the queue and put at its end. Jobs that have not been serviced
- * float to the top.
- * <p>
- * Most methods on this class are synchronized, because they can be called from many HTTP handler
- * threads at once.
- *
- * TODO evaluate whether synchronizing all methods to make this threadsafe is a performance issue.
+ * ignored and what graph it should be forced to.
  */
 public class Broker implements Component {
 
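The "affinity homeostasis" bookkeeping described in the comment can be illustrated concretely. The sketch below is hypothetical and not part of this commit; all names in it are invented. It derives each network's ideal share of workers from its share of pending tasks, compares that with the share of workers actually polling for that network, and flags over-served networks whose workers could be redirected.

import java.util.Map;

// Hypothetical illustration of "affinity homeostasis"; not part of this commit.
public class AffinityHomeostasisSketch {
    public static void main (String[] args) {
        // Pending tasks per network (from active jobs) and polling workers per network.
        Map<String, Integer> tasks = Map.of("networkA", 900, "networkB", 100);
        Map<String, Integer> workers = Map.of("networkA", 2, "networkB", 8);
        double totalTasks = tasks.values().stream().mapToInt(Integer::intValue).sum();
        double totalWorkers = workers.values().stream().mapToInt(Integer::intValue).sum();
        for (String network : tasks.keySet()) {
            double idealShare = tasks.get(network) / totalTasks;                   // share of pending work
            double actualShare = workers.getOrDefault(network, 0) / totalWorkers;  // share of polling workers
            // Workers polling for an over-served network are candidates for reassignment.
            System.out.printf("%s ideal=%.2f actual=%.2f overServed=%b%n",
                    network, idealShare, actualShare, actualShare > idealShare);
        }
    }
}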
@@ -100,7 +105,15 @@ public interface Config {
     private final ListMultimap<WorkerCategory, Job> jobs =
             MultimapBuilder.hashKeys().arrayListValues().build();
 
-    /** The most tasks to deliver to a worker at a time. */
+    /**
+     * The most tasks to deliver to a worker at a time. Workers may request less tasks than this, and the broker should
+     * never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB. If this value
+     * is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
+     * workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete.
+     *
+     * The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
+     * polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work.
+     */
     public final int MAX_TASKS_PER_WORKER = 16;
 
     /**
@@ -317,9 +330,13 @@ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerT

     /**
      * Attempt to find some tasks that match what a worker is requesting.
-     * Always returns a list, which may be empty if there is nothing to deliver.
+     * Always returns a non-null List, which may be empty if there is nothing to deliver.
+     * Number of tasks in the list is strictly limited to maxTasksRequested.
      */
-    public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory) {
+    public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory, int maxTasksRequested) {
+        if (maxTasksRequested <= 0) {
+            return Collections.EMPTY_LIST;
+        }
         Job job;
         if (config.offline()) {
             // Working in offline mode; get tasks from the first job that has any tasks to deliver.
@@ -335,7 +352,10 @@ public synchronized List<RegionalTask> getSomeWork (WorkerCategor
             return Collections.EMPTY_LIST;
         }
         // Return up to N tasks that are waiting to be processed.
-        return job.generateSomeTasksToDeliver(MAX_TASKS_PER_WORKER);
+        if (maxTasksRequested > MAX_TASKS_PER_WORKER) {
+            maxTasksRequested = MAX_TASKS_PER_WORKER;
+        }
+        return job.generateSomeTasksToDeliver(maxTasksRequested);
     }
 
     /**
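Together with MAX_TASKS_PER_WORKER above, the delivered batch size works out to min(maxTasksRequested, MAX_TASKS_PER_WORKER), with non-positive requests short-circuiting to an empty list. At roughly 1.3 kB per task (50 tasks give about 65 kB per the comment above), a full batch of 16 is around 21 kB. A hypothetical, self-contained restatement of the clamp, not part of the commit:

// Hypothetical restatement of the task-count clamp applied in getSomeWork above.
public class TaskClampSketch {
    static final int MAX_TASKS_PER_WORKER = 16;

    static int tasksToDeliver (int maxTasksRequested) {
        if (maxTasksRequested <= 0) return 0; // corresponds to the Collections.EMPTY_LIST early return
        return Math.min(maxTasksRequested, MAX_TASKS_PER_WORKER);
    }

    public static void main (String[] args) {
        System.out.println(tasksToDeliver(50)); // 16: the broker never exceeds its own cap
        System.out.println(tasksToDeliver(4));  // 4: the worker asked for fewer than the cap
        System.out.println(tasksToDeliver(0));  // 0: nothing is delivered
    }
}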
@@ -177,7 +177,7 @@ public boolean isErrored () {
     /**
      * @param maxTasks the maximum number of tasks to return.
      * @return some tasks that are not yet marked as completed and have not yet been delivered in
-     *         this delivery pass.
+     *         this delivery pass. May return an empty list, but never null.
      */
     public List<RegionalTask> generateSomeTasksToDeliver (int maxTasks) {
         List<RegionalTask> tasks = new ArrayList<>(maxTasks);
@@ -1,6 +1,7 @@
 package com.conveyal.analysis.components.broker;
 
 import com.conveyal.r5.analyst.WorkerCategory;
+import com.conveyal.r5.analyst.cluster.AnalysisWorker;
 import com.conveyal.r5.analyst.cluster.WorkerStatus;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -21,7 +22,8 @@
  */
 public class WorkerCatalog {
 
-    public static final int WORKER_RECORD_DURATION_MSEC = 2 * 60 * 1000;
+    /** If a worker says it will poll every s seconds or less, wait s plus this number before considering it gone. */
+    private static final int POLLING_TOLERANCE_SECONDS = 5;
 
     /**
      * The information supplied by workers the last time they polled for more tasks.
@@ -74,9 +76,9 @@ public synchronized void catalog (WorkerStatus workerStatus) {
      */
     private synchronized void purgeDeadWorkers () {
         long now = System.currentTimeMillis();
-        long oldestAcceptable = now - WORKER_RECORD_DURATION_MSEC;
         List<WorkerObservation> ancientObservations = new ArrayList<>();
         for (WorkerObservation observation : observationsByWorkerId.values()) {
+            long oldestAcceptable = now - ((observation.status.pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000);
             if (observation.lastSeen < oldestAcceptable) {
                 ancientObservations.add(observation);
             }
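With this change the liveness window is per-worker rather than the fixed two minutes of the removed WORKER_RECORD_DURATION_MSEC: a worker that promises to poll every s seconds is presumed gone after s + 5 seconds of silence. A hypothetical, self-contained restatement of that arithmetic:

// Hypothetical restatement of the per-worker purge window computed above.
public class PurgeWindowSketch {
    static final int POLLING_TOLERANCE_SECONDS = 5;

    static boolean presumedDead (long nowMillis, long lastSeenMillis, int pollIntervalSeconds) {
        long oldestAcceptable = nowMillis - (pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000L;
        return lastSeenMillis < oldestAcceptable;
    }

    public static void main (String[] args) {
        long now = System.currentTimeMillis();
        // A worker polling every 30 seconds gets a 35-second window.
        System.out.println(presumedDead(now, now - 20_000, 30)); // false: last seen 20s ago
        System.out.println(presumedDead(now, now - 40_000, 30)); // true: last seen 40s ago
    }
}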
@@ -14,6 +14,7 @@
 import com.conveyal.analysis.persistence.Persistence;
 import com.conveyal.analysis.util.HttpStatus;
 import com.conveyal.analysis.util.JsonUtil;
+import com.conveyal.gtfs.util.Util;
 import com.conveyal.r5.analyst.WorkerCategory;
 import com.conveyal.r5.analyst.cluster.AnalysisWorker;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
@@ -45,13 +46,15 @@

 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static com.conveyal.r5.common.Util.human;
 import static com.conveyal.r5.common.Util.notNullOrEmpty;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -289,7 +292,8 @@ private String jsonResponse (Response response, int statusCode, Object object) {
         // We could call response.body(jsonMapper.writeValueAsBytes(object));
         // but then the calling handler functions need to explicitly return null which is weird.
         try {
-            return jsonMapper.writeValueAsString(object);
+            String body = jsonMapper.writeValueAsString(object);
+            return body;
         } catch (JsonProcessingException e) {
             throw AnalysisServerException.unknown(e);
         }
@@ -362,7 +366,7 @@ private Object workerPoll (Request request, Response response) {
         broker.recordWorkerObservation(workerStatus);
         WorkerCategory workerCategory = workerStatus.getWorkerCategory();
         // See if any appropriate tasks exist for this worker.
-        List<RegionalTask> tasks = broker.getSomeWork(workerCategory);
+        List<RegionalTask> tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested);
         // If there is no work for the worker, signal this clearly with a "no content" code,
         // so the worker can sleep a while before the next polling attempt.
         if (tasks.isEmpty()) {
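The worker side of this exchange is not shown in the excerpt. As a rough sketch of the contract, assuming the poll endpoint accepts a JSON body carrying maxTasksRequested and answers 204 when there is no work; the URL and JSON shape here are illustrative assumptions, not taken from this commit:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative worker-side poll; endpoint URL and JSON shape are assumptions.
public class WorkerPollSketch {
    public static void main (String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest poll = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:7070/internal/poll")) // hypothetical URL
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"maxTasksRequested\": 8}"))
                .build();
        HttpResponse<String> response = client.send(poll, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 204) {
            Thread.sleep(15_000); // no content: sleep a while before the next polling attempt
        } else {
            System.out.println("Received tasks: " + response.body()); // JSON list of up to 8 tasks
        }
    }
}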
18 changes: 18 additions & 0 deletions src/main/java/com/conveyal/analysis/models/AnalysisRequest.java
@@ -161,6 +161,21 @@ public class AnalysisRequest {
      */
     public ChaosParameters injectFault;
 
+    /**
+     * Whether to include the number of opportunities reached during each minute of travel in results sent back
+     * to the broker. Requires both an origin and destination pointset to be specified, and in the case of regional
+     * analyses the origins must be non-gridded, and results will be collated to CSV.
+     * It should be possible to enable regional results for gridded origins as well.
+     */
+    public boolean includeTemporalDensity = false;
+
+    /**
+     * If this is set to a value above zero, report the amount of time needed to reach the given number of
+     * opportunities from this origin (known technically as "dual accessibility").
+     */
+    public int dualAccessibilityThreshold = 0;
+
+
     /**
      * Create the R5 `Scenario` from this request.
      */
*/
Expand Down Expand Up @@ -265,6 +280,9 @@ public void populateTask (AnalysisWorkerTask task, UserPermissions userPermissio
throw new IllegalArgumentException("Must be admin user to inject faults.");
}
}

task.includeTemporalDensity = includeTemporalDensity;
task.dualAccessibilityThreshold = dualAccessibilityThreshold;
}

private EnumSet<LegMode> getEnumSetFromString (String s) {
Expand Down
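To show how the two new fields are meant to be used together, here is a hypothetical caller setting both before populateTask copies them onto the worker task; it assumes AnalysisRequest and its dependencies are on the classpath and has a default constructor, and the values are illustrative only:

import com.conveyal.analysis.models.AnalysisRequest;

// Hypothetical usage of the two new request fields; not part of this commit.
public class TemporalDensityRequestSketch {
    public static void main (String[] args) {
        AnalysisRequest request = new AnalysisRequest();
        request.includeTemporalDensity = true;     // record opportunities reached per minute of travel
        request.dualAccessibilityThreshold = 5000; // also report minutes needed to reach 5000 opportunities
        // populateTask later copies both values onto the AnalysisWorkerTask (see the hunk above).
    }
}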
@@ -32,9 +32,6 @@ public static class FrequencyEntry extends AbstractTimetable {
         /** trip from which to copy travel times */
         public String sourceTrip;
 
-        /** trips on the selected patterns which could be used as source trips */
-        public String[] patternTrips;
-
         public AddTrips.PatternTimetable toR5 (String feed) {
             AddTrips.PatternTimetable pt = toBaseR5Timetable();
 
@@ -5,5 +5,5 @@
  * do serve to enumerate the acceptable parameters coming over the HTTP API.
  */
 public enum CsvResultType {
-    ACCESS, TIMES, PATHS
+    ACCESS, TIMES, PATHS, TDENSITY
 }
@@ -12,6 +12,8 @@
 import java.io.FileWriter;
 import java.io.IOException;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
@@ -55,6 +57,7 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region
      */
     CsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
         super(fileStorage);
+        checkArgument(task.originPointSet != null, "CsvResultWriters require FreeFormPointSet origins.");
         super.prepare(task.jobId);
         this.fileName = task.jobId + "_" + resultType() +".csv";
         BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile));
@@ -109,8 +109,10 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto

         if (job.templateTask.recordAccessibility) {
             if (job.templateTask.originPointSet != null) {
+                // Freeform origins - create CSV regional analysis results
                 resultWriters.add(new AccessCsvResultWriter(job.templateTask, fileStorage));
             } else {
+                // Gridded origins - create gridded regional analysis results
                 resultWriters.add(new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage));
             }
         }
@@ -123,6 +125,20 @@
             resultWriters.add(new PathCsvResultWriter(job.templateTask, fileStorage));
         }
 
+        if (job.templateTask.includeTemporalDensity) {
+            if (job.templateTask.originPointSet == null) {
+                // Gridded origins. The full temporal density information is probably too voluminous to be useful.
+                // We might want to record a grid of dual accessibility values, but this will require some serious
+                // refactoring of the GridResultWriter.
+                // if (job.templateTask.dualAccessibilityThreshold > 0) { ... }
+                throw new IllegalArgumentException("Temporal density of opportunities cannot be recorded for gridded origin points.");
+            } else {
+                // Freeform origins.
+                // Output includes temporal density of opportunities and optionally dual accessibility.
+                resultWriters.add(new TemporalDensityCsvResultWriter(job.templateTask, fileStorage));
+            }
+        }
+
         checkArgument(job.templateTask.makeTauiSite || notNullOrEmpty(resultWriters),
                 "A non-Taui regional analysis should always create at least one grid or CSV file.");
 
@@ -0,0 +1,90 @@
+package com.conveyal.analysis.results;
+
+import com.conveyal.file.FileStorage;
+import com.conveyal.r5.analyst.cluster.RegionalTask;
+import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This handles collating regional results into CSV files containing temporal opportunity density
+ * (number of opportunities reached in each one-minute interval, the derivative of step-function accessibility)
+ * as well as "dual" accessibility (the amount of time needed to reach n opportunities).
+ */
+public class TemporalDensityCsvResultWriter extends CsvResultWriter {
+
+    private final int dualThreshold;
+
+    public TemporalDensityCsvResultWriter(RegionalTask task, FileStorage fileStorage) throws IOException {
+        super(task, fileStorage);
+        dualThreshold = task.dualAccessibilityThreshold;
+    }
+
+    @Override
+    public CsvResultType resultType () {
+        return CsvResultType.TDENSITY;
+    }
+
+    @Override
+    public String[] columnHeaders () {
+        List<String> headers = new ArrayList<>();
+        // The ids of the freeform origin point and destination set
+        headers.add("originId");
+        headers.add("destId");
+        headers.add("percentile");
+        for (int m = 0; m < 120; m += 1) {
+            // The opportunity density over travel minute m
+            headers.add(Integer.toString(m));
+        }
+        // The number of minutes needed to reach d destination opportunities
+        headers.add("D" + dualThreshold);
+        return headers.toArray(new String[0]);
+    }
+
+    @Override
+    protected void checkDimension (RegionalWorkResult workResult) {
+        checkDimension(
+                workResult, "destination pointsets",
+                workResult.opportunitiesPerMinute.length, task.destinationPointSetKeys.length
+        );
+        for (double[][] percentilesForPointset : workResult.opportunitiesPerMinute) {
+            checkDimension(workResult, "percentiles", percentilesForPointset.length, task.percentiles.length);
+            for (double[] minutesForPercentile : percentilesForPointset) {
+                checkDimension(workResult, "minutes", minutesForPercentile.length, 120);
+            }
+        }
+    }
+
+    @Override
+    public Iterable<String[]> rowValues (RegionalWorkResult workResult) {
+        List<String[]> rows = new ArrayList<>();
+        String originId = task.originPointSet.getId(workResult.taskId);
+        for (int d = 0; d < task.destinationPointSetKeys.length; d++) {
+            double[][] percentilesForDestPointset = workResult.opportunitiesPerMinute[d];
+            for (int p = 0; p < task.percentiles.length; p++) {
+                List<String> row = new ArrayList<>(125);
+                row.add(originId);
+                row.add(task.destinationPointSetKeys[d]);
+                row.add(Integer.toString(p));
+                // One density value for each of 120 minutes
+                double[] densitiesPerMinute = percentilesForDestPointset[p];
+                for (int m = 0; m < 120; m++) {
+                    row.add(Double.toString(densitiesPerMinute[m]));
+                }
+                // One dual accessibility value
+                int m = 0;
+                double sum = 0;
+                while (sum < dualThreshold && m < 120) {
+                    sum += densitiesPerMinute[m];
+                    m += 1;
+                }
+                row.add(Integer.toString(m >= 120 ? -1 : m));
+                rows.add(row.toArray(new String[row.size()]));
+            }
+        }
+        return rows;
+    }
+
+}
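A worked example of the dual-accessibility loop at the end of rowValues: densities are accumulated minute by minute until the threshold is met, and the row records that minute, or -1 if 120 minutes is not enough. The class below is a hypothetical mirror of that loop, not part of the commit; with 100 opportunities newly reached each minute, a threshold of 5000 is met at minute 50.

import java.util.Arrays;

// Hypothetical mirror of the dual-accessibility loop in rowValues above.
public class DualAccessibilitySketch {
    static int dualAccessMinutes (double[] densitiesPerMinute, int threshold) {
        int m = 0;
        double sum = 0;
        while (sum < threshold && m < 120) {
            sum += densitiesPerMinute[m];
            m += 1;
        }
        return m >= 120 ? -1 : m; // -1: threshold not reached within the 120-minute window
    }

    public static void main (String[] args) {
        double[] densities = new double[120];
        Arrays.fill(densities, 100.0); // 100 opportunities newly reached in each minute
        System.out.println(dualAccessMinutes(densities, 5000)); // 50: minutes to accumulate 5000
    }
}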
