From 9c6e3530eb4613608b9183b6a28cda11d01ee093 Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Thu, 16 Mar 2023 22:59:21 +0800
Subject: [PATCH 1/7] maximum wait of 15 seconds between worker polling

---
 .../analysis/components/broker/Broker.java    |  18 ++-
 .../analysis/components/broker/Job.java       |   2 +-
 .../components/broker/WorkerCatalog.java      |   8 ++
 .../controllers/BrokerController.java         |   8 +-
 .../r5/analyst/cluster/AnalysisWorker.java    | 128 ++++++++++++------
 .../r5/analyst/cluster/WorkerStatus.java      |   3 +
 6 files changed, 114 insertions(+), 53 deletions(-)

diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
index 1eeb93641..06dc9cdfa 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -100,8 +100,8 @@ public interface Config {
     private final ListMultimap<WorkerCategory, Job> jobs = MultimapBuilder.hashKeys().arrayListValues().build();
 
-    /** The most tasks to deliver to a worker at a time. */
-    public final int MAX_TASKS_PER_WORKER = 16;
+    /** The most tasks to deliver to a worker at a time. 50 tasks gives response bodies of about 65kB. */
+    public final int MAX_TASKS_PER_WORKER = 50;
 
     /**
      * Used when auto-starting spot instances. Set to a smaller value to increase the number of
@@ -317,9 +317,13 @@ public void createWorkersInCategory (WorkerCategory category, WorkerTags workerT
 
     /**
      * Attempt to find some tasks that match what a worker is requesting.
-     * Always returns a list, which may be empty if there is nothing to deliver.
+     * Always returns a non-null List, which may be empty if there is nothing to deliver.
+     * Number of tasks in the list is strictly limited to maxTasksRequested.
      */
-    public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory) {
+    public synchronized List<RegionalTask> getSomeWork (WorkerCategory workerCategory, int maxTasksRequested) {
+        if (maxTasksRequested <= 0) {
+            return Collections.EMPTY_LIST;
+        }
         Job job;
         if (config.offline()) {
             // Working in offline mode; get tasks from the first job that has any tasks to deliver.
@@ -335,7 +339,11 @@ public synchronized List<RegionalTask> getSomeWork (WorkerCategor
             return Collections.EMPTY_LIST;
         }
         // Return up to N tasks that are waiting to be processed.
-        return job.generateSomeTasksToDeliver(MAX_TASKS_PER_WORKER);
+        if (maxTasksRequested > MAX_TASKS_PER_WORKER) {
+            LOG.warn("Worker requested {} tasks, reducing to {}.", maxTasksRequested, MAX_TASKS_PER_WORKER);
+            maxTasksRequested = MAX_TASKS_PER_WORKER;
+        }
+        return job.generateSomeTasksToDeliver(maxTasksRequested);
     }
 
     /**
diff --git a/src/main/java/com/conveyal/analysis/components/broker/Job.java b/src/main/java/com/conveyal/analysis/components/broker/Job.java
index fa2c2ca66..e49cf40a9 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Job.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Job.java
@@ -177,7 +177,7 @@ public boolean isErrored () {
     /**
      * @param maxTasks the maximum number of tasks to return.
      * @return some tasks that are not yet marked as completed and have not yet been delivered in
-     *         this delivery pass.
+     *         this delivery pass. May return an empty list, but never null.
      */
     public List<RegionalTask> generateSomeTasksToDeliver (int maxTasks) {
         List<RegionalTask> tasks = new ArrayList<>(maxTasks);
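
The contract established above - a non-null list containing at most min(maxTasksRequested, MAX_TASKS_PER_WORKER) tasks - could be pinned down by a test along these lines. This is a hypothetical sketch, not part of the patch: the broker and category fixtures (and JUnit-style assertions) are assumed, and setting up a Broker with an enqueued job is elided.

    // Hypothetical fixture: a Broker with at least one job in the given WorkerCategory.
    List<RegionalTask> none = broker.getSomeWork(category, 0);
    assertNotNull(none);                     // never null, even when zero tasks are requested
    assertTrue(none.isEmpty());
    List<RegionalTask> some = broker.getSomeWork(category, 1000);
    assertTrue(some.size() <= broker.MAX_TASKS_PER_WORKER);  // oversized requests are clamped
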
diff --git a/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java b/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java
index c91ca7ae8..a3410e758 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java
@@ -1,6 +1,7 @@
 package com.conveyal.analysis.components.broker;
 
 import com.conveyal.r5.analyst.WorkerCategory;
+import com.conveyal.r5.analyst.cluster.AnalysisWorker;
 import com.conveyal.r5.analyst.cluster.WorkerStatus;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -74,6 +75,13 @@ public synchronized void catalog (WorkerStatus workerStatus) {
      */
     private synchronized void purgeDeadWorkers () {
         long now = System.currentTimeMillis();
+        // TODO purge newer workers much sooner since they poll regularly every 15 seconds.
+        // Unfortunately we need to keep the higher threshold around for older worker versions.
+        // Rather than decoding the worker version string, we can also just look at whether
+        // observation.status.maxTasksRequested > 0 which is only true on regularly-polling workers.
+        // Maybe the workers should even send their own poll interval in their status for this reason
+        // (to handle future tweaks across versions). This field could default to WORKER_RECORD_DURATION_MSEC.
+
         long oldestAcceptable = now - WORKER_RECORD_DURATION_MSEC;
         List<WorkerObservation> ancientObservations = new ArrayList<>();
         for (WorkerObservation observation : observationsByWorkerId.values()) {
diff --git a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java
index 409180fef..9deb1a882 100644
--- a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java
+++ b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java
@@ -14,6 +14,7 @@
 import com.conveyal.analysis.persistence.Persistence;
 import com.conveyal.analysis.util.HttpStatus;
 import com.conveyal.analysis.util.JsonUtil;
+import com.conveyal.gtfs.util.Util;
 import com.conveyal.r5.analyst.WorkerCategory;
 import com.conveyal.r5.analyst.cluster.AnalysisWorker;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
@@ -45,6 +46,7 @@
 
 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -52,6 +54,7 @@
 import java.util.Map;
 import java.util.UUID;
 
+import static com.conveyal.r5.common.Util.human;
 import static com.conveyal.r5.common.Util.notNullOrEmpty;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -289,7 +292,8 @@ private String jsonResponse (Response response, int statusCode, Object object) {
         // We could call response.body(jsonMapper.writeValueAsBytes(object));
         // but then the calling handler functions need to explicitly return null which is weird.
         try {
-            return jsonMapper.writeValueAsString(object);
+            String body = jsonMapper.writeValueAsString(object);
+            return body;
         } catch (JsonProcessingException e) {
             throw AnalysisServerException.unknown(e);
         }
@@ -362,7 +366,7 @@ private Object workerPoll (Request request, Response response) {
         broker.recordWorkerObservation(workerStatus);
         WorkerCategory workerCategory = workerStatus.getWorkerCategory();
         // See if any appropriate tasks exist for this worker.
-        List<RegionalTask> tasks = broker.getSomeWork(workerCategory);
+        List<RegionalTask> tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested);
         // If there is no work for the worker, signal this clearly with a "no content" code,
         // so the worker can sleep a while before the next polling attempt.
         if (tasks.isEmpty()) {
diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
index aad7da99d..297c1bc1b 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
@@ -75,8 +75,10 @@ public interface Config {
 
     private static final Logger LOG = LoggerFactory.getLogger(AnalysisWorker.class);
 
-    public static final int POLL_WAIT_SECONDS = 15;
-    public static final int POLL_MAX_RANDOM_WAIT = 5;
+    private static final int POLL_INTERVAL_MIN_SECONDS = 1;
+    private static final int POLL_INTERVAL_MAX_SECONDS = 15;
+    private static final int POLL_JITTER_SECONDS = 5;
+    private static final int QUEUE_SLOTS_PER_PROCESSOR = 6;
 
     /**
      * This timeout should be longer than the longest expected worker calculation for a single-point request.
@@ -111,8 +113,6 @@ public interface Config {
     /** Keeps some TransportNetworks around, lazy-loading or lazy-building them. */
     public final NetworkPreloader networkPreloader;
 
-    private final Random random = new Random();
-
     /** The common root of all API URLs contacted by this worker, e.g. http://localhost:7070/api/ */
     protected final String brokerBaseUrl;
 
@@ -162,7 +162,7 @@ public static HttpClient makeHttpClient () {
     /**
      * A queue to hold a backlog of regional analysis tasks.
-     * This avoids "slow joiner" syndrome where we wait to poll for more work until all N fetched tasks have finished,
+     * This avoids "slow joiner" syndrome where we wait until all N fetched tasks have finished,
      * but one of the tasks takes much longer than all the rest.
      * This should be long enough to hold all that have come in - we don't need to block on polling the manager.
      * Can this be replaced with the general purpose TaskScheduler component?
@@ -204,9 +204,9 @@ public void startPolling () {
         // The executor's queue is rather long because some tasks complete very fast and we poll max once per second.
         int availableProcessors = Runtime.getRuntime().availableProcessors();
         LOG.debug("Java reports the number of available processors is: {}", availableProcessors);
-        int maxThreads = availableProcessors;
-        int taskQueueLength = availableProcessors * 6;
-        LOG.debug("Maximum number of regional processing threads is {}, length of task queue is {}.", maxThreads, taskQueueLength);
+        final int maxThreads = availableProcessors;
+        final int taskQueueLength = availableProcessors * QUEUE_SLOTS_PER_PROCESSOR;
+        LOG.debug("Maximum number of regional processing threads is {}, target length of task queue is {}.", maxThreads, taskQueueLength);
         BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(taskQueueLength);
         regionalTaskExecutor = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, taskQueue);
 
@@ -221,44 +221,79 @@ public void startPolling () {
         // see https://stackoverflow.com/a/15185004/778449
         // A simpler approach might be to spin-wait checking whether the queue is low and sleeping briefly,
         // then fetch more work only when the queue is getting empty.
+
+        // Allows slowing down polling when not actively receiving tasks.
+        boolean receivedWorkLastTime = false;
+
+        // Before first polling the broker, randomly wait a few seconds to spread load when many workers start at once.
+        sleepSeconds((new Random()).nextInt(POLL_JITTER_SECONDS));
         while (true) {
-            List<RegionalTask> tasks = getSomeWork();
-            if (tasks == null || tasks.isEmpty()) {
-                // Either there was no work, or some kind of error occurred.
-                // Sleep for a while before polling again, adding a random component to spread out the polling load.
-                // TODO only randomize delay on the first round, after that it's excessive.
-                int randomWait = random.nextInt(POLL_MAX_RANDOM_WAIT);
-                LOG.debug("Polling the broker did not yield any regional tasks. Sleeping {} + {} sec.", POLL_WAIT_SECONDS, randomWait);
-                sleepSeconds(POLL_WAIT_SECONDS + randomWait);
+            // We establish a lower limit on the wait time between polling to avoid flooding the broker with requests.
+            // If the worker handles all tasks in its internal queue in less than this time, this is a speed bottleneck.
+            // This can happen in areas unconnected to transit and when travel time cutoffs are very low.
+            sleepSeconds(POLL_INTERVAL_MIN_SECONDS);
+
+            // Determine whether to poll this cycle - is the queue running empty, or has the maximum interval passed?
+            {
+                long currentTime = System.currentTimeMillis();
+                boolean maxIntervalExceeded = (currentTime - lastPollingTime) > (POLL_INTERVAL_MAX_SECONDS * 1000);
+                int tasksInQueue = taskQueue.size();
+                int minQueueLength = availableProcessors; // Poll whenever we have fewer tasks in the queue than processors.
+                boolean shouldPoll = maxIntervalExceeded || (receivedWorkLastTime && (tasksInQueue < minQueueLength));
+                if (!shouldPoll) {
+                    continue;
+                }
+                lastPollingTime = currentTime;
+            }
+            // This will request tasks even when the queue is rather full.
+            // For now, assume that more but smaller task and result chunks are better at leveling broker load.
+            int tasksToRequest = taskQueue.remainingCapacity();
+            // Alternatively: Only request tasks when the queue is short. Otherwise, only report results and status.
+            // int tasksToRequest = (tasksInQueue < minQueueLength) ? taskQueue.remainingCapacity() : 0;
+
+            List<RegionalTask> tasks = getSomeWork(tasksToRequest);
+            boolean noWorkReceived = tasks == null || tasks.isEmpty();
+            receivedWorkLastTime = !noWorkReceived; // Allows variable speed polling on the next iteration.
+            if (noWorkReceived) {
+                // Either the broker supplied no work or an error occurred.
                 continue;
             }
             for (RegionalTask task : tasks) {
-                // Try to enqueue each task for execution, repeatedly failing until the queue is not full.
-                // The list of fetched tasks essentially serves as a secondary queue, which is awkward. This is using
-                // exceptions for normal flow control, which is nasty. We should do this differently (#596).
-                while (true) {
-                    try {
-                        // TODO define non-anonymous runnable class to instantiate here, specifically for async regional tasks.
-                        regionalTaskExecutor.execute(() -> {
-                            try {
-                                this.handleOneRegionalTask(task);
-                            } catch (Throwable t) {
-                                LOG.error(
-                                    "An error occurred while handling a regional task, reporting to backend. {}",
-                                    ExceptionUtils.stackTraceString(t)
-                                );
-                                synchronized (workResults) {
-                                    workResults.add(new RegionalWorkResult(t, task));
-                                }
-                            }
-                        });
-                        break;
-                    } catch (RejectedExecutionException e) {
-                        // Queue is full, wait a bit and try to feed it more tasks. If the worker handles all tasks in its
-                        // internal queue in less than 1 second, this is a speed bottleneck. This happens with regions
-                        // unconnected to transit and with very small travel time cutoffs.
-                        sleepSeconds(1);
-                    }
-                }
+                // Executor services require blocking queues of fixed length. Tasks must be enqueued one by one, and
+                // may fail with a RejectedExecutionException if we exceed the queue length. We choose the queue length
+                // and requested number of tasks carefully to avoid overfilling the queue, but should handle the
+                // exceptions just in case something is misconfigured.
+                try {
+                    regionalTaskExecutor.execute(new RegionalTaskRunnable(task));
+                } catch (RejectedExecutionException e) {
+                    LOG.error("Regional task could not be enqueued for processing - queue lengths are misconfigured!");
+                }
             }
         }
     }
 
+    /**
+     * Runnable inner class which can access the needed methods on AnalysisWorker (handleOneRegionalTask).
+     * However, that method is only called from these runnables - it could potentially be inlined into run().
+     */
+    protected class RegionalTaskRunnable implements Runnable {
+        RegionalTask task;
+
+        public RegionalTaskRunnable(RegionalTask task) {
+            this.task = task;
+        }
+
+        @Override
+        public void run() {
+            try {
+                handleOneRegionalTask(task);
+            } catch (Throwable t) {
+                LOG.error(
+                        "An error occurred while handling a regional task, reporting to backend. {}",
+                        ExceptionUtils.stackTraceString(t)
+                );
+                synchronized (workResults) {
+                    workResults.add(new RegionalWorkResult(t, task));
                 }
             }
         }
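
The polling condition introduced in the loop above can be restated as a single predicate - a sketch for clarity only, using the constants and fields defined in this patch; it is not code the patch adds:

    // Poll when forced by the maximum interval (which doubles as a liveness signal to the
    // backend), or when the previous poll yielded work and the backlog has dropped below
    // one queued task per processor.
    private boolean shouldPollNow (long now, long lastPollingTime, boolean receivedWorkLastTime,
                                   int tasksInQueue, int availableProcessors) {
        boolean maxIntervalExceeded = (now - lastPollingTime) > POLL_INTERVAL_MAX_SECONDS * 1000L;
        boolean queueRunningLow = receivedWorkLastTime && (tasksInQueue < availableProcessors);
        return maxIntervalExceeded || queueRunningLow;
    }

Note the asymmetry: an idle worker still hits the max-interval branch every POLL_INTERVAL_MAX_SECONDS (15 s), which is what later allows the backend to treat polling frequency as a liveness signal.
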
@@ -539,10 +574,12 @@ public static void addJsonToGrid (
      * Also returns any accumulated work results to the backend.
      * @return a list of work tasks, or null if there was no work to do, or if no work could be fetched.
      */
-    public List<RegionalTask> getSomeWork () {
+    public List<RegionalTask> getSomeWork (int tasksToRequest) {
+        LOG.debug("Polling backend to report status and request up to {} tasks.", tasksToRequest);
         String url = brokerBaseUrl + "/poll";
         HttpPost httpPost = new HttpPost(url);
         WorkerStatus workerStatus = new WorkerStatus(this);
+        workerStatus.maxTasksRequested = tasksToRequest;
         // Include all completed work results when polling the backend.
         // Atomically copy and clear the accumulated work results, while blocking writes from other threads.
         synchronized (workResults) {
@@ -552,7 +589,7 @@ public List<RegionalTask> getSomeWork () {
 
         // Compute throughput in tasks per minute and include it in the worker status report.
         // We poll too frequently to compute throughput just since the last poll operation.
-        // TODO reduce polling frequency (larger queue in worker), compute shorter-term throughput.
+        // We may want to reduce polling frequency (with larger queue in worker) and compute shorter-term throughput.
         workerStatus.tasksPerMinuteByJobId = throughputTracker.getTasksPerMinuteByJobId();
 
         // Report how often we're polling for work, just for monitoring.
@@ -573,8 +610,9 @@ public List<RegionalTask> getSomeWork () {
             // Broker returned some work. Use the lenient object mapper to decode it in case the broker is a
             // newer version and thus sending unrecognizable fields.
             // ReadValue closes the stream, releasing the HTTP connection.
-            return JsonUtilities.lenientObjectMapper.readValue(responseEntity.getContent(),
-                    new TypeReference<List<RegionalTask>>() {});
+            List<RegionalTask> tasks = JsonUtilities.lenientObjectMapper.readValue(responseEntity.getContent(),
+                    new TypeReference<List<RegionalTask>>() {});
+            return tasks;
         }
         // Non-200 response code or a null entity. Something is weird.
         LOG.error("Unsuccessful polling. HTTP response code: " + response.getStatusLine().getStatusCode());
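
The "lenient object mapper" referenced above is what keeps this polling exchange forward compatible. Its exact configuration lives in JsonUtilities elsewhere in r5, so the settings below are an assumption rather than a quote of that class, but the essential behavior can be sketched with plain Jackson:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    ObjectMapper lenient = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    // Fields added by a newer broker are skipped instead of aborting deserialization.
    List<RegionalTask> tasks = lenient.readValue(
            responseEntity.getContent(), new TypeReference<List<RegionalTask>>() {});
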
diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
index 7d002fa9a..2a17af7e2 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
@@ -50,6 +50,9 @@ public class WorkerStatus {
     public String ipAddress;
     public List<RegionalWorkResult> results;
 
+    /** The maximum number of tasks the broker should send to this worker. May be zero if its work queue is full. */
+    public int maxTasksRequested = 0;
+
     /** No-arg constructor used when deserializing. */
     public WorkerStatus() { }
 
From 20ecc467588cf31ffe08dc705ba2f9bbaf1c9716 Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Mon, 20 Mar 2023 17:07:31 +0800
Subject: [PATCH 2/7] logging and comments

---
 .../com/conveyal/analysis/components/broker/Broker.java | 5 ++++-
 .../com/conveyal/r5/analyst/cluster/AnalysisWorker.java | 6 +++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
index 06dc9cdfa..45183f745 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -100,7 +100,10 @@ public interface Config {
     private final ListMultimap<WorkerCategory, Job> jobs = MultimapBuilder.hashKeys().arrayListValues().build();
 
-    /** The most tasks to deliver to a worker at a time. 50 tasks gives response bodies of about 65kB. */
+    /**
+     * The most tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
+     * never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB.
+     */
     public final int MAX_TASKS_PER_WORKER = 50;
 
     /**
diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
index 297c1bc1b..7013a8a5b 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
@@ -203,10 +203,10 @@ public void startPolling () {
         // The default task rejection policy is "Abort".
         // The executor's queue is rather long because some tasks complete very fast and we poll max once per second.
         int availableProcessors = Runtime.getRuntime().availableProcessors();
-        LOG.debug("Java reports the number of available processors is: {}", availableProcessors);
+        LOG.info("Java reports the number of available processors is: {}", availableProcessors);
         final int maxThreads = availableProcessors;
         final int taskQueueLength = availableProcessors * QUEUE_SLOTS_PER_PROCESSOR;
-        LOG.debug("Maximum number of regional processing threads is {}, target length of task queue is {}.", maxThreads, taskQueueLength);
+        LOG.info("Maximum number of regional processing threads is {}, target length of task queue is {}.", maxThreads, taskQueueLength);
         BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(taskQueueLength);
         regionalTaskExecutor = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, taskQueue);
 
@@ -266,7 +266,7 @@ public void startPolling () {
             try {
                 regionalTaskExecutor.execute(new RegionalTaskRunnable(task));
             } catch (RejectedExecutionException e) {
-                LOG.error("Regional task could not be enqueued for processing - queue lengths are misconfigured!");
+                LOG.error("Regional task could not be enqueued for processing (queue length exceeded). Task dropped.");
             }
         }
     }
 
From b49248c7546bd8e162616100f8a877841b99792e Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Mon, 20 Mar 2023 17:08:33 +0800
Subject: [PATCH 3/7] default max number of tasks for legacy workers

---
 .../com/conveyal/r5/analyst/cluster/WorkerStatus.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
index 2a17af7e2..0eac522cd 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
@@ -24,7 +24,7 @@ public class WorkerStatus {
 
     private static final Logger LOG = LoggerFactory.getLogger(WorkerStatus.class);
-
+    private static final int MAX_TASKS_LEGACY_WORKERS = 16;
     public String architecture;
     public int processors;
     public double loadAverage;
@@ -50,8 +50,11 @@ public class WorkerStatus {
     public String ipAddress;
     public List<RegionalWorkResult> results;
 
-    /** The maximum number of tasks the broker should send to this worker. May be zero if its work queue is full. */
-    public int maxTasksRequested = 0;
+    /**
+     * The maximum number of tasks the broker should send to this worker. May be zero if its work queue is full.
+     * Default value determines the number of tasks to send to older workers that don't send this value when they poll.
+     */
+    public int maxTasksRequested = MAX_TASKS_LEGACY_WORKERS;
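
The fallback default above works because of standard Jackson behavior: only properties present in the incoming JSON are assigned, so a field missing from an old worker's status report keeps its Java initializer value. A minimal illustration, not project code:

    import com.fasterxml.jackson.databind.ObjectMapper;

    ObjectMapper mapper = new ObjectMapper();
    // An older worker's poll body predates the maxTasksRequested field entirely:
    WorkerStatus legacy = mapper.readValue("{\"processors\": 4}", WorkerStatus.class);
    // Jackson never touched the field, so the initializer stands:
    // legacy.maxTasksRequested == MAX_TASKS_LEGACY_WORKERS == 16
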
 
     /** No-arg constructor used when deserializing. */
     public WorkerStatus() { }
 
From ebc91d3ccb4ab7bb9501486475b32814f61392e8 Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Mon, 20 Mar 2023 18:52:34 +0800
Subject: [PATCH 4/7] increase core pool size and tweak other limits

---
 .../analysis/components/broker/Broker.java    |  2 +-
 .../components/broker/WorkerCatalog.java      | 12 ++----
 .../r5/analyst/cluster/AnalysisWorker.java    | 37 ++++++++++---------
 .../r5/analyst/cluster/WorkerStatus.java      | 14 ++++++-
 src/main/resources/logback.xml                |  3 ++
 5 files changed, 39 insertions(+), 29 deletions(-)

diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
index 45183f745..b7edf8531 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -104,7 +104,7 @@ public interface Config {
      * The most tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
      * never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB.
      */
-    public final int MAX_TASKS_PER_WORKER = 50;
+    public final int MAX_TASKS_PER_WORKER = 100;
diff --git a/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java b/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java
index a3410e758..18ed6534f 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/WorkerCatalog.java
@@ -22,7 +22,8 @@
  */
 public class WorkerCatalog {
 
-    public static final int WORKER_RECORD_DURATION_MSEC = 2 * 60 * 1000;
+    /** If a worker says it will poll every s seconds or less, wait s plus this number before considering it gone. */
+    private static final int POLLING_TOLERANCE_SECONDS = 5;
 
     /**
      * The information supplied by workers the last time they polled for more tasks.
@@ -75,16 +76,9 @@ public synchronized void catalog (WorkerStatus workerStatus) {
      */
     private synchronized void purgeDeadWorkers () {
         long now = System.currentTimeMillis();
-        // TODO purge newer workers much sooner since they poll regularly every 15 seconds.
-        // Unfortunately we need to keep the higher threshold around for older worker versions.
-        // Rather than decoding the worker version string, we can also just look at whether
-        // observation.status.maxTasksRequested > 0 which is only true on regularly-polling workers.
-        // Maybe the workers should even send their own poll interval in their status for this reason
-        // (to handle future tweaks across versions). This field could default to WORKER_RECORD_DURATION_MSEC.
-
-        long oldestAcceptable = now - WORKER_RECORD_DURATION_MSEC;
         List<WorkerObservation> ancientObservations = new ArrayList<>();
         for (WorkerObservation observation : observationsByWorkerId.values()) {
+            long oldestAcceptable = now - ((observation.status.pollIntervalSeconds + POLLING_TOLERANCE_SECONDS) * 1000);
             if (observation.lastSeen < oldestAcceptable) {
                 ancientObservations.add(observation);
             }
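
Concretely, with the constants in this patch (a worked example, not additional code): a current worker that reports pollIntervalSeconds = 15 is dropped from the catalog after 15 + 5 = 20 seconds of silence, while a legacy worker that reports nothing falls back to the 120-second default introduced in WorkerStatus below and is kept for 125 seconds.

    // Per-observation liveness cutoff, as computed in the loop above.
    long cutoffCurrent = now - ((15 + POLLING_TOLERANCE_SECONDS) * 1000);    // purge if silent > 20 s
    long cutoffLegacy  = now - ((120 + POLLING_TOLERANCE_SECONDS) * 1000);   // purge if silent > 125 s
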
diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
index 7013a8a5b..45ad5148c 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
@@ -78,7 +78,7 @@ public interface Config {
     private static final int POLL_INTERVAL_MIN_SECONDS = 1;
     private static final int POLL_INTERVAL_MAX_SECONDS = 15;
     private static final int POLL_JITTER_SECONDS = 5;
-    private static final int QUEUE_SLOTS_PER_PROCESSOR = 6;
+    private static final int QUEUE_SLOTS_PER_PROCESSOR = 10;
 
     /**
      * This timeout should be longer than the longest expected worker calculation for a single-point request.
@@ -126,8 +126,11 @@ public interface Config {
      */
     private final List<RegionalWorkResult> workResults = new ArrayList<>();
 
-    /** The last time (in milliseconds since the epoch) that we polled for work. */
-    private long lastPollingTime;
+    /**
+     * The last time (in milliseconds since the epoch) that we polled for work.
+     * The initial value of zero causes the worker to poll the backend immediately on startup, avoiding a delay.
+     */
+    private long lastPollingTime = 0;
 
     /** Keep track of how many tasks per minute this worker is processing, broken down by scenario ID. */
     private final ThroughputTracker throughputTracker = new ThroughputTracker();
@@ -199,28 +202,27 @@ public AnalysisWorker (
 
     /** The main worker event loop which fetches tasks from a broker and schedules them for execution. */
     public void startPolling () {
-        // Create executors with up to one thread per processor.
-        // The default task rejection policy is "Abort".
-        // The executor's queue is rather long because some tasks complete very fast and we poll max once per second.
+        // Create an executor with one thread per processor. The default task rejection policy is "Abort".
+        // The number of threads will only increase from the core pool size toward the max pool size when the queue is
+        // full. We no longer exceed the queue length in normal operation, so the thread pool will remain at core size.
+        // "[The] core pool size is the threshold beyond which [an] executor service prefers to queue up the task than
+        // spawn a new thread."
+        // The executor's queue is rather long because some tasks complete very fast and we poll at most once per second.
         int availableProcessors = Runtime.getRuntime().availableProcessors();
         LOG.info("Java reports the number of available processors is: {}", availableProcessors);
         final int maxThreads = availableProcessors;
         final int taskQueueLength = availableProcessors * QUEUE_SLOTS_PER_PROCESSOR;
         LOG.info("Maximum number of regional processing threads is {}, target length of task queue is {}.", maxThreads, taskQueueLength);
         BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(taskQueueLength);
-        regionalTaskExecutor = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, taskQueue);
+        regionalTaskExecutor = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, taskQueue);
 
-        // Main polling loop to fill the regional work queue.
+        // This is the main polling loop that fills the regional work queue.
         // Go into an endless loop polling for regional tasks that can be computed asynchronously.
         // You'd think the ThreadPoolExecutor could just block when the blocking queue is full, but apparently
         // people all over the world have been jumping through hoops to try to achieve this simple behavior
         // with no real success, at least without writing bug and deadlock-prone custom executor services.
-        // Two alternative approaches are trying to keep the queue full and waiting for the queue to be almost empty.
-        // To keep the queue full, we repeatedly try to add each task to the queue, pausing and retrying when
-        // it's full. To wait until it's almost empty, we could use wait() in a loop and notify() as tasks are handled.
-        // see https://stackoverflow.com/a/15185004/778449
-        // A simpler approach might be to spin-wait checking whether the queue is low and sleeping briefly,
-        // then fetch more work only when the queue is getting empty.
+        // Our current (revised) approach is to slowly spin-wait, checking whether the queue is low and sleeping briefly,
+        // then fetching more work only when the queue is getting empty.
 
         // Allows slowing down polling when not actively receiving tasks.
         boolean receivedWorkLastTime = false;
@@ -232,14 +234,14 @@ public void startPolling () {
             // If the worker handles all tasks in its internal queue in less than this time, this is a speed bottleneck.
             // This can happen in areas unconnected to transit and when travel time cutoffs are very low.
             sleepSeconds(POLL_INTERVAL_MIN_SECONDS);
-
             // Determine whether to poll this cycle - is the queue running empty, or has the maximum interval passed?
             {
                 long currentTime = System.currentTimeMillis();
                 boolean maxIntervalExceeded = (currentTime - lastPollingTime) > (POLL_INTERVAL_MAX_SECONDS * 1000);
                 int tasksInQueue = taskQueue.size();
-                int minQueueLength = availableProcessors; // Poll whenever we have fewer tasks in the queue than processors.
-                boolean shouldPoll = maxIntervalExceeded || (receivedWorkLastTime && (tasksInQueue < minQueueLength));
+                // Poll any time we have fewer tasks in the queue than processors.
+                boolean shouldPoll = maxIntervalExceeded || (receivedWorkLastTime && (tasksInQueue < availableProcessors));
+                LOG.debug("Last polled {} sec ago. Task queue length is {}.", (currentTime - lastPollingTime)/1000, taskQueue.size());
                 if (!shouldPoll) {
                     continue;
                 }
                 lastPollingTime = currentTime;
             }
@@ -580,6 +582,7 @@ public List<RegionalTask> getSomeWork (int tasksToRequest) {
         LOG.debug("Polling backend to report status and request up to {} tasks.", tasksToRequest);
         String url = brokerBaseUrl + "/poll";
         HttpPost httpPost = new HttpPost(url);
         WorkerStatus workerStatus = new WorkerStatus(this);
         workerStatus.maxTasksRequested = tasksToRequest;
+        workerStatus.pollIntervalSeconds = POLL_INTERVAL_MAX_SECONDS;
         // Include all completed work results when polling the backend.
         // Atomically copy and clear the accumulated work results, while blocking writes from other threads.
         synchronized (workResults) {
diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
index 0eac522cd..1fcd17a6e 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java
@@ -24,7 +24,9 @@ public class WorkerStatus {
 
     private static final Logger LOG = LoggerFactory.getLogger(WorkerStatus.class);
-    private static final int MAX_TASKS_LEGACY_WORKERS = 16;
+    private static final int LEGACY_WORKER_MAX_TASKS = 16;
+    public static final int LEGACY_WORKER_MAX_POLL_INTERVAL_SECONDS = 2 * 60;
+
     public String architecture;
     public int processors;
     public double loadAverage;
@@ -54,7 +56,15 @@ public class WorkerStatus {
      * The maximum number of tasks the broker should send to this worker. May be zero if its work queue is full.
      * Default value determines the number of tasks to send to older workers that don't send this value when they poll.
      */
-    public int maxTasksRequested = MAX_TASKS_LEGACY_WORKERS;
+    public int maxTasksRequested = LEGACY_WORKER_MAX_TASKS;
+
+    /**
+     * The maximum amount of time the worker will wait before polling again. After this much time passes, the backend
+     * may consider the worker lost or shut down. The backend should be somewhat lenient here, as there could be delays
+     * due to connection setup, API contention, etc. The default value reflects how long the backend should wait to
+     * hear from older workers that don't send this value.
+     */
+    public int pollIntervalSeconds = LEGACY_WORKER_MAX_POLL_INTERVAL_SECONDS;
 
     /** No-arg constructor used when deserializing. */
     public WorkerStatus() { }
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 00e0cdabf..34bfac76c 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -22,4 +22,7 @@
+
+
+
\ No newline at end of file
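
For reference, the executor behavior the comments in this patch rely on can be demonstrated in isolation - a standalone sketch, not part of the patch. With corePoolSize equal to maximumPoolSize, the pool grows to its full size as tasks arrive, then queues further submissions into the bounded LinkedBlockingQueue, and only throws RejectedExecutionException once the queue itself is full:

    import java.util.concurrent.*;

    int nThreads = Runtime.getRuntime().availableProcessors();
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(nThreads * 10);
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads, 60, TimeUnit.SECONDS, queue);
    // With core == max, the 60-second keep-alive has no effect unless core threads may time out:
    pool.allowCoreThreadTimeOut(true); // optional; shown for completeness, not used by this patch
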
From 5ae7e8b71f5cf6c6f9de6793fc8caab910dbee42 Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Tue, 19 Sep 2023 10:35:57 +0200
Subject: [PATCH 5/7] updates in response to PR review

---
 .../conveyal/r5/analyst/cluster/AnalysisWorker.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
index 45ad5148c..ce20d3548 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
@@ -205,8 +205,8 @@ public void startPolling () {
         // Create an executor with one thread per processor. The default task rejection policy is "Abort".
         // The number of threads will only increase from the core pool size toward the max pool size when the queue is
         // full. We no longer exceed the queue length in normal operation, so the thread pool will remain at core size.
-        // "[The] core pool size is the threshold beyond which [an] executor service prefers to queue up the task than
-        // spawn a new thread."
+        // "[The] core pool size is the threshold beyond which [an] executor service prefers to queue up the task
+        // [rather] than spawn a new thread." https://stackoverflow.com/a/72684387/778449
         // The executor's queue is rather long because some tasks complete very fast and we poll at most once per second.
         int availableProcessors = Runtime.getRuntime().availableProcessors();
         LOG.info("Java reports the number of available processors is: {}", availableProcessors);
@@ -613,9 +613,10 @@ public List<RegionalTask> getSomeWork (int tasksToRequest) {
             // Broker returned some work. Use the lenient object mapper to decode it in case the broker is a
             // newer version and thus sending unrecognizable fields.
             // ReadValue closes the stream, releasing the HTTP connection.
-            List<RegionalTask> tasks = JsonUtilities.lenientObjectMapper.readValue(responseEntity.getContent(),
-                    new TypeReference<List<RegionalTask>>() {});
-            return tasks;
+            return JsonUtilities.lenientObjectMapper.readValue(
+                    responseEntity.getContent(),
+                    new TypeReference<List<RegionalTask>>() {}
+            );
         }
         // Non-200 response code or a null entity. Something is weird.
         LOG.error("Unsuccessful polling. HTTP response code: " + response.getStatusLine().getStatusCode());

From 9195be0d17d56204fb2fbde17b90195bd910969b Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Wed, 20 Sep 2023 12:36:56 +0200
Subject: [PATCH 6/7] update Broker javadoc

---
 .../analysis/components/broker/Broker.java | 53 ++++++++++---------
 1 file changed, 29 insertions(+), 24 deletions(-)

diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
index b7edf8531..fb0e5c9bd 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -48,36 +48,41 @@
 /**
  * This class distributes the tasks making up regional jobs to workers.
  *
- * It should aim to draw tasks fairly from all organizations, and fairly from all jobs within each
- * organization, while attempting to respect the transport network affinity of each worker, giving
- * the worker tasks that require the same network it has been using recently.
+ * It respects the declared transport network affinity of each worker, giving the worker tasks that
+ * relate to the same network it has been using recently, which is therefore already loaded in memory.
  *
- * Previously workers long-polled for work, holding lots of connections open. Now they short-poll
- * and sleep for a while if there's no work. This is simpler and allows us to work withing much more
- * standard HTTP frameworks.
+ * In our initial design, workers long-polled for work, holding lots of connections open. This was
+ * soon revised to short-poll and sleep for a while when there's no work. This was found to be
+ * simpler, allowing use of standard HTTP frameworks instead of custom networking code.
  *
- * The fact that workers continuously re-poll for work every 10-30 seconds serves as a signal to the
- * broker that they are still alive and waiting. This also allows the broker to maintain a catalog
- * of active workers.
+ * Issue conveyal/r5#596 arose because we'd only poll when a worker was running low on tasks or
+ * idling. A worker could disappear for a long time, leaving the backend to assume it had shut down
+ * or crashed. Workers were revised to poll more frequently even when they were busy and didn't
+ * need any new tasks to work on, providing a signal to the broker that they are still alive and
+ * functioning. This allows the broker to maintain a more accurate catalog of active workers.
  *
- * Because (at least currently) two organizations never share the same graph, we can get by with
- * pulling tasks cyclically or randomly from all the jobs, and actively shape the number of workers
- * with affinity for each graph by forcing some of them to accept tasks on graphs other than the one
- * they have declared affinity for.
+ * Most methods on this class are synchronized because they can be called from many HTTP handler
+ * threads at once (when many workers are polling at once). We should occasionally evaluate whether
+ * synchronizing all methods to make this threadsafe is a performance issue. If so, fine-grained
+ * locking may be advantageous, but as a rule it is much harder to design, test, and maintain.
  *
- * This could be thought of as "affinity homeostasis". We will constantly keep track of the ideal
+ * Workers were originally intended to migrate from one network to another to handle subsequent jobs
+ * without waiting for more cloud compute instances to start up. In practice we currently assign
+ * each worker a single network, but the balance of workers assigned to each network and the reuse
+ * of workers could in principle be made more sophisticated. The remainder of the comments below
+ * provide context for how this could be refined or improved.
+ *
+ * Because (at least currently) two organizations never share the same graph, we could get by with
+ * pulling tasks cyclically or randomly from all the jobs, and could actively shape the number of
+ * workers with affinity for each graph by forcing some of them to accept tasks on graphs other than
+ * the one they have declared affinity for. If the pool of workers was allowed to grow very large,
+ * we could aim to draw tasks fairly from all organizations, and fairly from all jobs within each
+ * organization.
+ *
+ * We have described this approach as "affinity homeostasis": constantly keep track of the ideal
  * proportion of workers by graph (based on active jobs), and the true proportion of consumers by
  * graph (based on incoming polling), then we can decide when a worker's graph affinity should be
- * ignored and what it should be forced to.
- *
- * It may also be helpful to mark jobs every time they are skipped in the LRU queue. Each time a job
- * is serviced, it is taken out of the queue and put at its end. Jobs that have not been serviced
- * float to the top.
- *
- * Most methods on this class are synchronized, because they can be called from many HTTP handler
- * threads at once.
- *
- * TODO evaluate whether synchronizing all methods to make this threadsafe is a performance issue.
+ * ignored and what graph it should be forced to.
  */
 public class Broker implements Component {

From 0a071d476ea27f76b05ca064764c9364c53ff404 Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Wed, 20 Sep 2023 13:29:27 +0200
Subject: [PATCH 7/7] reduce queue length and size of task blocks

---
 .../conveyal/analysis/components/broker/Broker.java | 10 +++++++---
 .../conveyal/r5/analyst/cluster/AnalysisWorker.java |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
index fb0e5c9bd..2aea67dc2 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -107,9 +107,14 @@ public interface Config {
 
     /**
      * The most tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
-     * never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB.
+     * never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB. If this value
+     * is too high, all remaining tasks in a job could be distributed to a single worker, leaving none for the other
+     * workers and creating a slow-joiner problem, especially if the tasks are complicated and slow to complete.
+     *
+     * The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
+     * polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS), which may not be ideal but is known to work.
      */
-    public final int MAX_TASKS_PER_WORKER = 100;
+    public final int MAX_TASKS_PER_WORKER = 16;
 
     /**
      * Used when auto-starting spot instances. Set to a smaller value to increase the number of
@@ -348,7 +353,6 @@ public synchronized List<RegionalTask> getSomeWork (WorkerCategor
         }
         // Return up to N tasks that are waiting to be processed.
         if (maxTasksRequested > MAX_TASKS_PER_WORKER) {
-            LOG.warn("Worker requested {} tasks, reducing to {}.", maxTasksRequested, MAX_TASKS_PER_WORKER);
             maxTasksRequested = MAX_TASKS_PER_WORKER;
         }
         return job.generateSomeTasksToDeliver(maxTasksRequested);
diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
index ce20d3548..74132758f 100644
--- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
+++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java
@@ -78,7 +78,7 @@ public interface Config {
     private static final int POLL_INTERVAL_MIN_SECONDS = 1;
     private static final int POLL_INTERVAL_MAX_SECONDS = 15;
     private static final int POLL_JITTER_SECONDS = 5;
-    private static final int QUEUE_SLOTS_PER_PROCESSOR = 10;
+    private static final int QUEUE_SLOTS_PER_PROCESSOR = 8;
 
     /**
      * This timeout should be longer than the longest expected worker calculation for a single-point request.
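
The slow-joiner reasoning behind the reduced cap can be made concrete with a small worked example (illustrative numbers only, not part of the patch):

    // Suppose 60 tasks remain in a job and several workers poll in quick succession.
    // With MAX_TASKS_PER_WORKER = 100, the first worker could take all 60 while the rest idle.
    // With the cap at 16, the remaining tasks spread across ceil(60 / 16) = 4 workers:
    int remaining = 60, cap = 16;
    int workersKeptBusy = (remaining + cap - 1) / cap;   // = 4
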