Skip to content

Commit

Permalink
Add metrics for number of segments generated per task in MSQ
Browse files Browse the repository at this point in the history
  • Loading branch information
YongGang committed Sep 13, 2023
1 parent 0f38a37 commit d286380
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;

import java.util.Map;
Expand All @@ -36,6 +37,8 @@
*/
public interface ControllerContext
{
ServiceEmitter emitter();

ObjectMapper jsonMapper();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
int tombstoneSize = 0;

if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
Expand All @@ -1344,6 +1345,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
destination.getSegmentGranularity()
);
segmentsWithTombstones.addAll(tombstones);
tombstoneSize = tombstones.size();
}
catch (IllegalStateException e) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
Expand Down Expand Up @@ -1389,6 +1391,10 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
SegmentTransactionalInsertAction.appendAction(segments, null, null)
);
}

task.emitMetric(context.emitter(), "ingest/tombstones/count", tombstoneSize);
// segments count metric is documented to include tombstones
task.emitMetric(context.emitter(), "ingest/segments/count", segmentsWithTombstones.size());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.WorkerClient;
Expand Down Expand Up @@ -67,6 +68,12 @@ public IndexerControllerContext(
this.workerManager = new IndexerWorkerManagerClient(overlordClient);
}

@Override
public ServiceEmitter emitter()
{
return toolbox.getEmitter();
}

@Override
public ObjectMapper jsonMapper()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.Worker;
Expand Down Expand Up @@ -215,6 +216,12 @@ public void close()
}
};

@Override
public ServiceEmitter emitter()
{
return null;
}

@Override
public ObjectMapper jsonMapper()
{
Expand Down

0 comments on commit d286380

Please sign in to comment.