diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index b5be94ba4ba7..aa35b0681ec5 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -311,7 +311,7 @@ See [Enabling metrics](../configuration/index.md#enabling-metrics) for more deta ## Coordination -These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic. +These metrics are emitted by the Druid Coordinator in every run of the corresponding coordinator duty. |Metric|Description|Dimensions|Normal value| |------|-----------|----------|------------| @@ -325,6 +325,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`segment/dropSkipped/count`|Number of segments that could not be dropped from any server.|`dataSource`, `tier`, `description`|Varies| |`segment/loadQueue/size`|Size in bytes of segments to load.|`server`|Varies| |`segment/loadQueue/count`|Number of segments to load.|`server`|Varies| +|`segment/loading/rateKbps`|Current rate of segment loading on a server in kbps (1000 bits per second). 
The rate is calculated as a moving average over the last 10 GiB or more of successful segment loads on that server.|`server`|Varies| |`segment/dropQueue/count`|Number of segments to drop.|`server`|Varies| |`segment/loadQueue/assigned`|Number of segments assigned for load or drop to the load queue of a server.|`dataSource`, `server`|Varies| |`segment/loadQueue/success`|Number of segment assignments that completed successfully.|`dataSource`, `server`|Varies| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java index da2f1e1a04a3..3056d1c2bd0e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java @@ -70,6 +70,7 @@ private void collectLoadQueueStats(CoordinatorRunStats stats) stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad()); stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size()); stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey, queuePeon.getSegmentsToDrop().size()); + stats.updateMax(Stats.SegmentQueue.LOAD_RATE_KBPS, rowKey, queuePeon.getLoadRateKbps()); queuePeon.getAndResetStats().forEachStat( (stat, key, statValue) -> diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java index 52b012a81d46..333d3b0e3056 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java @@ -170,6 +170,12 @@ public long getSizeOfSegmentsToLoad() return queuedSize.get(); } + @Override + public long getLoadRateKbps() + { + return 0; + } 
+ @Override public CoordinatorRunStats getAndResetStats() { @@ -179,7 +185,7 @@ public CoordinatorRunStats getAndResetStats() @Override public void loadSegment(final DataSegment segment, SegmentAction action, @Nullable final LoadPeonCallback callback) { - SegmentHolder segmentHolder = new SegmentHolder(segment, action, callback); + SegmentHolder segmentHolder = new SegmentHolder(segment, action, Duration.ZERO, callback); final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder); if (existingHolder != null) { existingHolder.addCallback(callback); @@ -193,7 +199,7 @@ public void loadSegment(final DataSegment segment, SegmentAction action, @Nullab @Override public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback) { - SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, callback); + SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, Duration.ZERO, callback); final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder); if (existingHolder != null) { existingHolder.addCallback(callback); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index cb32f95516b4..a72cb22e1519 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -36,6 +36,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; import org.apache.druid.server.coordination.DataSegmentChangeResponse; +import org.apache.druid.server.coordination.SegmentChangeRequestLoad; import org.apache.druid.server.coordination.SegmentChangeStatus; import 
org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; @@ -92,6 +93,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon private final ConcurrentMap segmentsToLoad = new ConcurrentHashMap<>(); private final ConcurrentMap segmentsToDrop = new ConcurrentHashMap<>(); private final Set segmentsMarkedToDrop = ConcurrentHashMap.newKeySet(); + private final LoadingRateTracker loadingRateTracker = new LoadingRateTracker(); /** * Segments currently in queue ordered by priority and interval. This includes @@ -169,11 +171,10 @@ private void doSegmentManagement() synchronized (lock) { final Iterator queuedSegmentIterator = queuedSegments.iterator(); - final long currentTimeMillis = System.currentTimeMillis(); while (newRequests.size() < batchSize && queuedSegmentIterator.hasNext()) { final SegmentHolder holder = queuedSegmentIterator.next(); final DataSegment segment = holder.getSegment(); - if (hasRequestTimedOut(holder, currentTimeMillis)) { + if (holder.hasRequestTimedOut()) { onRequestFailed(holder, "timed out"); queuedSegmentIterator.remove(); if (holder.isLoad()) { @@ -188,9 +189,13 @@ private void doSegmentManagement() activeRequestSegments.add(segment); } } + + if (segmentsToLoad.isEmpty()) { + loadingRateTracker.markBatchLoadingFinished(); + } } - if (newRequests.size() == 0) { + if (newRequests.isEmpty()) { log.trace( "[%s]Found no load/drop requests. 
SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].", serverId, segmentsToLoad.size(), segmentsToDrop.size(), config.getBatchSize() @@ -201,6 +206,11 @@ private void doSegmentManagement() try { log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId); + final boolean hasLoadRequests = newRequests.stream().anyMatch(r -> r instanceof SegmentChangeRequestLoad); + if (hasLoadRequests && !loadingRateTracker.isLoadingBatch()) { + loadingRateTracker.markBatchLoadingStarted(); + } + BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); ListenableFuture future = httpClient.go( new Request(HttpMethod.POST, changeRequestURL) @@ -234,9 +244,16 @@ public void onSuccess(InputStream result) return; } + int numSuccessfulLoads = 0; + long successfulLoadSize = 0; for (DataSegmentChangeResponse e : statuses) { switch (e.getStatus().getState()) { case SUCCESS: + if (e.getRequest() instanceof SegmentChangeRequestLoad) { + ++numSuccessfulLoads; + successfulLoadSize += + ((SegmentChangeRequestLoad) e.getRequest()).getSegment().getSize(); + } case FAILED: handleResponseStatus(e.getRequest(), e.getStatus()); break; @@ -248,6 +265,10 @@ public void onSuccess(InputStream result) log.error("Server[%s] returned unknown state in status[%s].", serverId, e.getStatus()); } } + + if (numSuccessfulLoads > 0) { + loadingRateTracker.incrementBytesLoadedInBatch(successfulLoadSize); + } } } catch (Exception ex) { @@ -284,9 +305,7 @@ private void logRequestFailure(Throwable t) log.error( t, "Request[%s] Failed with status[%s]. 
Reason[%s].", - changeRequestURL, - responseHandler.getStatus(), - responseHandler.getDescription() + changeRequestURL, responseHandler.getStatus(), responseHandler.getDescription() ); } }, @@ -367,7 +386,7 @@ public void stop() if (stopped) { return; } - log.info("Stopping load queue peon for server [%s].", serverId); + log.info("Stopping load queue peon for server[%s].", serverId); stopped = true; // Cancel all queued requests @@ -379,6 +398,7 @@ public void stop() queuedSegments.clear(); activeRequestSegments.clear(); queuedSize.set(0L); + loadingRateTracker.stop(); stats.get().clear(); } } @@ -387,7 +407,7 @@ public void stop() public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallback callback) { if (!action.isLoad()) { - log.warn("Invalid load action [%s] for segment [%s] on server [%s].", action, segment.getId(), serverId); + log.warn("Invalid load action[%s] for segment[%s] on server[%s].", action, segment.getId(), serverId); return; } @@ -407,7 +427,7 @@ public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallb if (holder == null) { log.trace("Server[%s] to load segment[%s] queued.", serverId, segment.getId()); queuedSize.addAndGet(segment.getSize()); - holder = new SegmentHolder(segment, action, callback); + holder = new SegmentHolder(segment, action, config.getLoadTimeout(), callback); segmentsToLoad.put(segment, holder); queuedSegments.add(holder); processingExecutor.execute(this::doSegmentManagement); @@ -436,7 +456,7 @@ public void dropSegment(DataSegment segment, LoadPeonCallback callback) if (holder == null) { log.trace("Server[%s] to drop segment[%s] queued.", serverId, segment.getId()); - holder = new SegmentHolder(segment, SegmentAction.DROP, callback); + holder = new SegmentHolder(segment, SegmentAction.DROP, config.getLoadTimeout(), callback); segmentsToDrop.put(segment, holder); queuedSegments.add(holder); processingExecutor.execute(this::doSegmentManagement); @@ -481,6 +501,12 @@ public long 
getSizeOfSegmentsToLoad() return queuedSize.get(); } + @Override + public long getLoadRateKbps() + { + return loadingRateTracker.getMovingAverageLoadRateKbps(); + } + @Override public CoordinatorRunStats getAndResetStats() { @@ -505,19 +531,6 @@ public Set getSegmentsMarkedToDrop() return Collections.unmodifiableSet(segmentsMarkedToDrop); } - /** - * A request is considered to have timed out if the time elapsed since it was - * first sent to the server is greater than the configured load timeout. - * - * @see HttpLoadQueuePeonConfig#getLoadTimeout() - */ - private boolean hasRequestTimedOut(SegmentHolder holder, long currentTimeMillis) - { - return holder.isRequestSentToServer() - && currentTimeMillis - holder.getFirstRequestMillis() - > config.getLoadTimeout().getMillis(); - } - private void onRequestFailed(SegmentHolder holder, String failureCause) { log.error( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadPeonCallback.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadPeonCallback.java index 2a2163563db7..320062514b04 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadPeonCallback.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadPeonCallback.java @@ -20,15 +20,11 @@ package org.apache.druid.server.coordinator.loading; /** + * Callback executed when the load or drop of a segment completes on a server + * either with success or failure. */ +@FunctionalInterface public interface LoadPeonCallback { - /** - * Ideally, this method is called after the load/drop opertion is successfully done, i.e., the historical node - * removes the zookeeper node from loadQueue and announces/unannouces the segment. 
However, this method will - * also be called in failure scenarios so for implementations of LoadPeonCallback that care about success it - * is important to take extra measures to ensure that whatever side effects they expect to happen upon success - * have happened. Coordinator will have a complete and correct view of the cluster in the next run period. - */ void execute(boolean success); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java index 49e5f9a7c087..8e9989717ab9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java @@ -54,6 +54,8 @@ public interface LoadQueuePeon long getSizeOfSegmentsToLoad(); + long getLoadRateKbps(); + CoordinatorRunStats getAndResetStats(); /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java new file mode 100644 index 000000000000..218a41df5147 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.EvictingQueue; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.Stopwatch; + +import javax.annotation.concurrent.NotThreadSafe; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tracks the current segment loading rate for a single server. + *

<p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} segment batches (or more if any batch was
+ * smaller than {@link #MIN_ENTRY_SIZE_BYTES}). A batch is defined as a set of
+ * segments added to the load queue together. Usage:
+ *

<ul>
+ * <li>Call {@link #markBatchLoadingStarted()} exactly once to indicate start of
+ * a batch.</li>
+ * <li>Call {@link #incrementBytesLoadedInBatch(long)} any number of times to
+ * increment successful loads done in the batch.</li>
+ * <li>Call {@link #markBatchLoadingFinished()} exactly once to complete the batch.</li>
+ * </ul>
+ *
+ * <pre>
+ *   batchDurationMillis
+ *   = t(load queue becomes empty) - t(first load request in batch is sent to server)
+ *
+ *   batchBytes = total bytes successfully loaded in batch
+ *
+ *   avg loading rate in batch (kbps) = (8 * batchBytes) / batchDurationMillis
+ *
+ *   overall avg loading rate (kbps)
+ *   = (8 * sumOverWindow(batchBytes)) / sumOverWindow(batchDurationMillis)
+ * </pre>
+ * <p>

+ * This class is currently not required to be thread-safe as the caller + * {@link HttpLoadQueuePeon} itself ensures that the write methods of this class + * are only accessed by one thread at a time. + */ +@NotThreadSafe +public class LoadingRateTracker +{ + public static final int MOVING_AVERAGE_WINDOW_SIZE = 10; + + /** + * Minimum size of a single entry in the moving average window = 1 GiB. + */ + public static final long MIN_ENTRY_SIZE_BYTES = 1 << 30; + + private final EvictingQueue window = EvictingQueue.create(MOVING_AVERAGE_WINDOW_SIZE); + + /** + * Total stats for the whole window. This includes the total from the current + * batch as well. + *

+ * Maintained as an atomic reference to ensure computational correctness in + * {@link #getMovingAverageLoadRateKbps()}. Otherwise, it is possible to have + * a state where bytes have been updated for the entry but not time taken + * (or vice versa). + */ + private final AtomicReference windowTotal = new AtomicReference<>(); + + private Entry currentBatchTotal; + private Entry currentTail; + + private final Stopwatch currentBatchDuration = Stopwatch.createUnstarted(); + + /** + * Marks the start of loading of a batch of segments. This should be called when + * the first request in a batch is sent to the server. + */ + public void markBatchLoadingStarted() + { + if (isLoadingBatch()) { + // Do nothing + return; + } + + currentBatchDuration.restart(); + currentBatchTotal = new Entry(); + + // Add a fresh entry at the tail for this batch + final Entry evictedHead = addNewEntryIfTailIsFull(); + if (evictedHead != null) { + final Entry delta = new Entry(); + delta.bytes -= evictedHead.bytes; + delta.millisElapsed -= evictedHead.millisElapsed; + + windowTotal.updateAndGet(delta::incrementBy); + } + } + + /** + * @return if a batch of segments is currently being loaded. + */ + public boolean isLoadingBatch() + { + return currentBatchDuration.isRunning(); + } + + /** + * Adds the given number of bytes to the total data successfully loaded in the + * current batch. This causes an update of the current load rate. + * + * @throws DruidException if called without making a prior call to + * {@link #markBatchLoadingStarted()}. 
+ */ + public void incrementBytesLoadedInBatch(long loadedBytes) + { + incrementBytesLoadedInBatch(loadedBytes, currentBatchDuration.millisElapsed()); + } + + @VisibleForTesting + void incrementBytesLoadedInBatch(final long bytes, final long batchDurationMillis) + { + if (!isLoadingBatch()) { + throw DruidException.defensive("markBatchLoadingStarted() must be called before tracking load progress."); + } + + final Entry delta = new Entry(); + delta.bytes = bytes; + delta.millisElapsed = batchDurationMillis - currentBatchTotal.millisElapsed; + + currentTail.incrementBy(delta); + currentBatchTotal.incrementBy(delta); + windowTotal.updateAndGet(delta::incrementBy); + } + + /** + * Marks the end of loading of a batch of segments. This method should be called + * when all the requests in the batch have been processed by the server. + */ + public void markBatchLoadingFinished() + { + if (isLoadingBatch()) { + currentBatchDuration.reset(); + currentBatchTotal = null; + } + } + + /** + * Stops this rate tracker and resets its current state. + */ + public void stop() + { + window.clear(); + windowTotal.set(null); + currentTail = null; + currentBatchTotal = null; + currentBatchDuration.reset(); + } + + /** + * Moving average load rate in kbps (1000 bits per second). + */ + public long getMovingAverageLoadRateKbps() + { + final Entry overallTotal = windowTotal.get(); + if (overallTotal == null || overallTotal.millisElapsed <= 0) { + return 0; + } else { + return (8 * overallTotal.bytes) / overallTotal.millisElapsed; + } + } + + /** + * Adds a fresh entry to the queue if the current tail entry is already full. + * + * @return Old head of the queue if it was evicted, null otherwise. 
+ */ + private Entry addNewEntryIfTailIsFull() + { + final Entry oldHead = window.peek(); + + if (currentTail == null || currentTail.bytes >= MIN_ENTRY_SIZE_BYTES) { + currentTail = new Entry(); + window.add(currentTail); + } + + // Compare if the oldHead and the newHead are the same object (not equals) + final Entry newHead = window.peek(); + return newHead == oldHead ? null : oldHead; + } + + private static class Entry + { + long bytes; + long millisElapsed; + + Entry incrementBy(Entry delta) + { + if (delta != null) { + this.bytes += delta.bytes; + this.millisElapsed += delta.millisElapsed; + } + return this; + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentHolder.java index ce199d654303..9f4a181699dd 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentHolder.java @@ -21,18 +21,20 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.server.coordination.DataSegmentChangeRequest; import org.apache.druid.server.coordination.SegmentChangeRequestDrop; import org.apache.druid.server.coordination.SegmentChangeRequestLoad; import org.apache.druid.server.coordinator.DruidCoordinator; +import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import org.apache.druid.timeline.DataSegment; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; /** * Represents a segment queued for a load or drop operation in a LoadQueuePeon. 
@@ -57,14 +59,17 @@ public class SegmentHolder implements Comparable private final DataSegmentChangeRequest changeRequest; private final SegmentAction action; + private final Duration requestTimeout; + // Guaranteed to store only non-null elements private final List callbacks = new ArrayList<>(); - private final AtomicLong firstRequestMillis = new AtomicLong(0); + private final Stopwatch sinceRequestSentToServer = Stopwatch.createUnstarted(); private int runsInQueue = 0; public SegmentHolder( DataSegment segment, SegmentAction action, + Duration requestTimeout, @Nullable LoadPeonCallback callback ) { @@ -76,6 +81,7 @@ public SegmentHolder( if (callback != null) { callbacks.add(callback); } + this.requestTimeout = requestTimeout; } public DataSegment getSegment() @@ -124,17 +130,20 @@ public List getCallbacks() public void markRequestSentToServer() { - firstRequestMillis.compareAndSet(0L, System.currentTimeMillis()); - } - - public boolean isRequestSentToServer() - { - return firstRequestMillis.get() > 0; + if (!sinceRequestSentToServer.isRunning()) { + sinceRequestSentToServer.start(); + } } - public long getFirstRequestMillis() + /** + * A request is considered to have timed out if the time elapsed since it was + * first sent to the server is greater than the configured load timeout. 
+ * + * @see HttpLoadQueuePeonConfig#getLoadTimeout() + */ + public boolean hasRequestTimedOut() { - return firstRequestMillis.get(); + return sinceRequestSentToServer.millisElapsed() > requestTimeout.getMillis(); } public int incrementAndGetRunsInQueue() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java index 5412230c2cc0..92b2acc052bc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java @@ -87,7 +87,7 @@ public long getSegmentStat(CoordinatorStat stat, String tier, String datasource) public long get(CoordinatorStat stat) { - return get(stat, RowKey.EMPTY); + return get(stat, RowKey.empty()); } public long get(CoordinatorStat stat, RowKey rowKey) @@ -196,7 +196,7 @@ public void clear() public void add(CoordinatorStat stat, long value) { - add(stat, RowKey.EMPTY, value); + add(stat, RowKey.empty(), value); } public void add(CoordinatorStat stat, RowKey rowKey, long value) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java index b0ee0a2d1f7f..874ac79b1d1b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java @@ -29,7 +29,7 @@ */ public class RowKey { - public static final RowKey EMPTY = new RowKey(Collections.emptyMap()); + private static final RowKey EMPTY = new RowKey(Collections.emptyMap()); private final Map values; private final int hashCode; @@ -52,6 +52,11 @@ public static RowKey of(Dimension dimension, String value) return with(dimension, value).build(); } + public static RowKey empty() + { + return EMPTY; + } + public Map getValues() { return values; diff 
--git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index e7b901e3ed9d..10873d894261 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -71,6 +71,8 @@ public static class SegmentQueue = CoordinatorStat.toDebugAndEmit("bytesToLoad", "segment/loadQueue/size"); public static final CoordinatorStat NUM_TO_DROP = CoordinatorStat.toDebugAndEmit("numToDrop", "segment/dropQueue/count"); + public static final CoordinatorStat LOAD_RATE_KBPS + = CoordinatorStat.toDebugAndEmit("loadRateKbps", "segment/loading/rateKbps"); public static final CoordinatorStat ASSIGNED_ACTIONS = CoordinatorStat.toDebugAndEmit("assignedActions", "segment/loadQueue/assigned"); diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java index 7a8c3e904dca..bffe2fe99d29 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java @@ -111,14 +111,23 @@ public Response getLoadQueue( return Response.ok( Maps.transformValues( coordinator.getLoadManagementPeons(), - input -> { - long loadSize = input.getSizeOfSegmentsToLoad(); - long dropSize = input.getSegmentsToDrop().stream().mapToLong(DataSegment::getSize).sum(); + peon -> { + long loadSize = peon.getSizeOfSegmentsToLoad(); + long dropSize = peon.getSegmentsToDrop().stream().mapToLong(DataSegment::getSize).sum(); + + // 1 kbps = 1/8 kB/s = 1/8 B/ms + long loadRateKbps = peon.getLoadRateKbps(); + long expectedLoadTimeMillis + = loadRateKbps > 0 && loadSize > 0 + ? 
(8 * loadSize) / loadRateKbps + : 0; + return new ImmutableMap.Builder<>() - .put("segmentsToLoad", input.getSegmentsToLoad().size()) - .put("segmentsToDrop", input.getSegmentsToDrop().size()) + .put("segmentsToLoad", peon.getSegmentsToLoad().size()) + .put("segmentsToDrop", peon.getSegmentsToDrop().size()) .put("segmentsToLoadSize", loadSize) .put("segmentsToDropSize", dropSize) + .put("expectedLoadTimeMillis", expectedLoadTimeMillis) .build(); } ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java index fcbeeebc726b..bb6d0406c699 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java @@ -62,6 +62,7 @@ public void testCollectedSegmentStats() CoordinatorRunStats stats = params.getCoordinatorStats(); Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_LOAD)); Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_DROP)); + Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.LOAD_RATE_KBPS)); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index 16251130e77e..5928f47e0bb2 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -60,9 +60,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -/** - * - */ public class HttpLoadQueuePeonTest { private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); @@ -75,26 +72,22 @@ public class HttpLoadQueuePeonTest private TestHttpClient httpClient; private 
HttpLoadQueuePeon httpLoadQueuePeon; - private BlockingExecutorService processingExecutor; - private BlockingExecutorService callbackExecutor; - - private final List processedSegments = new ArrayList<>(); @Before public void setUp() { httpClient = new TestHttpClient(); - processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s"); - callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb"); - processedSegments.clear(); - httpLoadQueuePeon = new HttpLoadQueuePeon( "http://dummy:4000", MAPPER, httpClient, new HttpLoadQueuePeonConfig(null, null, 10), - new WrappingScheduledExecutorService("HttpLoadQueuePeonTest-%s", processingExecutor, true), - callbackExecutor + new WrappingScheduledExecutorService( + "HttpLoadQueuePeonTest-%s", + httpClient.processingExecutor, + true + ), + httpClient.callbackExecutor ); httpLoadQueuePeon.start(); } @@ -117,13 +110,12 @@ public void testSimple() httpLoadQueuePeon .loadSegment(segments.get(3), SegmentAction.MOVE_TO, markSegmentProcessed(segments.get(3))); - // Send requests to server - processingExecutor.finishAllPendingTasks(); + httpClient.sendRequestToServerAndHandleResponse(); Assert.assertEquals(segments, httpClient.segmentsSentToServer); // Verify that all callbacks are executed - callbackExecutor.finishAllPendingTasks(); - Assert.assertEquals(segments, processedSegments); + httpClient.executeCallbacks(); + Assert.assertEquals(segments, httpClient.processedSegments); } @Test @@ -170,8 +162,7 @@ public void testPriorityOfSegmentAction() Collections.shuffle(actions); actions.forEach(QueueAction::invoke); - // Send one batch of requests to the server - processingExecutor.finishAllPendingTasks(); + httpClient.sendRequestToServerAndHandleResponse(); // Verify that all segments are sent to the server in the expected order Assert.assertEquals(segmentsDay1, httpClient.segmentsSentToServer); @@ -194,7 +185,7 @@ public void testPriorityOfSegmentInterval() Collections.shuffle(segmentsDay2); // Assign 
segments to the actions in their order of priority - // Priority order: action (drop, priorityLoad, etc), then interval (new then old) + // Order: action (drop, priorityLoad, etc.), then interval (new then old) List actions = Arrays.asList( QueueAction.of(segmentsDay2.get(0), s -> httpLoadQueuePeon.dropSegment(s, null)), QueueAction.of(segmentsDay1.get(0), s -> httpLoadQueuePeon.dropSegment(s, null)), @@ -212,8 +203,7 @@ public void testPriorityOfSegmentInterval() Collections.shuffle(actions); actions.forEach(QueueAction::invoke); - // Send one batch of requests to the server - processingExecutor.finishNextPendingTask(); + httpClient.sendRequestToServerAndHandleResponse(); // Verify that all segments are sent to the server in the expected order Assert.assertEquals(expectedSegmentOrder, httpClient.segmentsSentToServer); @@ -230,7 +220,7 @@ public void testCancelLoad() Assert.assertTrue(cancelled); Assert.assertEquals(0, httpLoadQueuePeon.getSegmentsToLoad().size()); - Assert.assertTrue(processedSegments.isEmpty()); + Assert.assertTrue(httpClient.processedSegments.isEmpty()); } @Test @@ -244,7 +234,7 @@ public void testCancelDrop() Assert.assertTrue(cancelled); Assert.assertTrue(httpLoadQueuePeon.getSegmentsToDrop().isEmpty()); - Assert.assertTrue(processedSegments.isEmpty()); + Assert.assertTrue(httpClient.processedSegments.isEmpty()); } @Test @@ -254,8 +244,7 @@ public void testCannotCancelRequestSentToServer() httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, markSegmentProcessed(segment)); Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment)); - // Send the request to the server - processingExecutor.finishNextPendingTask(); + httpClient.sendRequestToServer(); Assert.assertTrue(httpClient.segmentsSentToServer.contains(segment)); // Segment is still in queue but operation cannot be cancelled @@ -263,8 +252,7 @@ public void testCannotCancelRequestSentToServer() boolean cancelled = httpLoadQueuePeon.cancelOperation(segment); 
Assert.assertFalse(cancelled); - // Handle response from server - processingExecutor.finishNextPendingTask(); + httpClient.handleResponseFromServer(); // Segment has been removed from queue Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().isEmpty()); @@ -272,8 +260,8 @@ public void testCannotCancelRequestSentToServer() Assert.assertFalse(cancelled); // Execute callbacks and verify segment is fully processed - callbackExecutor.finishAllPendingTasks(); - Assert.assertTrue(processedSegments.contains(segment)); + httpClient.executeCallbacks(); + Assert.assertTrue(httpClient.processedSegments.contains(segment)); } @Test @@ -287,14 +275,59 @@ public void testCannotCancelOperationMultipleTimes() Assert.assertFalse(httpLoadQueuePeon.cancelOperation(segment)); } + @Test + public void testLoadRateIsZeroWhenNoLoadHasFinishedYet() + { + httpLoadQueuePeon.loadSegment(segments.get(0), SegmentAction.LOAD, null); + httpClient.sendRequestToServer(); + Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToLoad().size()); + Assert.assertEquals(0, httpLoadQueuePeon.getLoadRateKbps()); + } + + @Test + public void testLoadRateIsUnchangedByDrops() throws InterruptedException + { + // Drop a segment after a small delay + final long millisTakenToDropSegment = 10; + httpLoadQueuePeon.dropSegment(segments.get(0), null); + httpClient.sendRequestToServer(); + Thread.sleep(millisTakenToDropSegment); + httpClient.handleResponseFromServer(); + + // Verify that load rate is still zero + Assert.assertEquals(0, httpLoadQueuePeon.getLoadRateKbps()); + } + + @Test + public void testLoadRateIsChangedWhenLoadSucceeds() throws InterruptedException + { + // Load a segment after a small delay + final long millisTakenToLoadSegment = 10; + httpLoadQueuePeon.loadSegment(segments.get(0), SegmentAction.LOAD, null); + httpClient.sendRequestToServer(); + Thread.sleep(millisTakenToLoadSegment); + httpClient.handleResponseFromServer(); + + // Verify that load rate has been updated + long expectedRateKbps = 
(8 * segments.get(0).getSize()) / millisTakenToLoadSegment; + long observedRateKbps = httpLoadQueuePeon.getLoadRateKbps(); + Assert.assertTrue( + observedRateKbps > expectedRateKbps / 2 + && observedRateKbps <= expectedRateKbps + ); + } + private LoadPeonCallback markSegmentProcessed(DataSegment segment) { - return success -> processedSegments.add(segment); + return success -> httpClient.processedSegments.add(segment); } private static class TestHttpClient implements HttpClient, DataSegmentChangeHandler { - private final List segmentsSentToServer = new ArrayList<>(); + final BlockingExecutorService processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s"); + final BlockingExecutorService callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb"); + final List processedSegments = new ArrayList<>(); + final List segmentsSentToServer = new ArrayList<>(); @Override public ListenableFuture go( @@ -353,6 +386,27 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac { segmentsSentToServer.add(segment); } + + void sendRequestToServerAndHandleResponse() + { + sendRequestToServer(); + handleResponseFromServer(); + } + + void sendRequestToServer() + { + processingExecutor.finishNextPendingTask(); + } + + void handleResponseFromServer() + { + processingExecutor.finishAllPendingTasks(); + } + + void executeCallbacks() + { + callbackExecutor.finishAllPendingTasks(); + } } /** diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadingRateTrackerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadingRateTrackerTest.java new file mode 100644 index 000000000000..fafa1f3e0e31 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadingRateTrackerTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import org.apache.druid.error.DruidException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +public class LoadingRateTrackerTest +{ + private LoadingRateTracker tracker; + + @Before + public void setup() + { + tracker = new LoadingRateTracker(); + } + + @Test + public void testUpdateThrowsExceptionIfBatchNotStarted() + { + DruidException e = Assert.assertThrows( + DruidException.class, + () -> tracker.incrementBytesLoadedInBatch(1000, 10) + ); + Assert.assertEquals( + "markBatchLoadingStarted() must be called before tracking load progress.", + e.getMessage() + ); + } + + @Test + public void testRateIsZeroWhenEmpty() + { + Assert.assertEquals(0, tracker.getMovingAverageLoadRateKbps()); + } + + @Test + public void testRateIsZeroAfterStop() + { + tracker.markBatchLoadingStarted(); + tracker.incrementBytesLoadedInBatch(1000, 10); + Assert.assertEquals(8 * 1000 / 10, tracker.getMovingAverageLoadRateKbps()); + + tracker.stop(); + Assert.assertEquals(0, tracker.getMovingAverageLoadRateKbps()); + } + + @Test + public void testRateAfter2UpdatesInBatch() + { + tracker.markBatchLoadingStarted(); + tracker.incrementBytesLoadedInBatch(1000, 10); + 
Assert.assertEquals(8 * 1000 / 10, tracker.getMovingAverageLoadRateKbps()); + + tracker.incrementBytesLoadedInBatch(1000, 15); + Assert.assertEquals(8 * 2000 / 15, tracker.getMovingAverageLoadRateKbps()); + } + + @Test + public void testRateAfter2Batches() + { + tracker.markBatchLoadingStarted(); + tracker.incrementBytesLoadedInBatch(1000, 10); + Assert.assertEquals(8 * 1000 / 10, tracker.getMovingAverageLoadRateKbps()); + tracker.markBatchLoadingFinished(); + + tracker.markBatchLoadingStarted(); + tracker.incrementBytesLoadedInBatch(1000, 5); + Assert.assertEquals(8 * 2000 / 15, tracker.getMovingAverageLoadRateKbps()); + tracker.markBatchLoadingFinished(); + } + + @Test + public void test100UpdatesInABatch() + { + final Random random = new Random(1001); + + tracker.markBatchLoadingStarted(); + + long totalUpdateBytes = 0; + long monoticBatchDuration = 0; + for (int i = 0; i < 100; ++i) { + long updateBytes = 1 + random.nextInt(1000); + monoticBatchDuration = 1 + random.nextInt(10); + + tracker.incrementBytesLoadedInBatch(updateBytes, monoticBatchDuration); + + totalUpdateBytes += updateBytes; + Assert.assertEquals(8 * totalUpdateBytes / monoticBatchDuration, tracker.getMovingAverageLoadRateKbps()); + } + + tracker.markBatchLoadingFinished(); + Assert.assertEquals(8 * totalUpdateBytes / monoticBatchDuration, tracker.getMovingAverageLoadRateKbps()); + } + + @Test + public void testRateIsMovingAverage() + { + final Random random = new Random(1001); + final int windowSize = LoadingRateTracker.MOVING_AVERAGE_WINDOW_SIZE; + final long minEntrySizeBytes = LoadingRateTracker.MIN_ENTRY_SIZE_BYTES; + + // Add batch updates to fill up the window size + long[] updateBytes = new long[windowSize]; + long[] updateMillis = new long[windowSize]; + + long totalBytes = 0; + long totalMillis = 0; + for (int i = 0; i < windowSize; ++i) { + updateBytes[i] = minEntrySizeBytes + random.nextInt((int) minEntrySizeBytes); + updateMillis[i] = 1 + random.nextInt(1000); + + totalBytes += 
updateBytes[i]; + totalMillis += updateMillis[i]; + + tracker.markBatchLoadingStarted(); + tracker.incrementBytesLoadedInBatch(updateBytes[i], updateMillis[i]); + Assert.assertEquals( + 8 * totalBytes / totalMillis, + tracker.getMovingAverageLoadRateKbps() + ); + tracker.markBatchLoadingFinished(); + } + + // Add another batch update + long latestUpdateBytes = 1; + long latestUpdateMillis = 1 + random.nextInt(1000); + tracker.markBatchLoadingStarted(); + tracker.incrementBytesLoadedInBatch(latestUpdateBytes, latestUpdateMillis); + tracker.markBatchLoadingFinished(); + + // Verify that the average window has moved + totalBytes = totalBytes - updateBytes[0] + latestUpdateBytes; + totalMillis = totalMillis - updateMillis[0] + latestUpdateMillis; + Assert.assertEquals( + 8 * totalBytes / totalMillis, + tracker.getMovingAverageLoadRateKbps() + ); + } + + @Test + public void testWindowMovesOnlyAfterMinSizeUpdates() + { + final Random random = new Random(1001); + + long totalBytes = 0; + long totalMillis = 0; + + final int windowSize = LoadingRateTracker.MOVING_AVERAGE_WINDOW_SIZE; + final long minEntrySizeBytes = LoadingRateTracker.MIN_ENTRY_SIZE_BYTES; + + for (int i = 0; i < windowSize * 10; ++i) { + long updateBytes = 1 + random.nextInt((int) minEntrySizeBytes / 100); + long updateMillis = 1 + random.nextInt(1000); + + totalBytes += updateBytes; + totalMillis += updateMillis; + + tracker.markBatchLoadingStarted(); + tracker.incrementBytesLoadedInBatch(updateBytes, updateMillis); + tracker.markBatchLoadingFinished(); + + // Verify that the average window doesn't move + Assert.assertEquals( + 8 * totalBytes / totalMillis, + tracker.getMovingAverageLoadRateKbps() + ); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java index c496c37e3ecf..71ebc6fd8cf0 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java @@ -56,6 +56,12 @@ public long getSizeOfSegmentsToLoad() return 0; } + @Override + public long getLoadRateKbps() + { + return 0; + } + @Override public CoordinatorRunStats getAndResetStats() { diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java index c25417a8d13a..9d2b81f6aa15 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.server.coordinator.DruidCoordinator; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -73,4 +74,29 @@ public void testIsLeader() Assert.assertEquals(ImmutableMap.of("leader", false), response2.getEntity()); Assert.assertEquals(404, response2.getStatus()); } + + @Test + public void testGetLoadStatusSimple() + { + EasyMock.expect(mock.getLoadManagementPeons()) + .andReturn(ImmutableMap.of("hist1", new TestLoadQueuePeon())) + .once(); + EasyMock.replay(mock); + + final Response response = new CoordinatorResource(mock).getLoadQueue("true", null); + Assert.assertEquals( + ImmutableMap.of( + "hist1", + ImmutableMap.of( + "segmentsToDrop", 0, + "segmentsToLoad", 0, + "segmentsToLoadSize", 0L, + "segmentsToDropSize", 0L, + "expectedLoadTimeMillis", 0L + ) + ), + response.getEntity() + ); + Assert.assertEquals(200, response.getStatus()); + } } diff --git a/website/.spelling b/website/.spelling index a80900b3edb9..1ac39471571d 100644 --- a/website/.spelling +++ b/website/.spelling @@ -382,6 +382,7 @@ json_query json_query_array json_value 
karlkfi +kbps kerberos keystore keytool