Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow metrics: provide lazy-initiazed implementation #3

Draft
wants to merge 4 commits into
base: flow-metrics-extended
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/static/monitoring/monitoring-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,29 @@ Additionally, some amount of back-pressure is both _normal_ and _expected_ for p

|===

Each flow stat includes rates for one or more recent windows of time:

// Templates for short-hand notes in the table below
:flow-stable: pass:quotes[*Stable*]
:flow-preview: pass:quotes[_Technology Preview_]

[%autowidth.stretch]
|===
| Flow Window | Availability | Definition

| `current` | {flow-stable} | the most recent ~10s
| `lifetime` | {flow-stable} | the lifetime of the relevant pipeline or process
| `last_1_minute` | {flow-preview} | the most recent ~1 minute
| `last_5_minutes` | {flow-preview} | the most recent ~5 minutes
| `last_15_minutes` | {flow-preview} | the most recent ~15 minutes
| `last_1_hour` | {flow-preview} | the most recent ~1 hour
| `last_24_hours` | {flow-preview} | the most recent ~24 hours

|===

NOTE: The flow rate windows marked as "Technology Preview" are subject to change without notice.
Future releases of {ls} may include more, fewer, or different windows for each rate in response to community feedback.

[discrete]
[[pipeline-stats]]
==== Pipeline stats
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def get_counter(namespace, key)
private :get_counter

def create_flow_metric(name, numerator_metric, denominator_metric)
org.logstash.instrument.metrics.FlowMetric.new(name, numerator_metric, denominator_metric)
org.logstash.instrument.metrics.FlowMetric.create(name, numerator_metric, denominator_metric)
end
private :create_flow_metric

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.FlowMetric;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.FlowMetric;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
import org.logstash.plugins.factory.PluginFactoryExt;
Expand Down Expand Up @@ -527,7 +527,7 @@ public final IRubyObject collectFlowMetrics(final ThreadContext context) {
private static FlowMetric createFlowMetric(final RubySymbol name,
final Metric<? extends Number> numeratorMetric,
final Metric<? extends Number> denominatorMetric) {
return new FlowMetric(name.asJavaString(), numeratorMetric, denominatorMetric);
return FlowMetric.create(name.asJavaString(), numeratorMetric, denominatorMetric);
}

private LongCounter initOrGetCounterMetric(final ThreadContext context,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.instrument.metrics;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* This {@link BaseFlowMetric} is a shared-common base for all internal implementations of {@link FlowMetric}.
*/
abstract class BaseFlowMetric extends AbstractMetric<Map<String,Double>> implements FlowMetric {

static final Logger LOGGER = LogManager.getLogger(BaseFlowMetric.class);

// metric sources
private final Metric<? extends Number> numeratorMetric;
private final Metric<? extends Number> denominatorMetric;

protected final FlowCapture lifetimeBaseline;

final LongSupplier nanoTimeSupplier;

static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP);

BaseFlowMetric(final LongSupplier nanoTimeSupplier,
final String name,
final Metric<? extends Number> numeratorMetric,
final Metric<? extends Number> denominatorMetric) {
super(name);
this.nanoTimeSupplier = nanoTimeSupplier;
this.numeratorMetric = numeratorMetric;
this.denominatorMetric = denominatorMetric;

this.lifetimeBaseline = doCapture();
LOGGER.trace("FlowMetric({}) -> {}", name, lifetimeBaseline);
}

@Override
public MetricType getType() {
return MetricType.FLOW_RATE;
}

protected FlowCapture doCapture() {
return new FlowCapture(nanoTimeSupplier.getAsLong(), numeratorMetric.getValue(), denominatorMetric.getValue());
}

/**
* @param current the most-recent {@link FlowCapture}
* @param baseline a non-null {@link FlowCapture} from which to compare.
* @return an {@link OptionalDouble} that will be non-empty IFF we have sufficient information
* to calculate a finite rate of change of the numerator relative to the denominator.
*/
protected static OptionalDouble calculateRate(final FlowCapture current, final FlowCapture baseline) {
Objects.requireNonNull(baseline, "baseline");
if (baseline == current) { return OptionalDouble.empty(); }

final BigDecimal deltaNumerator = current.numerator().subtract(baseline.numerator());
final BigDecimal deltaDenominator = current.denominator().subtract(baseline.denominator());

if (deltaDenominator.signum() == 0) {
return OptionalDouble.empty();
}

final BigDecimal rate = deltaNumerator.divide(deltaDenominator, LIMITED_PRECISION);

return OptionalDouble.of(rate.doubleValue());
}

/**
* @param current the most-recent {@link FlowCapture}
* @param possibleBaseline a {@link Supplier}{@code <FlowCapture>} that may return null
* @return an {@link OptionalDouble} that will be non-empty IFF we have sufficient information
* to calculate a finite rate of change of the numerator relative to the denominator.
*/
protected static OptionalDouble calculateRate(final FlowCapture current, final Supplier<FlowCapture> possibleBaseline) {
return Optional.ofNullable(possibleBaseline.get())
.map((baseline) -> calculateRate(current, baseline))
.orElseGet(OptionalDouble::empty);
}

}
Loading