Skip to content

Commit

Permalink
Track and emit segment loading rate for HttpLoadQueuePeon on Coordina…
Browse files Browse the repository at this point in the history
…tor (apache#16691)

Design:
The loading rate is computed as a moving average of at least the last 10 GiB of successful segment loads.
To account for multiple loading threads on a server, we use the concept of a batch to track load times.
A batch is a set of segments added by the coordinator to the load queue of a server in one go.

Computation:
batchDurationMillis = t(load queue becomes empty) - t(first load request in batch is sent to server)
batchBytes = total bytes successfully loaded in batch
avg loading rate in batch (kbps) = (8 * batchBytes) / batchDurationMillis
overall avg loading rate (kbps) = (8 * sumOverWindow(batchBytes)) / sumOverWindow(batchDurationMillis)

Changes:
- Add `LoadingRateTracker` which computes a moving average load rate based on
the last few GBs of successful segment loads.
- Emit metric `segment/loading/rateKbps` from the Coordinator. In the future, we may
also consider emitting this metric from the historicals themselves.
- Add `expectedLoadTimeMillis` to response of API `/druid/coordinator/v1/loadQueue?simple`
  • Loading branch information
kfaraz authored Aug 3, 2024
1 parent fe6772a commit 9dc2569
Show file tree
Hide file tree
Showing 18 changed files with 626 additions and 83 deletions.
3 changes: 2 additions & 1 deletion docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ See [Enabling metrics](../configuration/index.md#enabling-metrics) for more deta

## Coordination

These metrics are for the Druid Coordinator and are reset each time the Coordinator runs the coordination logic.
These metrics are emitted by the Druid Coordinator in every run of the corresponding coordinator duty.

|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
Expand All @@ -325,6 +325,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`segment/dropSkipped/count`|Number of segments that could not be dropped from any server.|`dataSource`, `tier`, `description`|Varies|
|`segment/loadQueue/size`|Size in bytes of segments to load.|`server`|Varies|
|`segment/loadQueue/count`|Number of segments to load.|`server`|Varies|
|`segment/loading/rateKbps`|Current rate of segment loading on a server in kbps (1000 bits per second). The rate is calculated as a moving average over the last 10 GiB or more of successful segment loads on that server.|`server`|Varies|
|`segment/dropQueue/count`|Number of segments to drop.|`server`|Varies|
|`segment/loadQueue/assigned`|Number of segments assigned for load or drop to the load queue of a server.|`dataSource`, `server`|Varies|
|`segment/loadQueue/success`|Number of segment assignments that completed successfully.|`dataSource`, `server`|Varies|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private void collectLoadQueueStats(CoordinatorRunStats stats)
stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad());
stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size());
stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey, queuePeon.getSegmentsToDrop().size());
stats.updateMax(Stats.SegmentQueue.LOAD_RATE_KBPS, rowKey, queuePeon.getLoadRateKbps());

queuePeon.getAndResetStats().forEachStat(
(stat, key, statValue) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ public long getSizeOfSegmentsToLoad()
return queuedSize.get();
}

@Override
public long getLoadRateKbps()
{
return 0;
}

@Override
public CoordinatorRunStats getAndResetStats()
{
Expand All @@ -179,7 +185,7 @@ public CoordinatorRunStats getAndResetStats()
@Override
public void loadSegment(final DataSegment segment, SegmentAction action, @Nullable final LoadPeonCallback callback)
{
SegmentHolder segmentHolder = new SegmentHolder(segment, action, callback);
SegmentHolder segmentHolder = new SegmentHolder(segment, action, Duration.ZERO, callback);
final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
if (existingHolder != null) {
existingHolder.addCallback(callback);
Expand All @@ -193,7 +199,7 @@ public void loadSegment(final DataSegment segment, SegmentAction action, @Nullab
@Override
public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
{
SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, callback);
SegmentHolder segmentHolder = new SegmentHolder(segment, SegmentAction.DROP, Duration.ZERO, callback);
final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder);
if (existingHolder != null) {
existingHolder.addCallback(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DataSegmentChangeResponse;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.SegmentChangeStatus;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentHashMap<>();
private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentHashMap<>();
private final Set<DataSegment> segmentsMarkedToDrop = ConcurrentHashMap.newKeySet();
private final LoadingRateTracker loadingRateTracker = new LoadingRateTracker();

/**
* Segments currently in queue ordered by priority and interval. This includes
Expand Down Expand Up @@ -169,11 +171,10 @@ private void doSegmentManagement()
synchronized (lock) {
final Iterator<SegmentHolder> queuedSegmentIterator = queuedSegments.iterator();

final long currentTimeMillis = System.currentTimeMillis();
while (newRequests.size() < batchSize && queuedSegmentIterator.hasNext()) {
final SegmentHolder holder = queuedSegmentIterator.next();
final DataSegment segment = holder.getSegment();
if (hasRequestTimedOut(holder, currentTimeMillis)) {
if (holder.hasRequestTimedOut()) {
onRequestFailed(holder, "timed out");
queuedSegmentIterator.remove();
if (holder.isLoad()) {
Expand All @@ -188,9 +189,13 @@ private void doSegmentManagement()
activeRequestSegments.add(segment);
}
}

if (segmentsToLoad.isEmpty()) {
loadingRateTracker.markBatchLoadingFinished();
}
}

if (newRequests.size() == 0) {
if (newRequests.isEmpty()) {
log.trace(
"[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].",
serverId, segmentsToLoad.size(), segmentsToDrop.size(), config.getBatchSize()
Expand All @@ -201,6 +206,11 @@ private void doSegmentManagement()

try {
log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
final boolean hasLoadRequests = newRequests.stream().anyMatch(r -> r instanceof SegmentChangeRequestLoad);
if (hasLoadRequests && !loadingRateTracker.isLoadingBatch()) {
loadingRateTracker.markBatchLoadingStarted();
}

BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
ListenableFuture<InputStream> future = httpClient.go(
new Request(HttpMethod.POST, changeRequestURL)
Expand Down Expand Up @@ -234,9 +244,16 @@ public void onSuccess(InputStream result)
return;
}

int numSuccessfulLoads = 0;
long successfulLoadSize = 0;
for (DataSegmentChangeResponse e : statuses) {
switch (e.getStatus().getState()) {
case SUCCESS:
if (e.getRequest() instanceof SegmentChangeRequestLoad) {
++numSuccessfulLoads;
successfulLoadSize +=
((SegmentChangeRequestLoad) e.getRequest()).getSegment().getSize();
}
case FAILED:
handleResponseStatus(e.getRequest(), e.getStatus());
break;
Expand All @@ -248,6 +265,10 @@ public void onSuccess(InputStream result)
log.error("Server[%s] returned unknown state in status[%s].", serverId, e.getStatus());
}
}

if (numSuccessfulLoads > 0) {
loadingRateTracker.incrementBytesLoadedInBatch(successfulLoadSize);
}
}
}
catch (Exception ex) {
Expand Down Expand Up @@ -284,9 +305,7 @@ private void logRequestFailure(Throwable t)
log.error(
t,
"Request[%s] Failed with status[%s]. Reason[%s].",
changeRequestURL,
responseHandler.getStatus(),
responseHandler.getDescription()
changeRequestURL, responseHandler.getStatus(), responseHandler.getDescription()
);
}
},
Expand Down Expand Up @@ -367,7 +386,7 @@ public void stop()
if (stopped) {
return;
}
log.info("Stopping load queue peon for server [%s].", serverId);
log.info("Stopping load queue peon for server[%s].", serverId);
stopped = true;

// Cancel all queued requests
Expand All @@ -379,6 +398,7 @@ public void stop()
queuedSegments.clear();
activeRequestSegments.clear();
queuedSize.set(0L);
loadingRateTracker.stop();
stats.get().clear();
}
}
Expand All @@ -387,7 +407,7 @@ public void stop()
public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallback callback)
{
if (!action.isLoad()) {
log.warn("Invalid load action [%s] for segment [%s] on server [%s].", action, segment.getId(), serverId);
log.warn("Invalid load action[%s] for segment[%s] on server[%s].", action, segment.getId(), serverId);
return;
}

Expand All @@ -407,7 +427,7 @@ public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallb
if (holder == null) {
log.trace("Server[%s] to load segment[%s] queued.", serverId, segment.getId());
queuedSize.addAndGet(segment.getSize());
holder = new SegmentHolder(segment, action, callback);
holder = new SegmentHolder(segment, action, config.getLoadTimeout(), callback);
segmentsToLoad.put(segment, holder);
queuedSegments.add(holder);
processingExecutor.execute(this::doSegmentManagement);
Expand Down Expand Up @@ -436,7 +456,7 @@ public void dropSegment(DataSegment segment, LoadPeonCallback callback)

if (holder == null) {
log.trace("Server[%s] to drop segment[%s] queued.", serverId, segment.getId());
holder = new SegmentHolder(segment, SegmentAction.DROP, callback);
holder = new SegmentHolder(segment, SegmentAction.DROP, config.getLoadTimeout(), callback);
segmentsToDrop.put(segment, holder);
queuedSegments.add(holder);
processingExecutor.execute(this::doSegmentManagement);
Expand Down Expand Up @@ -481,6 +501,12 @@ public long getSizeOfSegmentsToLoad()
return queuedSize.get();
}

@Override
public long getLoadRateKbps()
{
return loadingRateTracker.getMovingAverageLoadRateKbps();
}

@Override
public CoordinatorRunStats getAndResetStats()
{
Expand All @@ -505,19 +531,6 @@ public Set<DataSegment> getSegmentsMarkedToDrop()
return Collections.unmodifiableSet(segmentsMarkedToDrop);
}

/**
* A request is considered to have timed out if the time elapsed since it was
* first sent to the server is greater than the configured load timeout.
*
* @see HttpLoadQueuePeonConfig#getLoadTimeout()
*/
private boolean hasRequestTimedOut(SegmentHolder holder, long currentTimeMillis)
{
return holder.isRequestSentToServer()
&& currentTimeMillis - holder.getFirstRequestMillis()
> config.getLoadTimeout().getMillis();
}

private void onRequestFailed(SegmentHolder holder, String failureCause)
{
log.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@
package org.apache.druid.server.coordinator.loading;

/**
* Callback executed when the load or drop of a segment completes on a server
* either with success or failure.
*/
@FunctionalInterface
public interface LoadPeonCallback
{
/**
* Ideally, this method is called after the load/drop opertion is successfully done, i.e., the historical node
* removes the zookeeper node from loadQueue and announces/unannouces the segment. However, this method will
* also be called in failure scenarios so for implementations of LoadPeonCallback that care about success it
* is important to take extra measures to ensure that whatever side effects they expect to happen upon success
* have happened. Coordinator will have a complete and correct view of the cluster in the next run period.
*/
void execute(boolean success);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public interface LoadQueuePeon

long getSizeOfSegmentsToLoad();

long getLoadRateKbps();

CoordinatorRunStats getAndResetStats();

/**
Expand Down
Loading

0 comments on commit 9dc2569

Please sign in to comment.