diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
index 1eeb93641..2aea67dc2 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -48,36 +48,41 @@
 /**
  * This class distributes the tasks making up regional jobs to workers.
  *

- * It should aim to draw tasks fairly from all organizations, and fairly from all jobs within each
- * organization, while attempting to respect the transport network affinity of each worker, giving
- * the worker tasks that require the same network it has been using recently.
+ * It respects the declared transport network affinity of each worker, giving the worker tasks that
+ * relate to the same network it has been using recently, which is therefore already loaded in memory.
  *

- * Previously workers long-polled for work, holding lots of connections open. Now they short-poll
- * and sleep for a while if there's no work. This is simpler and allows us to work withing much more
- * standard HTTP frameworks.
+ * In our initial design, workers long-polled for work, holding lots of connections open. This was
+ * soon revised to short-poll and sleep for a while when there's no work. This was found to be
+ * simpler, allowing use of standard HTTP frameworks instead of custom networking code.
  *

- * The fact that workers continuously re-poll for work every 10-30 seconds serves as a signal to the
- * broker that they are still alive and waiting. This also allows the broker to maintain a catalog
- * of active workers.
+ * Issue conveyal/r5#596 arose because workers would poll only when they were running low on tasks
+ * or idle. A worker could disappear for a long time, leaving the backend to assume it had shut down
+ * or crashed. Workers were revised to poll more frequently even when they were busy and didn't need
+ * any new tasks to work on, providing a signal to the broker that they are still alive and
+ * functioning. This allows the broker to maintain a more accurate catalog of active workers.
  *
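To make that liveness rule concrete, here is a minimal sketch (illustrative only, not the actual WorkerCatalog code; the class and field names are invented for the example) of how the broker can derive a per-worker deadline from the poll interval the worker itself reports, plus a small tolerance - the same idea as the WorkerCatalog change further down in this diff.

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: drop workers that have not polled within their own declared
// interval plus a fixed tolerance. Names here are hypothetical, not the real classes.
class WorkerLivenessSketch {
    static final int POLLING_TOLERANCE_SECONDS = 5;

    static class Observation {
        final int pollIntervalSeconds; // interval the worker promised when it last polled
        final long lastSeenMillis;     // when the broker last heard from this worker
        Observation (int pollIntervalSeconds, long lastSeenMillis) {
            this.pollIntervalSeconds = pollIntervalSeconds;
            this.lastSeenMillis = lastSeenMillis;
        }
    }

    final Map<String, Observation> observationsByWorkerId = new ConcurrentHashMap<>();

    void purgeDeadWorkers () {
        long now = System.currentTimeMillis();
        Iterator<Observation> iterator = observationsByWorkerId.values().iterator();
        while (iterator.hasNext()) {
            Observation obs = iterator.next();
            long oldestAcceptable = now - (obs.pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000L;
            if (obs.lastSeenMillis < oldestAcceptable) {
                iterator.remove(); // missed its own deadline plus tolerance; consider it gone
            }
        }
    }
}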

- * Because (at least currently) two organizations never share the same graph, we can get by with
- * pulling tasks cyclically or randomly from all the jobs, and actively shape the number of workers
- * with affinity for each graph by forcing some of them to accept tasks on graphs other than the one
- * they have declared affinity for.
+ * Most methods on this class are synchronized because they can be called from many HTTP handler
+ * threads at once (when many workers poll simultaneously). We should occasionally evaluate whether
+ * synchronizing all methods to make this threadsafe is a performance issue. If so, fine-grained
+ * locking may be advantageous, but as a rule it is much harder to design, test, and maintain.
  *
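As a rough illustration of the trade-off mentioned here (a sketch under invented names, not the real Broker): with coarse-grained locking every polling thread takes the same monitor, which is trivially safe to reason about; a fine-grained variant would instead have to guard each piece of shared state separately.

import java.util.ArrayList;
import java.util.List;

// Sketch of the coarse-grained pattern: all shared state behind synchronized methods,
// so concurrent HTTP handler threads simply take turns on the instance monitor.
class CoarseGrainedQueue {
    private final List<String> queuedTaskIds = new ArrayList<>();

    public synchronized void enqueue (String taskId) {
        queuedTaskIds.add(taskId);
    }

    /** Hand out up to maxTasks queued ids; safe because both methods share one lock. */
    public synchronized List<String> getSomeWork (int maxTasks) {
        int n = Math.min(maxTasks, queuedTaskIds.size());
        List<String> handedOut = new ArrayList<>(queuedTaskIds.subList(0, n));
        queuedTaskIds.subList(0, n).clear();
        return handedOut;
    }
}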

- * This could be thought of as "affinity homeostasis". We will constantly keep track of the ideal
+ * Workers were originally intended to migrate from one network to another to handle subsequent jobs
+ * without waiting for more cloud compute instances to start up. In practice we currently assign
+ * each worker a single network, but the balance of workers assigned to each network and the reuse
+ * of workers could in principle be made more sophisticated. The remainder of the comments below
+ * provide context for how this could be refined or improved.
+ *
+ * Because (at least currently) two organizations never share the same graph, we could get by with
+ * pulling tasks cyclically or randomly from all the jobs, and could actively shape the number of
+ * workers with affinity for each graph by forcing some of them to accept tasks on graphs other than
+ * the one they have declared affinity for. If the pool of workers were allowed to grow very large,
+ * we could aim to draw tasks fairly from all organizations, and fairly from all jobs within each
+ * organization.
+ *
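As a purely hypothetical sketch of the bookkeeping that the balancing described here and in the following paragraph would need - comparing the share of workers each network should have (based on active jobs) with the share observed from polling workers - it could look something like the following. None of these methods exist in Broker; names and the even weighting are assumptions for illustration.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical "affinity homeostasis" bookkeeping: ideal vs. observed worker share per network.
class AffinityBalanceSketch {

    /** Share of demand per network, here weighted simply by the number of active jobs on each network. */
    static Map<String, Double> idealShareByNetwork (List<String> networkOfEachActiveJob) {
        Map<String, Double> shares = new HashMap<>();
        for (String network : networkOfEachActiveJob) {
            shares.merge(network, 1.0 / networkOfEachActiveJob.size(), Double::sum);
        }
        return shares;
    }

    /** Share of supply per network, from the affinity each polling worker declares. */
    static Map<String, Double> observedShareByNetwork (List<String> networkOfEachPollingWorker) {
        Map<String, Double> shares = new HashMap<>();
        for (String network : networkOfEachPollingWorker) {
            shares.merge(network, 1.0 / networkOfEachPollingWorker.size(), Double::sum);
        }
        return shares;
    }

    /** The network with the largest shortfall is where a mismatched worker could be redirected. */
    static String mostUnderServedNetwork (Map<String, Double> ideal, Map<String, Double> observed) {
        String worst = null;
        double worstDeficit = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> entry : ideal.entrySet()) {
            double deficit = entry.getValue() - observed.getOrDefault(entry.getKey(), 0.0);
            if (deficit > worstDeficit) {
                worst = entry.getKey();
                worstDeficit = deficit;
            }
        }
        return worst;
    }
}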

+ * We have described this approach as "affinity homeostasis": we constantly keep track of the ideal
  * proportion of workers by graph (based on active jobs), and the true proportion of consumers by
  * graph (based on incoming polling), then we can decide when a worker's graph affinity should be
- * ignored and what it should be forced to.
- *

- * It may also be helpful to mark jobs every time they are skipped in the LRU queue. Each time a job
- * is serviced, it is taken out of the queue and put at its end. Jobs that have not been serviced
- * float to the top.
- *

- * Most methods on this class are synchronized, because they can be called from many HTTP handler
- * threads at once.
- *
- * TODO evaluate whether synchronizing all methods to make this threadsafe is a performance issue.
+ * ignored and what graph it should be forced to.
  */
 public class Broker implements Component {
@@ -100,7 +105,15 @@ public interface Config {
     private final ListMultimap jobs = MultimapBuilder.hashKeys().arrayListValues().build();
 
-    /** The most tasks to deliver to a worker at a time. */
+    /**
+     * The most tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
+     * never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB. If this value
+     * is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
+     * workers, creating a slow-joiner problem, especially if the tasks are complicated and slow to complete.
+     *
+     * The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
+     * polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS), which may not be ideal but is known to work.
+     */
     public final int MAX_TASKS_PER_WORKER = 16;
 
     /**
@@ -317,9 +330,13 @@ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerT
     /**
      * Attempt to find some tasks that match what a worker is requesting.
-     * Always returns a list, which may be empty if there is nothing to deliver.
+     * Always returns a non-null List, which may be empty if there is nothing to deliver.
+     * The number of tasks in the list is strictly limited to maxTasksRequested.
      */
-    public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory) {
+    public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory, int maxTasksRequested) {
+        if (maxTasksRequested <= 0) {
+            return Collections.EMPTY_LIST;
+        }
         Job job;
         if (config.offline()) {
             // Working in offline mode; get tasks from the first job that has any tasks to deliver.
@@ -335,7 +352,10 @@ public synchronized List getSomeWork (WorkerCategor
             return Collections.EMPTY_LIST;
         }
         // Return up to N tasks that are waiting to be processed.
-        return job.generateSomeTasksToDeliver(MAX_TASKS_PER_WORKER);
+        if (maxTasksRequested > MAX_TASKS_PER_WORKER) {
+            maxTasksRequested = MAX_TASKS_PER_WORKER;
+        }
+        return job.generateSomeTasksToDeliver(maxTasksRequested);
     }
 
     /**
diff --git a/src/main/java/com/conveyal/analysis/components/broker/Job.java b/src/main/java/com/conveyal/analysis/components/broker/Job.java
index fa2c2ca66..e49cf40a9 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Job.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Job.java
@@ -177,7 +177,7 @@ public boolean isErrored () {
     /**
      * @param maxTasks the maximum number of tasks to return.
      * @return some tasks that are not yet marked as completed and have not yet been delivered in
-     *     this delivery pass.
+     *     this delivery pass. May return an empty list, but never null.
*/ public List generateSomeTasksToDeliver (int maxTasks) { List tasks = new ArrayList<>(maxTasks); diff --git a/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java b/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java index c91ca7ae8..18ed6534f 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java +++ b/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java @@ -1,6 +1,7 @@ package com.conveyal.analysis.components.broker; import com.conveyal.r5.analyst.WorkerCategory; +import com.conveyal.r5.analyst.cluster.AnalysisWorker; import com.conveyal.r5.analyst.cluster.WorkerStatus; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -21,7 +22,8 @@ */ public class WorkerCatalog { - public static final int WORKER_RECORD_DURATION_MSEC = 2 * 60 * 1000; + /** If a worker says it will poll every s seconds or less, wait s plus this number before considering it gone. */ + private static final int POLLING_TOLERANCE_SECONDS = 5; /** * The information supplied by workers the last time they polled for more tasks. @@ -74,9 +76,9 @@ public synchronized void catalog (WorkerStatus workerStatus) { */ private synchronized void purgeDeadWorkers () { long now = System.currentTimeMillis(); - long oldestAcceptable = now - WORKER_RECORD_DURATION_MSEC; List ancientObservations = new ArrayList<>(); for (WorkerObservation observation : observationsByWorkerId.values()) { + long oldestAcceptable = now - ((observation.status.pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000); if (observation.lastSeen < oldestAcceptable) { ancientObservations.add(observation); } diff --git a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java index 409180fef..9deb1a882 100644 --- a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java +++ b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java @@ -14,6 +14,7 @@ import com.conveyal.analysis.persistence.Persistence; import com.conveyal.analysis.util.HttpStatus; import com.conveyal.analysis.util.JsonUtil; +import com.conveyal.gtfs.util.Util; import com.conveyal.r5.analyst.WorkerCategory; import com.conveyal.r5.analyst.cluster.AnalysisWorker; import com.conveyal.r5.analyst.cluster.RegionalTask; @@ -45,6 +46,7 @@ import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -52,6 +54,7 @@ import java.util.Map; import java.util.UUID; +import static com.conveyal.r5.common.Util.human; import static com.conveyal.r5.common.Util.notNullOrEmpty; import static com.google.common.base.Preconditions.checkNotNull; @@ -289,7 +292,8 @@ private String jsonResponse (Response response, int statusCode, Object object) { // We could call response.body(jsonMapper.writeValueAsBytes(object)); // but then the calling handler functions need to explicitly return null which is weird. 
try { - return jsonMapper.writeValueAsString(object); + String body = jsonMapper.writeValueAsString(object); + return body; } catch (JsonProcessingException e) { throw AnalysisServerException.unknown(e); } @@ -362,7 +366,7 @@ private Object workerPoll (Request request, Response response) { broker.recordWorkerObservation(workerStatus); WorkerCategory workerCategory = workerStatus.getWorkerCategory(); // See if any appropriate tasks exist for this worker. - List tasks = broker.getSomeWork(workerCategory); + List tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested); // If there is no work for the worker, signal this clearly with a "no content" code, // so the worker can sleep a while before the next polling attempt. if (tasks.isEmpty()) { diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java index 2c35dac06..2add61702 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java @@ -76,8 +76,10 @@ public interface Config { private static final Logger LOG = LoggerFactory.getLogger(AnalysisWorker.class); - public static final int POLL_WAIT_SECONDS = 15; - public static final int POLL_MAX_RANDOM_WAIT = 5; + private static final int POLL_INTERVAL_MIN_SECONDS = 1; + private static final int POLL_INTERVAL_MAX_SECONDS = 15; + private static final int POLL_JITTER_SECONDS = 5; + private static final int QUEUE_SLOTS_PER_PROCESSOR = 8; /** * This timeout should be longer than the longest expected worker calculation for a single-point request. @@ -112,8 +114,6 @@ public interface Config { /** Keeps some TransportNetworks around, lazy-loading or lazy-building them. */ public final NetworkPreloader networkPreloader; - private final Random random = new Random(); - /** The common root of all API URLs contacted by this worker, e.g. http://localhost:7070/api/ */ protected final String brokerBaseUrl; @@ -127,8 +127,11 @@ public interface Config { */ private final List workResults = new ArrayList<>(); - /** The last time (in milliseconds since the epoch) that we polled for work. */ - private long lastPollingTime; + /** + * The last time (in milliseconds since the epoch) that we polled for work. + * The initial value of zero causes the worker to poll the backend immediately on startup avoiding a delay. + */ + private long lastPollingTime = 0; /** Keep track of how many tasks per minute this worker is processing, broken down by scenario ID. */ private final ThroughputTracker throughputTracker = new ThroughputTracker(); @@ -163,7 +166,7 @@ public static HttpClient makeHttpClient () { /** * A queue to hold a backlog of regional analysis tasks. - * This avoids "slow joiner" syndrome where we wait to poll for more work until all N fetched tasks have finished, + * This avoids "slow joiner" syndrome where we wait until all N fetched tasks have finished, * but one of the tasks takes much longer than all the rest. * This should be long enough to hold all that have come in - we don't need to block on polling the manager. * Can this be replaced with the general purpose TaskScheduler component? @@ -200,66 +203,100 @@ public AnalysisWorker ( /** The main worker event loop which fetches tasks from a broker and schedules them for execution. */ public void startPolling () { - // Create executors with up to one thread per processor. - // The default task rejection policy is "Abort". 
- // The executor's queue is rather long because some tasks complete very fast and we poll max once per second. + // Create an executor with one thread per processor. The default task rejection policy is "Abort". + // The number of threads will only increase from the core pool size toward the max pool size when the queue is + // full. We no longer exceed the queue length in normal operation, so the thread pool will remain at core size. + // "[The] core pool size is the threshold beyond which [an] executor service prefers to queue up the task + // [rather] than spawn a new thread." https://stackoverflow.com/a/72684387/778449 + // The executor's queue is rather long because some tasks complete very fast and we poll at most once per second. int availableProcessors = Runtime.getRuntime().availableProcessors(); - LOG.debug("Java reports the number of available processors is: {}", availableProcessors); - int maxThreads = availableProcessors; - int taskQueueLength = availableProcessors * 6; - LOG.debug("Maximum number of regional processing threads is {}, length of task queue is {}.", maxThreads, taskQueueLength); + LOG.info("Java reports the number of available processors is: {}", availableProcessors); + final int maxThreads = availableProcessors; + final int taskQueueLength = availableProcessors * QUEUE_SLOTS_PER_PROCESSOR; + LOG.info("Maximum number of regional processing threads is {}, target length of task queue is {}.", maxThreads, taskQueueLength); BlockingQueue taskQueue = new LinkedBlockingQueue<>(taskQueueLength); - regionalTaskExecutor = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, taskQueue); + regionalTaskExecutor = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, taskQueue); - // Main polling loop to fill the regional work queue. + // This is the main polling loop that fills the regional work queue. // Go into an endless loop polling for regional tasks that can be computed asynchronously. // You'd think the ThreadPoolExecutor could just block when the blocking queue is full, but apparently // people all over the world have been jumping through hoops to try to achieve this simple behavior // with no real success, at least without writing bug and deadlock-prone custom executor services. - // Two alternative approaches are trying to keep the queue full and waiting for the queue to be almost empty. - // To keep the queue full, we repeatedly try to add each task to the queue, pausing and retrying when - // it's full. To wait until it's almost empty, we could use wait() in a loop and notify() as tasks are handled. - // see https://stackoverflow.com/a/15185004/778449 - // A simpler approach might be to spin-wait checking whether the queue is low and sleeping briefly, - // then fetch more work only when the queue is getting empty. + // Our current (revised) approach is to slowly spin-wait, checking whether the queue is low and sleeping briefly, + // then fetching more work only when the queue is getting empty. + + // Allows slowing down polling when not actively receiving tasks. + boolean receivedWorkLastTime = false; + + // Before first polling the broker, randomly wait a few seconds to spread load when many workers start at once. + sleepSeconds((new Random()).nextInt(POLL_JITTER_SECONDS)); while (true) { - List tasks = getSomeWork(); - if (tasks == null || tasks.isEmpty()) { - // Either there was no work, or some kind of error occurred. - // Sleep for a while before polling again, adding a random component to spread out the polling load. 
- // TODO only randomize delay on the first round, after that it's excessive. - int randomWait = random.nextInt(POLL_MAX_RANDOM_WAIT); - LOG.debug("Polling the broker did not yield any regional tasks. Sleeping {} + {} sec.", POLL_WAIT_SECONDS, randomWait); - sleepSeconds(POLL_WAIT_SECONDS + randomWait); + // We establish a lower limit on the wait time between polling to avoid flooding the broker with requests. + // If worker handles all tasks in its internal queue in less than this time, this is a speed bottleneck. + // This can happen in areas unconnected to transit and when travel time cutoffs are very low. + sleepSeconds(POLL_INTERVAL_MIN_SECONDS); + // Determine whether to poll this cycle - is the queue running empty, or has the maximum interval passed? + { + long currentTime = System.currentTimeMillis(); + boolean maxIntervalExceeded = (currentTime - lastPollingTime) > (POLL_INTERVAL_MAX_SECONDS * 1000); + int tasksInQueue = taskQueue.size(); + // Poll any time we have less tasks in the queue than processors. + boolean shouldPoll = maxIntervalExceeded || (receivedWorkLastTime && (tasksInQueue < availableProcessors)); + LOG.debug("Last polled {} sec ago. Task queue length is {}.", (currentTime - lastPollingTime)/1000, taskQueue.size()); + if (!shouldPoll) { + continue; + } + lastPollingTime = currentTime; + } + // This will request tasks even when queue is rather full. + // For now, assume more but smaller task and result chunks is better at leveling broker load. + int tasksToRequest = taskQueue.remainingCapacity(); + // Alternatively: Only request tasks when queue is short. Otherwise, only report results and status. + // int tasksToRequest = (tasksInQueue < minQueueLength) ? taskQueue.remainingCapacity() : 0; + + List tasks = getSomeWork(tasksToRequest); + boolean noWorkReceived = tasks == null || tasks.isEmpty(); + receivedWorkLastTime = !noWorkReceived; // Allows variable speed polling on the next iteration. + if (noWorkReceived) { + // Either the broker supplied no work or an error occurred. continue; } for (RegionalTask task : tasks) { - // Try to enqueue each task for execution, repeatedly failing until the queue is not full. - // The list of fetched tasks essentially serves as a secondary queue, which is awkward. This is using - // exceptions for normal flow control, which is nasty. We should do this differently (#596). - while (true) { - try { - // TODO define non-anonymous runnable class to instantiate here, specifically for async regional tasks. - regionalTaskExecutor.execute(() -> { - try { - this.handleOneRegionalTask(task); - } catch (Throwable t) { - LOG.error( - "An error occurred while handling a regional task, reporting to backend. {}", - ExceptionUtils.stackTraceString(t) - ); - synchronized (workResults) { - workResults.add(new RegionalWorkResult(t, task)); - } - } - }); - break; - } catch (RejectedExecutionException e) { - // Queue is full, wait a bit and try to feed it more tasks. If worker handles all tasks in its - // internal queue in less than 1 second, this is a speed bottleneck. This happens with regions - // unconnected to transit and with very small travel time cutoffs. - sleepSeconds(1); - } + // Executor services require blocking queues of fixed length. Tasks must be enqueued one by one, and + // may fail with a RejectedExecutionException if we exceed the queue length. We choose queue length + // and requested number of tasks carefully to avoid overfilling the queue, but should handle the + // exceptions just in case something is misconfigured. 
+ try { + regionalTaskExecutor.execute(new RegionalTaskRunnable(task)); + } catch (RejectedExecutionException e) { + LOG.error("Regional task could not be enqueued for processing (queue length exceeded). Task dropped."); + } + } + } + } + + /** + * Runnable inner class which can access the needed methods on AnalysisWorker (handleOneRegionalTask). + * However that method is only called from these runnables - it could potentially be inlined into run(). + */ + protected class RegionalTaskRunnable implements Runnable { + RegionalTask task; + + public RegionalTaskRunnable(RegionalTask task) { + this.task = task; + } + + @Override + public void run() { + try { + handleOneRegionalTask(task); + } catch (Throwable t) { + LOG.error( + "An error occurred while handling a regional task, reporting to backend. {}", + ExceptionUtils.stackTraceString(t) + ); + synchronized (workResults) { + workResults.add(new RegionalWorkResult(t, task)); } } } @@ -551,10 +588,13 @@ public static void addJsonToGrid ( * Also returns any accumulated work results to the backend. * @return a list of work tasks, or null if there was no work to do, or if no work could be fetched. */ - public List getSomeWork () { + public List getSomeWork (int tasksToRequest) { + LOG.debug("Polling backend to report status and request up to {} tasks.", tasksToRequest); String url = brokerBaseUrl + "/poll"; HttpPost httpPost = new HttpPost(url); WorkerStatus workerStatus = new WorkerStatus(this); + workerStatus.maxTasksRequested = tasksToRequest; + workerStatus.pollIntervalSeconds = POLL_INTERVAL_MAX_SECONDS; // Include all completed work results when polling the backend. // Atomically copy and clear the accumulated work results, while blocking writes from other threads. synchronized (workResults) { @@ -564,7 +604,7 @@ public List getSomeWork () { // Compute throughput in tasks per minute and include it in the worker status report. // We poll too frequently to compute throughput just since the last poll operation. - // TODO reduce polling frequency (larger queue in worker), compute shorter-term throughput. + // We may want to reduce polling frequency (with larger queue in worker) and compute shorter-term throughput. workerStatus.tasksPerMinuteByJobId = throughputTracker.getTasksPerMinuteByJobId(); // Report how often we're polling for work, just for monitoring. @@ -585,8 +625,10 @@ public List getSomeWork () { // Broker returned some work. Use the lenient object mapper to decode it in case the broker is a // newer version so sending unrecognizable fields. // ReadValue closes the stream, releasing the HTTP connection. - return JsonUtilities.lenientObjectMapper.readValue(responseEntity.getContent(), - new TypeReference>() {}); + return JsonUtilities.lenientObjectMapper.readValue( + responseEntity.getContent(), + new TypeReference>() {} + ); } // Non-200 response code or a null entity. Something is weird. LOG.error("Unsuccessful polling. 
HTTP response code: " + response.getStatusLine().getStatusCode()); diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java index 7d002fa9a..1fcd17a6e 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java @@ -24,6 +24,8 @@ public class WorkerStatus { private static final Logger LOG = LoggerFactory.getLogger(WorkerStatus.class); + private static final int LEGACY_WORKER_MAX_TASKS = 16; + public static final int LEGACY_WORKER_MAX_POLL_INTERVAL_SECONDS = 2 * 60; public String architecture; public int processors; @@ -50,6 +52,20 @@ public class WorkerStatus { public String ipAddress; public List results; + /** + * Then maximum number of tasks the broker should send to this worker. May be zero if its work queue is full. + * Default value determines the number of tasks to send to older workers that don't send this value when they poll. + */ + public int maxTasksRequested = LEGACY_WORKER_MAX_TASKS; + + /** + * The maximum amount of time the worker will wait before polling again. After this much time passes the backend + * may consider the worker lost or shut down. The backend should be somewhat lenient here as there could be delays + * due to connection setup, API contention etc. The default value reflects how how long the backend should wait to + * hear from older workers that don't send this value. + */ + public int pollIntervalSeconds = LEGACY_WORKER_MAX_POLL_INTERVAL_SECONDS; + /** No-arg constructor used when deserializing. */ public WorkerStatus() { } diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 00e0cdabf..34bfac76c 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -22,4 +22,7 @@ + + + \ No newline at end of file
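Putting the pieces of this diff together, the new polling handshake can be summarized in a short, self-contained sketch (simplified names and constants; the real logic lives in AnalysisWorker.startPolling, WorkerStatus, and Broker.getSomeWork): the worker polls at a variable rate, never more often than the minimum interval and never less often than the maximum, and asks only for as many tasks as its local queue can absorb; the broker then clamps that request to its own per-worker cap.

import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the revised polling loop: poll when the queue runs low or when the
// maximum interval has elapsed, and never request more tasks than the queue can absorb.
class PollingLoopSketch {
    static final int POLL_INTERVAL_MIN_SECONDS = 1;
    static final int POLL_INTERVAL_MAX_SECONDS = 15;
    static final int MAX_TASKS_PER_WORKER = 16; // broker-side cap

    public static void main (String[] args) throws InterruptedException {
        int processors = Runtime.getRuntime().availableProcessors();
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(processors * 8);
        long lastPollingTime = 0; // zero forces an immediate first poll, as in the worker change above
        boolean receivedWorkLastTime = false;
        for (int i = 0; i < 3; i++) { // a real worker loops forever
            Thread.sleep(POLL_INTERVAL_MIN_SECONDS * 1000L);
            long now = System.currentTimeMillis();
            boolean maxIntervalExceeded = (now - lastPollingTime) > POLL_INTERVAL_MAX_SECONDS * 1000L;
            boolean queueRunningLow = receivedWorkLastTime && taskQueue.size() < processors;
            if (!(maxIntervalExceeded || queueRunningLow)) continue;
            lastPollingTime = now;
            // Ask for at most what fits locally; the broker further clamps to its own per-worker cap.
            int tasksToRequest = Math.min(taskQueue.remainingCapacity(), MAX_TASKS_PER_WORKER);
            System.out.println("Would poll the broker, requesting up to " + tasksToRequest + " tasks.");
            receivedWorkLastTime = false; // no broker in this sketch, so pretend nothing came back
        }
    }
}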