Skip to content

Commit

Permalink
MSQ: Add CPU and thread usage counters. (apache#16914)
Browse files Browse the repository at this point in the history
* MSQ: Add CPU and thread usage counters.

The main change adds "cpu" and "wall" counters. The "cpu" counter measures
CPU time (using JvmUtils.getCurrentThreadCpuTime) taken up by processors
in processing threads. The "wall" counter measures the amount of wall time
taken up by processors in those same processing threads. Both counters are
broken down by type of processor.

This patch also includes changes to support adding new counters. Due to an
oversight in the original design, older deserializers are not forwards-compatible;
they throw errors when encountering an unknown counter type. To manage this,
the following changes are made:

1) The defaultImpl NilQueryCounterSnapshot is added to QueryCounterSnapshot's
   deserialization configuration. This means that any unrecognized counter types
   will be read as "nil" by deserializers. Going forward, once all servers are
   on the latest code, this is enough to enable easily adding new counters.

2) A new context parameter "includeAllCounters" is added, which defaults to "false".
   When this parameter is set "false", only legacy counters are included. When set
   to "true", all counters are included. This is currently undocumented. In a future
   version, we should set the default to "true", and at that time, include a release
   note that people updating from versions prior to Druid 31 should set this to
   "false" until their upgrade is complete.

* Style, coverage.

* Fix.
  • Loading branch information
gianm authored and edgar2020 committed Sep 5, 2024
1 parent 336c59b commit b25bb4c
Show file tree
Hide file tree
Showing 22 changed files with 871 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class CounterNames
private static final String INPUT = "input";
private static final String OUTPUT = "output";
private static final String SHUFFLE = "shuffle";
private static final String CPU = "cpu";
private static final String SORT_PROGRESS = "sortProgress";
private static final String SEGMENT_GENERATION_PROGRESS = "segmentGenerationProgress";
private static final String WARNINGS = "warnings";
Expand Down Expand Up @@ -68,6 +69,14 @@ public static String shuffleChannel()
return SHUFFLE;
}

/**
* Standard name for CPU counters created by {@link CounterTracker#cpu}.
*/
public static String cpu()
{
return CPU;
}

/**
* Standard name for a sort progress counter created by {@link CounterTracker#sortProgress()}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

/**
* Named counter snapshots. Immutable. Often part of a {@link CounterSnapshotsTree}.
* Created by {@link CounterTracker#snapshot()}.
*/
public class CounterSnapshots
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@

package org.apache.druid.msq.counters;

import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.SuperSorterProgressTracker;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.utils.JvmUtils;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -37,11 +42,55 @@ public class CounterTracker
{
private final ConcurrentHashMap<String, QueryCounter> countersMap = new ConcurrentHashMap<>();

/**
* See {@link MultiStageQueryContext#getIncludeAllCounters(QueryContext)}.
*/
private final boolean includeAllCounters;

public CounterTracker(boolean includeAllCounters)
{
this.includeAllCounters = includeAllCounters;
}

public ChannelCounters channel(final String name)
{
return counter(name, ChannelCounters::new);
}

/**
* Returns a {@link CpuCounter} that can be used to accumulate CPU time under a particular label.
*/
public CpuCounter cpu(final String name)
{
return counter(CounterNames.cpu(), CpuCounters::new).forName(name);
}

/**
* Decorates a {@link FrameProcessor} such that it accumulates CPU time under a particular label.
*/
public <T> FrameProcessor<T> trackCpu(final FrameProcessor<T> processor, final String name)
{
if (JvmUtils.isThreadCpuTimeEnabled()) {
final CpuCounter counter = counter(CounterNames.cpu(), CpuCounters::new).forName(name);
return new CpuTimeAccumulatingFrameProcessor<>(processor, counter);
} else {
return processor;
}
}

/**
* Decorates a {@link ProcessorManager} such that it accumulates CPU time under a particular label.
*/
public <T, R> ProcessorManager<T, R> trackCpu(final ProcessorManager<T, R> processorManager, final String name)
{
if (JvmUtils.isThreadCpuTimeEnabled()) {
final CpuCounter counter = counter(CounterNames.cpu(), CpuCounters::new).forName(name);
return new CpuTimeAccumulatingProcessorManager<>(processorManager, counter);
} else {
return processorManager;
}
}

public SuperSorterProgressTracker sortProgress()
{
return counter(CounterNames.sortProgress(), SuperSorterProgressTrackerCounter::new).tracker();
Expand Down Expand Up @@ -69,11 +118,23 @@ public CounterSnapshots snapshot()

for (final Map.Entry<String, QueryCounter> entry : countersMap.entrySet()) {
final QueryCounterSnapshot counterSnapshot = entry.getValue().snapshot();
if (counterSnapshot != null) {
if (counterSnapshot != null && (includeAllCounters || isLegacyCounter(counterSnapshot))) {
m.put(entry.getKey(), counterSnapshot);
}
}

return new CounterSnapshots(m);
}

/**
* Returns whether a counter is a "legacy counter" that can be snapshotted regardless of the value of
* {@link MultiStageQueryContext#getIncludeAllCounters(QueryContext)}.
*/
private static boolean isLegacyCounter(final QueryCounterSnapshot counterSnapshot)
{
return counterSnapshot instanceof ChannelCounters.Snapshot
|| counterSnapshot instanceof SuperSorterProgressTrackerCounter.Snapshot
|| counterSnapshot instanceof WarningCounters.Snapshot
|| counterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.counters;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.utils.JvmUtils;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class CpuCounter implements QueryCounter
{
private final AtomicLong cpuTime = new AtomicLong();
private final AtomicLong wallTime = new AtomicLong();

public void accumulate(final long cpu, final long wall)
{
cpuTime.addAndGet(cpu);
wallTime.addAndGet(wall);
}

public <E extends Throwable> void run(final Doer<E> doer) throws E
{
final long startCpu = JvmUtils.getCurrentThreadCpuTime();
final long startWall = System.nanoTime();

try {
doer.run();
}
finally {
accumulate(
JvmUtils.getCurrentThreadCpuTime() - startCpu,
System.nanoTime() - startWall
);
}
}

public <T, E extends Throwable> T run(final Returner<T, E> returner) throws E
{
final long startCpu = JvmUtils.getCurrentThreadCpuTime();
final long startWall = System.nanoTime();

try {
return returner.run();
}
finally {
accumulate(
JvmUtils.getCurrentThreadCpuTime() - startCpu,
System.nanoTime() - startWall
);
}
}

@Override
public Snapshot snapshot()
{
return new Snapshot(cpuTime.get(), wallTime.get());
}

@JsonTypeName("cpu")
public static class Snapshot implements QueryCounterSnapshot
{
private final long cpuTime;
private final long wallTime;

@JsonCreator
public Snapshot(
@JsonProperty("cpu") long cpuTime,
@JsonProperty("wall") long wallTime
)
{
this.cpuTime = cpuTime;
this.wallTime = wallTime;
}

@JsonProperty("cpu")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public long getCpuTime()
{
return cpuTime;
}

@JsonProperty("wall")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public long getWallTime()
{
return wallTime;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Snapshot snapshot = (Snapshot) o;
return cpuTime == snapshot.cpuTime && wallTime == snapshot.wallTime;
}

@Override
public int hashCode()
{
return Objects.hash(cpuTime, wallTime);
}

@Override
public String toString()
{
return "CpuCounter.Snapshot{" +
"cpuTime=" + cpuTime +
", wallTime=" + wallTime +
'}';
}
}

public interface Doer<E extends Throwable>
{
void run() throws E;
}

public interface Returner<T, E extends Throwable>
{
T run() throws E;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.counters;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class CpuCounters implements QueryCounter
{
public static final String LABEL_MAIN = "main";
public static final String LABEL_KEY_STATISTICS = "collectKeyStatistics";
public static final String LABEL_MERGE_INPUT = "mergeInput";
public static final String LABEL_HASH_PARTITION = "hashPartitionOutput";
public static final String LABEL_MIX = "mixOutput";
public static final String LABEL_SORT = "sortOutput";

private final ConcurrentHashMap<String, CpuCounter> counters = new ConcurrentHashMap<>();

public CpuCounter forName(final String name)
{
return counters.computeIfAbsent(name, k -> new CpuCounter());
}

@Nullable
@Override
public CpuCounters.Snapshot snapshot()
{
final Map<String, CpuCounter.Snapshot> snapshotMap = new HashMap<>();
for (Map.Entry<String, CpuCounter> entry : counters.entrySet()) {
snapshotMap.put(entry.getKey(), entry.getValue().snapshot());
}
return new Snapshot(snapshotMap);
}

@JsonTypeName("cpus")
public static class Snapshot implements QueryCounterSnapshot
{
// String keys, not enum, so deserialization is forwards-compatible
private final Map<String, CpuCounter.Snapshot> map;

@JsonCreator
public Snapshot(Map<String, CpuCounter.Snapshot> map)
{
this.map = Preconditions.checkNotNull(map, "map");
}

@JsonValue
public Map<String, CpuCounter.Snapshot> getCountersMap()
{
return map;
}

@Override
public boolean equals(Object o)
{

if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Snapshot snapshot = (Snapshot) o;
return Objects.equals(map, snapshot.map);
}

@Override
public int hashCode()
{
return Objects.hash(map);
}

@Override
public String toString()
{
return "CpuCounters.Snapshot{" +
"map=" + map +
'}';
}
}
}
Loading

0 comments on commit b25bb4c

Please sign in to comment.