Skip to content
This repository has been archived by the owner on Sep 1, 2024. It is now read-only.

Extract metric measurement and add to builder to allow the user of th… #60

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 85 additions & 6 deletions src/main/java/metrics_influxdb/InfluxdbReporter.java
Original file line number Diff line number Diff line change
@@ -28,6 +28,11 @@
import metrics_influxdb.measurements.MeasurementReporter;
import metrics_influxdb.measurements.Sender;
import metrics_influxdb.measurements.UdpInlinerSender;
import metrics_influxdb.measurements.reporter.CounterMeasurementReporter;
import metrics_influxdb.measurements.reporter.GaugeMeasurementReporter;
import metrics_influxdb.measurements.reporter.HistogramMeasurementReporter;
import metrics_influxdb.measurements.reporter.MeterMeasurementReporter;
import metrics_influxdb.measurements.reporter.TimerMeasurementReporter;
import metrics_influxdb.misc.Miscellaneous;
import metrics_influxdb.misc.VisibilityIncreasedForTests;
import metrics_influxdb.v08.Influxdb;
@@ -73,6 +78,12 @@ public static class Builder {
private MetricFilter filter;
private boolean skipIdleMetrics;
private ScheduledExecutorService executor;
private CounterMeasurementReporter counterMeasurementReporter;
private GaugeMeasurementReporter gaugeMeasurementReporter;
private HistogramMeasurementReporter histogramMeasurementReporter;
private MeterMeasurementReporter meterMeasurementReporter;
private TimerMeasurementReporter timerMeasurementReporter;


@VisibilityIncreasedForTests InfluxdbCompatibilityVersions influxdbVersion;
@VisibilityIncreasedForTests InfluxdbProtocol protocol;
@@ -90,6 +101,11 @@ private Builder(MetricRegistry registry) {
this.protocol = new HttpInfluxdbProtocol();
this.influxdbVersion = InfluxdbCompatibilityVersions.LATEST;
this.tags = new HashMap<>();
this.counterMeasurementReporter = new CounterMeasurementReporter();
this.gaugeMeasurementReporter = new GaugeMeasurementReporter();
this.histogramMeasurementReporter = new HistogramMeasurementReporter();
this.meterMeasurementReporter = new MeterMeasurementReporter(TimeUnit.SECONDS);
this.timerMeasurementReporter = new TimerMeasurementReporter(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, false);
}

/**
@@ -163,6 +179,61 @@ public Builder skipIdleMetrics(boolean skipIdleMetrics) {
return this;
}

/**
* Add a custom measurement reporter for counters.
*
* @param measurementReporter a {@link CounterMeasurementReporter}
* @return {@code this}
*/
public Builder counterMeasurementReporter(CounterMeasurementReporter measurementReporter) {
this.counterMeasurementReporter = measurementReporter;
return this;
}

/**
* Add a custom measurement reporter for gauges.
*
* @param measurementReporter a {@link GaugeMeasurementReporter}
* @return {@code this}
*/
public Builder gaugeMeasurementReporter(GaugeMeasurementReporter measurementReporter) {
this.gaugeMeasurementReporter = measurementReporter;
return this;
}

/**
* Add a custom measurement reporter for histograms.
*
* @param measurementReporter a {@link HistogramMeasurementReporter}
* @return {@code this}
*/
public Builder histogramMeasurementReporter(HistogramMeasurementReporter measurementReporter) {
this.histogramMeasurementReporter = measurementReporter;
return this;
}

/**
* Add a custom measurement reporter for meters.
*
* @param measurementReporter a {@link MeterMeasurementReporter}
* @return {@code this}
*/
public Builder meterMeasurementReporter(MeterMeasurementReporter measurementReporter) {
this.meterMeasurementReporter = measurementReporter;
return this;
}

/**
* Add a custom measurement reporter for timers.
*
* @param measurementReporter a {@link TimerMeasurementReporter}
* @return {@code this}
*/
public Builder timerMeasurementReporter(TimerMeasurementReporter measurementReporter) {
this.timerMeasurementReporter = measurementReporter;
return this;
}

/**
* Builds a {@link ScheduledReporter} with the given properties, sending
* metrics using the given InfluxDB.
@@ -181,17 +252,25 @@ public ScheduledReporter build() {
;
break;
default:
Sender s = buildSender();
reporter = executor == null
? new MeasurementReporter(s, registry, filter, rateUnit, durationUnit, clock, tags, transformer)
: new MeasurementReporter(s, registry, filter, rateUnit, durationUnit, clock, tags, transformer, executor)
;
if (timerMeasurementReporter == null) {
timerMeasurementReporter = new TimerMeasurementReporter(rateUnit, durationUnit, false);
}

if (meterMeasurementReporter == null) {
meterMeasurementReporter = new MeterMeasurementReporter(rateUnit);
}

if (executor != null) {
return new MeasurementReporter(buildSender(), registry, filter, rateUnit, durationUnit, clock, tags, transformer, counterMeasurementReporter, gaugeMeasurementReporter, histogramMeasurementReporter, meterMeasurementReporter, timerMeasurementReporter, executor);
} else {
return new MeasurementReporter(buildSender(), registry, filter, rateUnit, durationUnit, clock, tags, transformer, counterMeasurementReporter, gaugeMeasurementReporter, histogramMeasurementReporter, meterMeasurementReporter, timerMeasurementReporter);
}
}
return reporter;
}

/**
* Operates with influxdb version less or equal than 08.
* Operates with influxdb version less or equal than 08.
* @return the builder itself
*/
public Builder v08() {
174 changes: 48 additions & 126 deletions src/main/java/metrics_influxdb/measurements/MeasurementReporter.java
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@
import java.util.SortedMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Clock;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
@@ -14,31 +13,51 @@
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

import metrics_influxdb.api.measurements.MetricMeasurementTransformer;
import metrics_influxdb.measurements.reporter.CounterMeasurementReporter;
import metrics_influxdb.measurements.reporter.GaugeMeasurementReporter;
import metrics_influxdb.measurements.reporter.HistogramMeasurementReporter;
import metrics_influxdb.measurements.reporter.MeterMeasurementReporter;
import metrics_influxdb.measurements.reporter.TimerMeasurementReporter;

public class MeasurementReporter extends ScheduledReporter{
private final Sender sender;

private final Sender sender;
private final Clock clock;
private Map<String, String> baseTags;
private MetricMeasurementTransformer transformer;

public MeasurementReporter(Sender sender, MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, Clock clock, Map<String, String> baseTags, MetricMeasurementTransformer transformer, ScheduledExecutorService executor) {
private GaugeMeasurementReporter gaugeMeasurementReporter;
private HistogramMeasurementReporter histogramMeasurementReporter;
private MeterMeasurementReporter meterMeasurementReporter;
private TimerMeasurementReporter timerMeasurementReporter;
private CounterMeasurementReporter counterMeasurementReporter;

public MeasurementReporter(Sender sender, MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, Clock clock, Map<String, String> baseTags, MetricMeasurementTransformer transformer, CounterMeasurementReporter counterMeasurementReporter, GaugeMeasurementReporter gaugeMeasurementReporter, HistogramMeasurementReporter histogramMeasurementReporter, MeterMeasurementReporter meterMeasurementReporter, TimerMeasurementReporter timerMeasurementReporter, ScheduledExecutorService executor) {
super(registry, "measurement-reporter", filter, rateUnit, durationUnit, executor);
this.baseTags = baseTags;
this.sender = sender;
this.clock = clock;
this.transformer = transformer;
this.counterMeasurementReporter = counterMeasurementReporter;
this.gaugeMeasurementReporter = gaugeMeasurementReporter;
this.histogramMeasurementReporter = histogramMeasurementReporter;
this.meterMeasurementReporter = meterMeasurementReporter;
this.timerMeasurementReporter = timerMeasurementReporter;
}

public MeasurementReporter(Sender sender, MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, Clock clock, Map<String, String> baseTags, MetricMeasurementTransformer transformer) {
public MeasurementReporter(Sender sender, MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, Clock clock, Map<String, String> baseTags, MetricMeasurementTransformer transformer, CounterMeasurementReporter counterMeasurementReporter, GaugeMeasurementReporter gaugeMeasurementReporter, HistogramMeasurementReporter histogramMeasurementReporter, MeterMeasurementReporter meterMeasurementReporter, TimerMeasurementReporter timerMeasurementReporter) {
super(registry, "measurement-reporter", filter, rateUnit, durationUnit);
this.baseTags = baseTags;
this.sender = sender;
this.clock = clock;
this.transformer = transformer;
this.counterMeasurementReporter = counterMeasurementReporter;
this.gaugeMeasurementReporter = gaugeMeasurementReporter;
this.histogramMeasurementReporter = histogramMeasurementReporter;
this.meterMeasurementReporter = meterMeasurementReporter;
this.timerMeasurementReporter = timerMeasurementReporter;
}

@SuppressWarnings("rawtypes")
@@ -52,142 +71,45 @@ public void report(SortedMap<String, Gauge> gauges
final long timestamp = clock.getTime();

for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
sender.send(fromGauge(entry.getKey(), entry.getValue(), timestamp));

Map<String, String> tags = new HashMap<>(baseTags);
tags.putAll(transformer.tags(entry.getKey()));

sender.send(gaugeMeasurementReporter.getMeasurement(entry.getKey(), transformer.measurementName(entry.getKey()), tags, entry.getValue(), timestamp));
}

for (Map.Entry<String, Counter> entry : counters.entrySet()) {
sender.send(fromCounter(entry.getKey(), entry.getValue(), timestamp));
}

for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
sender.send(fromHistogram(entry.getKey(), entry.getValue(), timestamp));
}
Map<String, String> tags = new HashMap<>(baseTags);
tags.putAll(transformer.tags(entry.getKey()));

for (Map.Entry<String, Meter> entry : meters.entrySet()) {
sender.send(fromMeter(entry.getKey(), entry.getValue(), timestamp));
sender.send(counterMeasurementReporter.getMeasurement(entry.getKey(), transformer.measurementName(entry.getKey()), tags, entry.getValue(), timestamp));
}

for (Map.Entry<String, Timer> entry : timers.entrySet()) {
sender.send(fromTimer(entry.getKey(), entry.getValue(), timestamp));
}
for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {

sender.flush();
}
Map<String, String> tags = new HashMap<>(baseTags);
tags.putAll(transformer.tags(entry.getKey()));

private Measure fromTimer(String metricName, Timer t, long timestamp) {
Snapshot snapshot = t.getSnapshot();

Map<String, String> tags = new HashMap<String, String>(baseTags);
tags.putAll(transformer.tags(metricName));

Measure measure = new Measure(transformer.measurementName(metricName))
.timestamp(timestamp)
.addTag(tags)
.addValue("count", snapshot.size())
.addValue("min", convertDuration(snapshot.getMin()))
.addValue("max", convertDuration(snapshot.getMax()))
.addValue("mean", convertDuration(snapshot.getMean()))
.addValue("std-dev", convertDuration(snapshot.getStdDev()))
.addValue("50-percentile", convertDuration(snapshot.getMedian()))
.addValue("75-percentile", convertDuration(snapshot.get75thPercentile()))
.addValue("95-percentile", convertDuration(snapshot.get95thPercentile()))
.addValue("99-percentile", convertDuration(snapshot.get99thPercentile()))
.addValue("999-percentile", convertDuration(snapshot.get999thPercentile()))
.addValue("one-minute", convertRate(t.getOneMinuteRate()))
.addValue("five-minute", convertRate(t.getFiveMinuteRate()))
.addValue("fifteen-minute", convertRate(t.getFifteenMinuteRate()))
.addValue("mean-minute", convertRate(t.getMeanRate()))
.addValue("run-count", t.getCount());

return measure;
}
sender.send(histogramMeasurementReporter.getMeasurement(entry.getKey(), transformer.measurementName(entry.getKey()), tags, entry.getValue(), timestamp));
}

private Measure fromMeter(String metricName, Meter mt, long timestamp) {
Map<String, String> tags = new HashMap<String, String>(baseTags);
tags.putAll(transformer.tags(metricName));

Measure measure = new Measure(transformer.measurementName(metricName))
.timestamp(timestamp)
.addTag(tags)
.addValue("count", mt.getCount())
.addValue("one-minute", convertRate(mt.getOneMinuteRate()))
.addValue("five-minute", convertRate(mt.getFiveMinuteRate()))
.addValue("fifteen-minute", convertRate(mt.getFifteenMinuteRate()))
.addValue("mean-minute", convertRate(mt.getMeanRate()));
return measure;
}
for (Map.Entry<String, Meter> entry : meters.entrySet()) {

private Measure fromHistogram(String metricName, Histogram h, long timestamp) {
Snapshot snapshot = h.getSnapshot();

Map<String, String> tags = new HashMap<String, String>(baseTags);
tags.putAll(transformer.tags(metricName));

Measure measure = new Measure(transformer.measurementName(metricName))
.timestamp(timestamp)
.addTag(tags)
.addValue("count", snapshot.size())
.addValue("min", snapshot.getMin())
.addValue("max", snapshot.getMax())
.addValue("mean", snapshot.getMean())
.addValue("std-dev", snapshot.getStdDev())
.addValue("50-percentile", snapshot.getMedian())
.addValue("75-percentile", snapshot.get75thPercentile())
.addValue("95-percentile", snapshot.get95thPercentile())
.addValue("99-percentile", snapshot.get99thPercentile())
.addValue("999-percentile", snapshot.get999thPercentile())
.addValue("run-count", h.getCount());
return measure;
}
Map<String, String> tags = new HashMap<>(baseTags);
tags.putAll(transformer.tags(entry.getKey()));

private Measure fromCounter(String metricName, Counter c, long timestamp) {
Map<String, String> tags = new HashMap<String, String>(baseTags);
tags.putAll(transformer.tags(metricName));
sender.send(meterMeasurementReporter.getMeasurement(entry.getKey(), transformer.measurementName(entry.getKey()), tags, entry.getValue(), timestamp));
}

Measure measure = new Measure(transformer.measurementName(metricName))
.timestamp(timestamp)
.addTag(tags)
.addValue("count", c.getCount());
for (Map.Entry<String, Timer> entry : timers.entrySet()) {

return measure;
}
Map<String, String> tags = new HashMap<>(baseTags);
tags.putAll(transformer.tags(entry.getKey()));

@SuppressWarnings("rawtypes")
private Measure fromGauge(String metricName, Gauge g, long timestamp) {
Map<String, String> tags = new HashMap<String, String>(baseTags);
tags.putAll(transformer.tags(metricName));

Measure measure = new Measure(transformer.measurementName(metricName))
.timestamp(timestamp)
.addTag(tags);
Object o = g.getValue();

if (o == null) {
// skip null values
return null;
}
if (o instanceof Long || o instanceof Integer) {
long value = ((Number)o).longValue();
measure.addValue("value", value);
} else if (o instanceof Double) {
Double d = (Double) o;
if (d.isInfinite() || d.isNaN()) {
// skip Infinite & NaN
return null;
}
measure.addValue("value", d.doubleValue());
} else if (o instanceof Float) {
Float f = (Float) o;
if (f.isInfinite() || f.isNaN()) {
// skip Infinite & NaN
return null;
}
measure.addValue("value", f.floatValue());
} else {
String value = ""+o;
measure.addValue("value", value);
sender.send(timerMeasurementReporter.getMeasurement(entry.getKey(), transformer.measurementName(entry.getKey()), tags, entry.getValue(), timestamp));
}

return measure;
sender.flush();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package metrics_influxdb.measurements.reporter;

import com.codahale.metrics.Counter;
import metrics_influxdb.measurements.Measure;

import java.util.Map;

public class CounterMeasurementReporter {

public Measure getMeasurement(String metricName, String transformedName, Map<String, String> tags, Counter metric, long timestamp) {

return new Measure(transformedName)
.timestamp(timestamp)
.addTag(tags)
.addValue("count", metric.getCount());
}
}
Loading