Merge pull request #874 from conveyal/regular-polling
Regular Polling
abyrd authored Sep 28, 2023
2 parents fbb3f66 + c77d0a2 commit 00e6c8e
Showing 7 changed files with 180 additions and 93 deletions.
76 changes: 48 additions & 28 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -48,36 +48,41 @@
/**
* This class distributes the tasks making up regional jobs to workers.
* <p>
* It should aim to draw tasks fairly from all organizations, and fairly from all jobs within each
* organization, while attempting to respect the transport network affinity of each worker, giving
* the worker tasks that require the same network it has been using recently.
* It respects the declared transport network affinity of each worker, giving the worker tasks that
* relate to the same network it has been using recently, and is therefore already loaded in memory.
* <p>
* Previously workers long-polled for work, holding lots of connections open. Now they short-poll
* and sleep for a while if there's no work. This is simpler and allows us to work within much more
* standard HTTP frameworks.
* In our initial design, workers long-polled for work, holding lots of connections open. This was
* soon revised to short-poll and sleep for a while when there's no work. This was found to be
* simpler, allowing use of standard HTTP frameworks instead of custom networking code.
* <p>
* The fact that workers continuously re-poll for work every 10-30 seconds serves as a signal to the
* broker that they are still alive and waiting. This also allows the broker to maintain a catalog
* of active workers.
* Issue conveyal/r5#596 arose because we'd only poll when a worker was running low on tasks or
* idling. A worker could disappear for a long time, leaving the backend to assume it had shut down
* or crashed. Workers were revised to poll more frequently even when they were busy and didn't
* need any new tasks to work on, providing a signal to the broker that they are still alive and
* functioning. This allows the broker to maintain a more accurate catalog of active workers.
* <p>
* Because (at least currently) two organizations never share the same graph, we can get by with
* pulling tasks cyclically or randomly from all the jobs, and actively shape the number of workers
* with affinity for each graph by forcing some of them to accept tasks on graphs other than the one
* they have declared affinity for.
* Most methods on this class are synchronized because they can be called from many HTTP handler
* threads at once (when many workers are polling at once). We should occasionally evaluate whether
* synchronizing all methods to make this threadsafe is a performance issue. If so, fine-grained
* locking may be advantageous, but as a rule it is much harder to design, test, and maintain.
* <p>
* This could be thought of as "affinity homeostasis". We will constantly keep track of the ideal
* Workers were originally intended to migrate from one network to another to handle subsequent jobs
* without waiting for more cloud compute instances to start up. In practice we currently assign
* each worker a single network, but the balance of workers assigned to each network and the reuse
* of workers could in principle be made more sophisticated. The remainder of the comments below
* provide context for how this could be refined or improved.
*
* Because (at least currently) two organizations never share the same graph, we could get by with
* pulling tasks cyclically or randomly from all the jobs, and could actively shape the number of
* workers with affinity for each graph by forcing some of them to accept tasks on graphs other than
* the one they have declared affinity for. If the pool of workers was allowed to grow very large,
* we could aim to draw tasks fairly from all organizations, and fairly from all jobs within each
* organization.
* <p>
* We have described this approach as "affinity homeostasis": constantly keep track of the ideal
* proportion of workers by graph (based on active jobs), and the true proportion of consumers by
* graph (based on incoming polling), then we can decide when a worker's graph affinity should be
* ignored and what it should be forced to.
* <p>
* It may also be helpful to mark jobs every time they are skipped in the LRU queue. Each time a job
* is serviced, it is taken out of the queue and put at its end. Jobs that have not been serviced
* float to the top.
* <p>
* Most methods on this class are synchronized, because they can be called from many HTTP handler
* threads at once.
*
* TODO evaluate whether synchronizing all methods to make this threadsafe is a performance issue.
* ignored and what graph it should be forced to.
*/
public class Broker implements Component {

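The revised class comment above describes the regular-polling design: each worker polls the backend on a short fixed interval even when it already has enough work, so every poll doubles as a liveness signal. The following is a minimal worker-side sketch of that loop; the BrokerClient interface, the String task placeholder, and the 15-second interval are assumptions for illustration, not the actual AnalysisWorker code.

// Illustrative sketch only: a worker that polls the broker on a fixed schedule,
// even when its local backlog is full, so each poll doubles as a heartbeat.
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RegularPollingWorker {

    /** Hypothetical stand-in for the HTTP call to the backend's poll endpoint. */
    interface BrokerClient {
        /** Report status and ask for at most maxTasks new tasks; may return an empty list. */
        List<String> poll(int maxTasks);
    }

    private static final int POLL_INTERVAL_SECONDS = 15;  // assumed value in the 10-30 second range mentioned above
    private static final int TASK_QUEUE_CAPACITY = 100;   // assumed local backlog limit

    private final BlockingQueue<String> backlog = new LinkedBlockingQueue<>(TASK_QUEUE_CAPACITY);
    private final BrokerClient broker;

    public RegularPollingWorker(BrokerClient broker) {
        this.broker = broker;
    }

    public void start() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Poll at a fixed rate whether or not we need work: even a request for zero tasks
        // tells the broker this worker is still alive, keeping its catalog accurate.
        scheduler.scheduleAtFixedRate(() -> {
            int spareCapacity = backlog.remainingCapacity();
            List<String> newTasks = broker.poll(spareCapacity);
            backlog.addAll(newTasks);
        }, 0, POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }
}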
@@ -100,7 +105,15 @@ public interface Config {
private final ListMultimap<WorkerCategory, Job> jobs =
MultimapBuilder.hashKeys().arrayListValues().build();

/** The most tasks to deliver to a worker at a time. */
/**
* The most tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
* never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB. If this value
* is too high, all remaining tasks in a job could be distributed to a single worker, leaving none for the other
* workers and creating a slow-joiner problem, especially if the tasks are complicated and slow to complete.
*
* The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
* polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS), which may not be ideal but is known to work.
*/
public final int MAX_TASKS_PER_WORKER = 16;

/**
@@ -317,9 +330,13 @@ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerT

/**
* Attempt to find some tasks that match what a worker is requesting.
* Always returns a list, which may be empty if there is nothing to deliver.
* Always returns a non-null List, which may be empty if there is nothing to deliver.
* Number of tasks in the list is strictly limited to maxTasksRequested.
*/
public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory) {
public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory, int maxTasksRequested) {
if (maxTasksRequested <= 0) {
return Collections.EMPTY_LIST;
}
Job job;
if (config.offline()) {
// Working in offline mode; get tasks from the first job that has any tasks to deliver.
@@ -335,7 +352,10 @@ public synchronized List<RegionalTask> getSomeWork (WorkerCategor
return Collections.EMPTY_LIST;
}
// Return up to N tasks that are waiting to be processed.
return job.generateSomeTasksToDeliver(MAX_TASKS_PER_WORKER);
if (maxTasksRequested > MAX_TASKS_PER_WORKER) {
maxTasksRequested = MAX_TASKS_PER_WORKER;
}
return job.generateSomeTasksToDeliver(maxTasksRequested);
}

/**
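The class comment above leaves "affinity homeostasis" as a possible refinement rather than implemented behavior. The sketch below shows one way the ideal share of workers per network (from active jobs) could be compared with the observed share (from polling workers) to decide when a worker's declared affinity should be overridden; the class name, method signature, and the 0.25 imbalance threshold are all assumptions, not backend code.

// Rough sketch of the "affinity homeostasis" idea: redirect a worker to the most
// under-served network when the imbalance between ideal and actual shares is large.
import java.util.Map;

public class AffinityBalancer {

    /**
     * Decide whether a worker that declared affinity for declaredNetwork should instead be
     * redirected to the most under-served network.
     *
     * @param activeTasksPerNetwork number of queued tasks per network, across all active jobs
     * @param pollingWorkersPerNetwork number of recently polling workers per network
     */
    public static String chooseNetworkFor(String declaredNetwork,
                                          Map<String, Integer> activeTasksPerNetwork,
                                          Map<String, Integer> pollingWorkersPerNetwork) {
        double totalTasks = activeTasksPerNetwork.values().stream().mapToInt(Integer::intValue).sum();
        double totalWorkers = pollingWorkersPerNetwork.values().stream().mapToInt(Integer::intValue).sum();
        if (totalTasks == 0 || totalWorkers == 0) return declaredNetwork;

        // Find the network whose share of workers falls furthest below its share of tasks.
        String mostUnderServed = declaredNetwork;
        double worstDeficit = 0;
        for (String network : activeTasksPerNetwork.keySet()) {
            double idealShare = activeTasksPerNetwork.get(network) / totalTasks;
            double actualShare = pollingWorkersPerNetwork.getOrDefault(network, 0) / totalWorkers;
            double deficit = idealShare - actualShare;
            if (deficit > worstDeficit) {
                worstDeficit = deficit;
                mostUnderServed = network;
            }
        }
        // Only override declared affinity when the imbalance is substantial, since switching
        // networks forces the worker to load a different graph into memory.
        return (worstDeficit > 0.25) ? mostUnderServed : declaredNetwork;
    }
}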
src/main/java/com/conveyal/analysis/components/broker/Job.java
@@ -177,7 +177,7 @@ public boolean isErrored () {
/**
* @param maxTasks the maximum number of tasks to return.
* @return some tasks that are not yet marked as completed and have not yet been delivered in
* this delivery pass.
* this delivery pass. May return an empty list, but never null.
*/
public List<RegionalTask> generateSomeTasksToDeliver (int maxTasks) {
List<RegionalTask> tasks = new ArrayList<>(maxTasks);
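A minimal sketch of the generateSomeTasksToDeliver contract documented above; the real method body is mostly collapsed in this diff, so the per-task completion and delivery flags and the helper names here are assumed for illustration only.

// Return up to maxTasks tasks that are neither completed nor already delivered in this
// delivery pass; the result may be empty but is never null.
import java.util.ArrayList;
import java.util.List;

class TaskSelectionSketch {

    static class TaskState {
        Object task;          // stands in for RegionalTask
        boolean completed;    // a result has already been received for this task
        boolean delivered;    // already handed to a worker in this delivery pass
    }

    static List<Object> generateSomeTasksToDeliver(List<TaskState> taskStates, int maxTasks) {
        List<Object> tasks = new ArrayList<>(maxTasks);
        for (TaskState state : taskStates) {
            if (tasks.size() >= maxTasks) break;
            if (state.completed || state.delivered) continue;
            state.delivered = true;   // don't re-send this task until the delivery pass is reset
            tasks.add(state.task);
        }
        return tasks;  // possibly empty, but never null
    }
}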
src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java
@@ -1,6 +1,7 @@
package com.conveyal.analysis.components.broker;

import com.conveyal.r5.analyst.WorkerCategory;
import com.conveyal.r5.analyst.cluster.AnalysisWorker;
import com.conveyal.r5.analyst.cluster.WorkerStatus;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@@ -21,7 +22,8 @@
*/
public class WorkerCatalog {

public static final int WORKER_RECORD_DURATION_MSEC = 2 * 60 * 1000;
/** If a worker says it will poll every s seconds or less, wait s plus this number before considering it gone. */
private static final int POLLING_TOLERANCE_SECONDS = 5;

/**
* The information supplied by workers the last time they polled for more tasks.
@@ -74,9 +76,9 @@ public synchronized void catalog (WorkerStatus workerStatus) {
*/
private synchronized void purgeDeadWorkers () {
long now = System.currentTimeMillis();
long oldestAcceptable = now - WORKER_RECORD_DURATION_MSEC;
List<WorkerObservation> ancientObservations = new ArrayList<>();
for (WorkerObservation observation : observationsByWorkerId.values()) {
long oldestAcceptable = now - ((observation.status.pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000);
if (observation.lastSeen < oldestAcceptable) {
ancientObservations.add(observation);
}
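A worked example of the new per-worker liveness rule shown above: a worker is purged once it has been silent for longer than its own promised poll interval plus the five-second tolerance. This is a standalone illustration of the arithmetic, not the WorkerCatalog code itself.

// Standalone example of the liveness cutoff computed from pollIntervalSeconds and lastSeen.
public class LivenessRuleExample {

    private static final int POLLING_TOLERANCE_SECONDS = 5;

    /** A worker is considered dead once it has been silent longer than its promised poll interval plus the tolerance. */
    static boolean isDead(long nowMillis, long lastSeenMillis, int pollIntervalSeconds) {
        long oldestAcceptable = nowMillis - ((pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000L);
        return lastSeenMillis < oldestAcceptable;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A worker that promised to poll every 30 seconds but was last seen 40 seconds ago
        // has exceeded 30 + 5 = 35 seconds of silence, so it is purged from the catalog.
        System.out.println(isDead(now, now - 40_000, 30));  // true
        // One last seen 20 seconds ago is still within its allowance and is kept.
        System.out.println(isDead(now, now - 20_000, 30));  // false
    }
}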
src/main/java/com/conveyal/analysis/controllers/BrokerController.java
@@ -14,6 +14,7 @@
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.util.HttpStatus;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.gtfs.util.Util;
import com.conveyal.r5.analyst.WorkerCategory;
import com.conveyal.r5.analyst.cluster.AnalysisWorker;
import com.conveyal.r5.analyst.cluster.RegionalTask;
@@ -45,13 +46,15 @@

import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static com.conveyal.r5.common.Util.human;
import static com.conveyal.r5.common.Util.notNullOrEmpty;
import static com.google.common.base.Preconditions.checkNotNull;

@@ -289,7 +292,8 @@ private String jsonResponse (Response response, int statusCode, Object object) {
// We could call response.body(jsonMapper.writeValueAsBytes(object));
// but then the calling handler functions need to explicitly return null which is weird.
try {
return jsonMapper.writeValueAsString(object);
String body = jsonMapper.writeValueAsString(object);
return body;
} catch (JsonProcessingException e) {
throw AnalysisServerException.unknown(e);
}
@@ -362,7 +366,7 @@ private Object workerPoll (Request request, Response response) {
broker.recordWorkerObservation(workerStatus);
WorkerCategory workerCategory = workerStatus.getWorkerCategory();
// See if any appropriate tasks exist for this worker.
List<RegionalTask> tasks = broker.getSomeWork(workerCategory);
List<RegionalTask> tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested);
// If there is no work for the worker, signal this clearly with a "no content" code,
// so the worker can sleep a while before the next polling attempt.
if (tasks.isEmpty()) {
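From the worker's side, the handler above means a poll either returns 200 with a JSON array of tasks or 204 No Content, in which case the worker simply sleeps until its next regular poll. The sketch below illustrates that handling; the endpoint URL, port, and use of java.net.http are assumptions for illustration, not the actual worker code.

// Interpret the broker's poll response: 200 carries a JSON task list, 204 means no work.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PollResponseHandling {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:7070/internal/poll"))  // hypothetical backend poll URL
                .POST(HttpRequest.BodyPublishers.ofString("{}"))          // the worker status JSON would go here
                .header("Content-Type", "application/json")
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 204) {
            // No work right now: sleep until the next regular poll, which also serves as a heartbeat.
            System.out.println("No tasks available; will poll again at the next interval.");
        } else if (response.statusCode() == 200) {
            // Work available: the body is a JSON array of tasks to enqueue locally.
            System.out.println("Received task list: " + response.body());
        } else {
            System.out.println("Unexpected response: " + response.statusCode());
        }
    }
}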