Add ingest/processed/bytes metric #17581

Merged: 15 commits, Jan 24, 2025
@@ -86,8 +86,10 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.InputChannelsImpl;
import org.apache.druid.msq.indexing.MSQControllerTask;
@@ -329,6 +331,27 @@ public void run(final QueryListener queryListener) throws Exception
}
// Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing.
queryListener.onQueryComplete(reportPayload);

@cryptoe (Contributor) commented on Jan 7, 2025:

This seems like the wrong place to put this logic. ingest/processed/bytes seems like an ingestion-only metric, no? If that is the case, we should emit the metric only if the query is an ingestion query.

You could probably expose a method here (https://github.com/apache/druid/blob/9bebe7f1e5ab0f40efbff620769d0413c943683c/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java#L517) that emits the summary metrics and takes the task report and the query as arguments.
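
A minimal sketch of the kind of hook being suggested (the method name, parameter types, and helper below are assumptions, not the PR's actual API):

    // Hypothetical summary-metrics hook on the controller; names are assumptions.
    private void emitSummaryMetrics(final MSQTaskReportPayload reportPayload, final MSQSpec querySpec)
    {
      // Only ingestion queries (INSERT/REPLACE destinations) emit the
      // ingestion-only metric; selects would skip it.
      if (MSQControllerTask.isIngestion(querySpec)) {
        long processedBytes = sumInputBytes(reportPayload.getCounters()); // hypothetical helper
        context.emitMetric("ingest/processed/bytes", processedBytes);
      }
    }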

@neha-ellur (Contributor, Author) replied:

Moved the logic

@cryptoe (Contributor) replied:

I think the place it has moved to is correct.
Rather than "ingest" in the metric name, can we rename the metric to input/processed/bytes or something similar, since we would want that metric for MSQ selects as well?

Also, the MSQ code might need to be adjusted so that only leaf nodes contribute to this metric, no? Otherwise an equivalent batch ingest with range partitioning will show fewer processed bytes, since the shuffle stage's input is not being counted. A simple test should be sufficient to rule this out.

Try a query like "replace bar all using select * from extern(http) partitioned by day clustered by col1" and an equivalent range-partitioning spec for batch ingestion of the same http input source.
cc @kfaraz
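
Spelled out, the suggested comparison query might look like the following (approximate MSQ SQL; the datasource name, input URI, and column signature are placeholders):

    REPLACE INTO bar OVERWRITE ALL
    SELECT
      TIME_PARSE(ts) AS __time,
      col1
    FROM TABLE(
      EXTERN(
        '{"type": "http", "uris": ["http://example.com/data.json"]}',
        '{"type": "json"}',
        '[{"name": "ts", "type": "string"}, {"name": "col1", "type": "string"}]'
      )
    )
    PARTITIONED BY DAY
    CLUSTERED BY col1

The batch counterpart would point a native index task at the same http input source with a range partitionsSpec on col1; both runs should then report the same processed-bytes total.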

@neha-ellur (Contributor, Author) replied on Jan 11, 2025:

@cryptoe This metric will be used in a fronting UI and should be named ingest/processed/bytes.
Regarding the MSQ change to count only leaf nodes, where would that be? Regarding the test, any pointers to existing tests would be helpful; this is my first time in this area of the code.

@cryptoe (Contributor) replied:

I am not sure ingest/processed/bytes makes sense for a select query.

MSQ runs using a DAG of stages. The query definition has a

    private final Map<StageId, StageDefinition> stageDefinitions;

and each stage definition has a

    private final List<InputSpec> inputSpecs;

I think the metric makes sense if we check that the input spec is not a StageInputSpec, and only then plumb the input bytes into the final summary metric.

A UT like this can help you debug things:

    public void testInsertOnExternalDataSource(String contextName, Map<String, Object> context) throws IOException

Attach a breakpoint there to see the query definition in action.

Hope it helps.
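
A sketch of that leaf-only check (the accessor and helper names below are assumptions; verify them against the actual QueryDefinition and StageDefinition classes):

    long leafInputBytes = 0;
    for (StageDefinition stage : queryDef.getStageDefinitions()) {
      for (InputSpec inputSpec : stage.getInputSpecs()) {
        // A StageInputSpec means this input is another stage's output (a shuffle),
        // so it should not count toward externally processed bytes.
        if (!(inputSpec instanceof StageInputSpec)) {
          leafInputBytes += bytesProcessedFor(stage, inputSpec); // hypothetical helper
        }
      }
    }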

// Sum the bytes read by all "input" channels across every stage and worker
// in the final counter snapshots.
long totalProcessedBytes = reportPayload.getCounters().copyMap().values().stream()
    .mapToLong(workerSnapshotsMap -> workerSnapshotsMap.values().stream()
        .mapToLong(counterSnapshots -> {
          Map<String, QueryCounterSnapshot> workerCounters = counterSnapshots.getMap();
          return workerCounters.entrySet().stream()
              .mapToLong(channel -> {
                // Only input channels count toward processed bytes.
                if (channel.getKey().startsWith("input")) {
                  ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) channel.getValue();
                  return snapshot.getBytes() == null
                         ? 0L
                         : Arrays.stream(snapshot.getBytes()).sum();
                }
                return 0L;
              })
              .sum();
        })
        .sum())
    .sum();

log.info("Total processed bytes: %d", totalProcessedBytes);
context.emitMetric("ingest/processed/bytes", totalProcessedBytes);
}

@Override
@@ -993,6 +993,7 @@ private TaskStatus generateAndPublishSegments(
emitMetric(toolbox.getEmitter(), "ingest/segments/count",
published.getSegments().size() + tombStones.size()
);
emitMetric(toolbox.getEmitter(), "ingest/processed/bytes", buildSegmentsMeters.getProcessedBytes());

log.debugSegments(published.getSegments(), "Published segments");

@@ -1633,6 +1633,14 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);

// Emit the processed bytes metric
try {
emitMetric(toolbox.getEmitter(), "ingest/processed/bytes", rowStatsForRunningTasks.getProcessedBytes());
}
catch (Exception e) {
LOG.warn(e, "Unable to emit processed bytes metric");
}

return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}

@@ -78,6 +78,8 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -251,6 +253,7 @@ public enum Status
private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
private final ScheduledExecutorService rejectionPeriodUpdaterExec;
private final ServiceEmitter emitter;

public SeekableStreamIndexTaskRunner(
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
@@ -272,6 +275,7 @@ public SeekableStreamIndexTaskRunner(
this.sequences = new CopyOnWriteArrayList<>();
this.ingestionState = IngestionState.NOT_STARTED;
this.lockGranularityToUse = lockGranularityToUse;
this.emitter = toolbox.getEmitter();

minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
@@ -657,6 +661,24 @@ public void run()
shouldProcess
);

// Count the payload bytes in this record before deciding whether to process it.
long bytesProcessed = 0;
for (ByteEntity entity : record.getData()) {
  bytesProcessed += entity.getBuffer().remaining();
}

// Emit the processed bytes metric
try {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("taskId", task.getId())
.setDimension("dataSource", task.getDataSource())
.setMetric("ingest/processed/bytes", bytesProcessed)
);
}
catch (Exception e) {
log.warn(e, "Unable to emit processed bytes metric");
}

if (shouldProcess) {
final List<InputRow> rows = parser.parse(record.getData(), isEndOfShard(record.getSequenceNumber()));
boolean isPersistRequired = false;