Skip to content

Commit

Permalink
code refine
Browse files Browse the repository at this point in the history
  • Loading branch information
YongGang committed Sep 14, 2023
1 parent 9dd0a11 commit 6ac7eb7
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
int tombstoneSize = 0;
int numTombstones = 0;

if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
Expand All @@ -1345,7 +1345,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
destination.getSegmentGranularity()
);
segmentsWithTombstones.addAll(tombstones);
tombstoneSize = tombstones.size();
numTombstones = tombstones.size();
}
catch (IllegalStateException e) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
Expand Down Expand Up @@ -1392,7 +1392,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
);
}

task.emitMetric(context.emitter(), "ingest/tombstones/count", tombstoneSize);
task.emitMetric(context.emitter(), "ingest/tombstones/count", numTombstones);
// Include tombstones in the reported segments count
task.emitMetric(context.emitter(), "ingest/segments/count", segmentsWithTombstones.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
Expand Down Expand Up @@ -83,6 +84,7 @@ public class MSQTestControllerContext implements ControllerContext
);
private final Injector injector;
private final ObjectMapper mapper;
private final ServiceEmitter emitter = new NoopServiceEmitter();

private Controller controller;
private Map<String, TaskReport> report = null;
Expand Down Expand Up @@ -219,7 +221,7 @@ public void close()
@Override
public ServiceEmitter emitter()
{
return null;
return emitter;
}

@Override
Expand Down

0 comments on commit 6ac7eb7

Please sign in to comment.