From c937068625e41d49b91316102fe5d41d5f38a119 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 4 Jan 2024 17:42:02 +0530 Subject: [PATCH] Improve polling in segment allocation queue (#15590) Description When batchAllocationWaitTime is set to 0, the segment allocation queue is polled continuously even when it is empty. This would take up cpu cycles unnecessarily. Some existing race conditions would also become more frequent when the batchAllocationWaitTime is 0. This PR tries to better address those race conditions as well. Changes Do not reschedule a poll if queue is empty When a new batch is added to queue, schedule a poll Simplify keyToBatch map Handle race conditions better As soon as a batch starts getting processed, do not add any more requests to it --- .../actions/SegmentAllocationQueue.java | 199 +++++++++--------- 1 file changed, 100 insertions(+), 99 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 6638c2f2578e..6986ec683a56 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -75,13 +75,17 @@ public class SegmentAllocationQueue private final long maxWaitTimeMillis; private final TaskLockbox taskLockbox; - private final ScheduledExecutorService executor; private final IndexerMetadataStorageCoordinator metadataStorage; private final AtomicBoolean isLeader = new AtomicBoolean(false); private final ServiceEmitter emitter; + /** + * Single-threaded executor to process allocation queue. + */ + private final ScheduledExecutorService executor; + private final ConcurrentHashMap keyToBatch = new ConcurrentHashMap<>(); - private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); + private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); @Inject public SegmentAllocationQueue( @@ -149,6 +153,10 @@ public boolean isEnabled() return executor != null && !executor.isShutdown(); } + /** + * Schedules a poll of the allocation queue that runs on the {@link #executor}. + * It is okay to schedule multiple polls since the executor is single threaded. + */ private void scheduleQueuePoll(long delay) { executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS); @@ -174,19 +182,20 @@ public Future add(SegmentAllocateRequest request) throw new ISE("Batched segment allocation is disabled."); } - final AllocateRequestKey requestKey = getKeyForAvailableBatch(request); + final AllocateRequestKey requestKey = new AllocateRequestKey(request); final AtomicReference> futureReference = new AtomicReference<>(); // Possible race condition: // t1 -> new batch is added to queue or batch already exists in queue // t2 -> executor pops batch, processes all requests in it // t1 -> new request is added to dangling batch and is never picked up - // Solution: For existing batch, call keyToBatch.remove() on the key to - // wait on keyToBatch.compute() to finish before proceeding with processBatch(). - // For new batch, keyToBatch.remove() would not wait as key is not in map yet - // but a new batch is unlikely to be due immediately, so it won't get popped right away. + // Solution: Perform the following operations only inside keyToBatch.compute(): + // 1. Add or remove from map + // 2. Add batch to queue + // 3. Mark batch as started + // 4. Update requests in batch keyToBatch.compute(requestKey, (key, existingBatch) -> { - if (existingBatch == null) { + if (existingBatch == null || existingBatch.isStarted() || existingBatch.isFull()) { AllocateRequestBatch newBatch = new AllocateRequestBatch(key); futureReference.set(newBatch.add(request)); return addBatchToQueue(newBatch) ? newBatch : null; @@ -199,36 +208,19 @@ public Future add(SegmentAllocateRequest request) return futureReference.get(); } - /** - * Returns the key for a batch that is not added to the queue yet and/or has - * available space. Throws an exception if the queue is already full and no - * batch has available capacity. - */ - private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request) - { - for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; ++batchIncrementalId) { - AllocateRequestKey nextKey = new AllocateRequestKey(request, maxWaitTimeMillis, batchIncrementalId); - AllocateRequestBatch nextBatch = keyToBatch.get(nextKey); - if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) { - return nextKey; - } - } - - throw new ISE("Allocation queue is at capacity, all batches are full."); - } - /** * Tries to add the given batch to the processing queue. Fails all the pending * requests in the batch if we are not leader or if the queue is full. */ private boolean addBatchToQueue(AllocateRequestBatch batch) { - batch.key.resetQueueTime(); + batch.resetQueueTime(); if (!isLeader.get()) { batch.failPendingRequests("Not leader anymore"); return false; - } else if (processingQueue.offer(batch.key)) { - log.debug("Added a new batch [%s] to queue.", batch.key); + } else if (processingQueue.offer(batch)) { + log.debug("Added a new batch for key[%s] to queue.", batch.key); + scheduleQueuePoll(maxWaitTimeMillis); return true; } else { batch.failPendingRequests( @@ -248,7 +240,7 @@ private void requeueBatch(AllocateRequestBatch batch) { log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key); keyToBatch.compute(batch.key, (key, existingBatch) -> { - if (existingBatch == null) { + if (existingBatch == null || existingBatch.isFull() || existingBatch.isStarted()) { return addBatchToQueue(batch) ? batch : null; } else { // Merge requests from this batch to existing one @@ -262,44 +254,43 @@ private void processBatchesDue() { clearQueueIfNotLeader(); + // Process all the batches that are already due int numProcessedBatches = 0; - AllocateRequestKey nextKey = processingQueue.peekFirst(); - while (nextKey != null && nextKey.isDue()) { - processingQueue.pollFirst(); - + AllocateRequestBatch nextBatch = processingQueue.peekFirst(); + while (nextBatch != null && nextBatch.isDue()) { // Process the next batch in the queue + processingQueue.pollFirst(); + final AllocateRequestBatch currentBatch = nextBatch; boolean processed; - AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey); try { - processed = processBatch(nextBatch); + processed = processBatch(currentBatch); } catch (Throwable t) { - nextBatch.failPendingRequests(t); + currentBatch.failPendingRequests(t); processed = true; - log.error(t, "Error while processing batch [%s]", nextKey); + log.error(t, "Error while processing batch [%s]", currentBatch.key); } // Requeue if not fully processed yet if (processed) { ++numProcessedBatches; } else { - requeueBatch(nextBatch); + requeueBatch(currentBatch); } - nextKey = processingQueue.peek(); + nextBatch = processingQueue.peek(); } - // Schedule the next round of processing - final long nextScheduleDelay; + // Schedule the next round of processing if the queue is not empty if (processingQueue.isEmpty()) { - nextScheduleDelay = maxWaitTimeMillis; + log.debug("Processed [%d] batches, not scheduling again since queue is empty.", numProcessedBatches); } else { - nextKey = processingQueue.peek(); - long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime(); - nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed); + nextBatch = processingQueue.peek(); + long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime(); + long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed); + scheduleQueuePoll(nextScheduleDelay); + log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay); } - scheduleQueuePoll(nextScheduleDelay); - log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay); } /** @@ -308,14 +299,14 @@ private void processBatchesDue() private void clearQueueIfNotLeader() { int failedBatches = 0; - AllocateRequestKey nextKey = processingQueue.peekFirst(); - while (nextKey != null && !isLeader.get()) { + AllocateRequestBatch nextBatch = processingQueue.peekFirst(); + while (nextBatch != null && !isLeader.get()) { processingQueue.pollFirst(); - AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey); + keyToBatch.remove(nextBatch.key); nextBatch.failPendingRequests("Not leader anymore"); ++failedBatches; - nextKey = processingQueue.peekFirst(); + nextBatch = processingQueue.peekFirst(); } if (failedBatches > 0) { log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", failedBatches, processingQueue.size()); @@ -323,11 +314,21 @@ private void clearQueueIfNotLeader() } /** - * Processes the given batch. Returns true if the batch was completely processed - * and should not be requeued. + * Processes the given batch. This method marks the batch as started and + * removes it from the map {@link #keyToBatch} so that no more requests can be + * added to it. + * + * @return true if the batch was completely processed and should not be requeued. */ private boolean processBatch(AllocateRequestBatch requestBatch) { + keyToBatch.compute(requestBatch.key, (batchKey, latestBatchForKey) -> { + // Mark the batch as started so that no more requests are added to it + requestBatch.markStarted(); + // Remove the corresponding key from the map if this is the latest batch for the key + return requestBatch.equals(latestBatchForKey) ? null : latestBatchForKey; + }); + final AllocateRequestKey requestKey = requestBatch.key; if (requestBatch.isEmpty()) { return true; @@ -338,13 +339,13 @@ private boolean processBatch(AllocateRequestBatch requestBatch) log.debug( "Processing [%d] requests for batch [%s], queue time [%s].", - requestBatch.size(), requestKey, requestKey.getQueueTime() + requestBatch.size(), requestKey, requestBatch.getQueueTime() ); final long startTimeMillis = System.currentTimeMillis(); final int batchSize = requestBatch.size(); emitBatchMetric("task/action/batch/size", batchSize, requestKey); - emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestKey.getQueueTime()), requestKey); + emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestBatch.getQueueTime()), requestKey); final Set usedSegments = retrieveUsedSegments(requestKey); final int successCount = allocateSegmentsForBatch(requestBatch, usedSegments); @@ -546,15 +547,13 @@ private void emitBatchMetric(String metric, long value, AllocateRequestKey key) */ private class AllocateRequestBatch { + private long queueTimeMillis; private final AllocateRequestKey key; + private boolean started = false; /** * Map from allocate requests (represents a single SegmentAllocateAction) * to the future of allocated segment id. - *

- * This must be accessed through methods synchronized on this batch. - * It is to avoid races between a new request being added just when the batch - * is being processed. */ private final Map> requestToFuture = new HashMap<>(); @@ -564,29 +563,60 @@ private class AllocateRequestBatch this.key = key; } - synchronized Future add(SegmentAllocateRequest request) + long getQueueTime() + { + return queueTimeMillis; + } + + boolean isDue() + { + return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis; + } + + void resetQueueTime() + { + queueTimeMillis = System.currentTimeMillis(); + started = false; + } + + void markStarted() + { + started = true; + } + + boolean isStarted() + { + return started; + } + + boolean isFull() + { + return size() >= MAX_BATCH_SIZE; + } + + Future add(SegmentAllocateRequest request) { log.debug("Adding request to batch [%s]: %s", key, request.getAction()); return requestToFuture.computeIfAbsent(request, req -> new CompletableFuture<>()); } - synchronized void transferRequestsFrom(AllocateRequestBatch batch) + void transferRequestsFrom(AllocateRequestBatch batch) { requestToFuture.putAll(batch.requestToFuture); batch.requestToFuture.clear(); } - synchronized Set getRequests() + Set getRequests() { return new HashSet<>(requestToFuture.keySet()); } - synchronized void failPendingRequests(String reason) + void failPendingRequests(String reason) { failPendingRequests(new ISE(reason)); } - synchronized void failPendingRequests(Throwable cause) + void failPendingRequests(Throwable cause) { if (!requestToFuture.isEmpty()) { log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), cause.getMessage(), key); @@ -598,7 +628,7 @@ synchronized void failPendingRequests(Throwable cause) } } - synchronized void completePendingRequestsWithNull() + void completePendingRequestsWithNull() { if (requestToFuture.isEmpty()) { return; @@ -611,7 +641,7 @@ synchronized void completePendingRequestsWithNull() requestToFuture.clear(); } - synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request) + void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request) { request.incrementAttempts(); @@ -634,12 +664,12 @@ synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequ } } - synchronized boolean isEmpty() + boolean isEmpty() { return requestToFuture.isEmpty(); } - synchronized int size() + int size() { return requestToFuture.size(); } @@ -650,14 +680,6 @@ synchronized int size() */ private static class AllocateRequestKey { - /** - * ID to distinguish between two batches for the same datasource, groupId, etc. - */ - private final int batchIncrementalId; - - private long queueTimeMillis; - private final long maxWaitTimeMillis; - private final String dataSource; private final String groupId; private final Interval preferredAllocationInterval; @@ -675,12 +697,11 @@ private static class AllocateRequestKey * Creates a new key for the given request. The batch for a unique key will * always contain a single request. */ - AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId) + AllocateRequestKey(SegmentAllocateRequest request) { final SegmentAllocateAction action = request.getAction(); final Task task = request.getTask(); - this.batchIncrementalId = batchIncrementalId; this.dataSource = action.getDataSource(); this.groupId = task.getGroupId(); this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck(); @@ -694,30 +715,12 @@ private static class AllocateRequestKey this.hash = Objects.hash( dataSource, groupId, - batchIncrementalId, skipSegmentLineageCheck, useNonRootGenPartitionSpace, preferredAllocationInterval, lockGranularity ); this.serialized = serialize(); - - this.maxWaitTimeMillis = maxWaitTimeMillis; - } - - void resetQueueTime() - { - queueTimeMillis = System.currentTimeMillis(); - } - - long getQueueTime() - { - return queueTimeMillis; - } - - boolean isDue() - { - return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis; } @Override @@ -732,7 +735,6 @@ public boolean equals(Object o) AllocateRequestKey that = (AllocateRequestKey) o; return dataSource.equals(that.dataSource) && groupId.equals(that.groupId) - && batchIncrementalId == that.batchIncrementalId && skipSegmentLineageCheck == that.skipSegmentLineageCheck && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace && preferredAllocationInterval.equals(that.preferredAllocationInterval) @@ -756,7 +758,6 @@ private String serialize() return "{" + "datasource='" + dataSource + '\'' + ", groupId='" + groupId + '\'' + - ", batchId=" + batchIncrementalId + ", lock=" + lockGranularity + ", allocInterval=" + preferredAllocationInterval + ", skipLineageCheck=" + skipSegmentLineageCheck +