Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic implementation of asynchronous metrics #1610

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/instrument.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Instrument
end

require 'opentelemetry/sdk/metrics/instrument/synchronous_instrument'
require 'opentelemetry/sdk/metrics/instrument/asynchronous_instrument'
require 'opentelemetry/sdk/metrics/instrument/counter'
require 'opentelemetry/sdk/metrics/instrument/histogram'
require 'opentelemetry/sdk/metrics/instrument/observable_counter'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Instrument
# {AsynchronousInstrument} contains the common functionality shared across
# the asynchronous instruments SDK instruments.
class AsynchronousInstrument
def initialize(name, unit, description, callback, instrumentation_scope, meter_provider)
@name = name
@unit = unit
@description = description
@instrumentation_scope = instrumentation_scope
@meter_provider = meter_provider
@metric_streams = []
@callbacks = []
@timeout = nil
@attributes = {}

init_callback(callback)
meter_provider.register_asynchronous_instrument(self)
end

# @api private
def register_with_new_metric_store(metric_store, aggregation: default_aggregation)
ms = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
@name,
@description,
@unit,
instrument_kind,
@meter_provider,
@instrumentation_scope,
aggregation,
@callbacks,
@timeout,
@attributes
)
@metric_streams << ms
metric_store.add_metric_stream(ms)
end

# The API MUST support creation of asynchronous instruments by passing zero or more callback functions
# to be permanently registered to the newly created instrument.
def init_callback(callback)
if callback.instance_of?(Proc)
@callbacks << callback
elsif callback.instance_of?(Array)
callback.each { |cb| @callbacks << cb if cb.instance_of?(Proc) }
else
OpenTelemetry.logger.warn "Only accept single Proc or Array of Proc for initialization with callback (given callback #{callback.class}"
end
end

# Where the API supports registration of callback functions after asynchronous instrumentation creation,
# the user MUST be able to undo registration of the specific callback after its registration by some means.
def register_callback(callback)
if callback.instance_of?(Proc)
@callbacks << callback
callback
else
OpenTelemetry.logger.warn "Only accept single Proc for registering callback (given callback #{callback.class}"
end
end

def unregister(callback)
@callbacks.delete(callback)
end

def timeout(timeout)
@timeout = timeout
end

def add_attributes(attributes)
@attributes.merge!(attributes) if attributes.instance_of?(Hash)
end

private

# update the observed value (after calling observe)
# invoke callback will execute callback and export metric_data that is observed
def update(timeout, attributes)
@metric_streams.each { |ms| ms.invoke_callback(timeout, attributes) }
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,31 @@ module OpenTelemetry
module SDK
module Metrics
module Instrument
# {ObservableCounter} is the SDK implementation of {OpenTelemetry::Metrics::ObservableCounter}.
class ObservableCounter < OpenTelemetry::Metrics::Instrument::ObservableCounter
attr_reader :name, :unit, :description
# {ObservableCounter} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}.
# Asynchronous Counter is an asynchronous Instrument which reports monotonically increasing value(s) when the instrument is being observed.
class ObservableCounter < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument
# Returns the instrument kind as a Symbol
#
# @return [Symbol]
def instrument_kind
:observable_counter
end

# Observe the ObservableCounter with fixed timeout duartion.
#
# @param [int] timeout The timeout duration for callback to run, which MUST be a non-negative numeric value.
# @param [Hash{String => String, Numeric, Boolean, Array<String, Numeric, Boolean>}] attributes
# Values must be non-nil and (array of) string, boolean or numeric type.
# Array values must not contain nil elements and all elements must be of
# the same basic type (string, numeric, boolean).
def observe(timeout: nil, attributes: {})
update(timeout, attributes)
end

private

def initialize(name, unit, description, callback, meter)
@name = name
@unit = unit
@description = description
@callback = callback
@meter = meter
def default_aggregation
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,31 @@ module OpenTelemetry
module SDK
module Metrics
module Instrument
# {ObservableGauge} is the SDK implementation of {OpenTelemetry::Metrics::ObservableGauge}.
class ObservableGauge < OpenTelemetry::Metrics::Instrument::ObservableGauge
attr_reader :name, :unit, :description
# {ObservableGauge} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}.
# Asynchronous Gauge is an asynchronous Instrument which reports non-additive value(s) (e.g. the room temperature)
class ObservableGauge < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument
# Returns the instrument kind as a Symbol
#
# @return [Symbol]
def instrument_kind
:observable_gauge
end

# Observe the ObservableCounter with fixed timeout duartion.
#
# @param [int] timeout The timeout duration for callback to run, which MUST be a non-negative numeric value.
# @param [Hash{String => String, Numeric, Boolean, Array<String, Numeric, Boolean>}] attributes
# Values must be non-nil and (array of) string, boolean or numeric type.
# Array values must not contain nil elements and all elements must be of
# the same basic type (string, numeric, boolean).
def observe(timeout: nil, attributes: {})
update(timeout, attributes)
end

private

def initialize(name, unit, description, callback, meter)
@name = name
@unit = unit
@description = description
@callback = callback
@meter = meter
def default_aggregation
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,32 @@ module OpenTelemetry
module SDK
module Metrics
module Instrument
# {ObservableUpDownCounter} is the SDK implementation of {OpenTelemetry::Metrics::ObservableUpDownCounter}.
class ObservableUpDownCounter < OpenTelemetry::Metrics::Instrument::ObservableUpDownCounter
attr_reader :name, :unit, :description
# {ObservableUpDownCounter} is the SDK implementation of {OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument}.
# Asynchronous UpDownCounter is an asynchronous Instrument which reports additive value(s) (e.g. the process heap size)
class ObservableUpDownCounter < OpenTelemetry::SDK::Metrics::Instrument::AsynchronousInstrument
# Returns the instrument kind as a Symbol
#
# @return [Symbol]
def instrument_kind
:observable_up_down_counter
end

# Observe the ObservableCounter with fixed timeout duartion.
# Everytime observe, the value should be sent to backend through exporter
#
# @param [int] timeout The timeout duration for callback to run, which MUST be a non-negative numeric value.
# @param [Hash{String => String, Numeric, Boolean, Array<String, Numeric, Boolean>}] attributes
# Values must be non-nil and (array of) string, boolean or numeric type.
# Array values must not contain nil elements and all elements must be of
# the same basic type (string, numeric, boolean).
def observe(timeout: nil, attributes: {})
update(timeout, attributes)
end

private

def initialize(name, unit, description, callback, meter)
@name = name
@unit = unit
@description = description
@callback = callback
@meter = meter
def default_aggregation
OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality: :delta)
end
end
end
Expand Down
24 changes: 24 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/meter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ def initialize(name, version, meter_provider)
@meter_provider = meter_provider
end

# Multiple-instrument callbacks
# Callbacks registered after the time of instrument creation MAY be associated with multiple instruments.
# Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#multiple-instrument-callbacks
# Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#synchronous-instrument-api
#
# @param [Array] instruments A list (or tuple, etc.) of Instruments used in the callback function.
# @param [Proc] callback A callback function
#
# It is RECOMMENDED that the API authors use one of the following forms for the callback function:
# The list (or tuple, etc.) returned by the callback function contains (Instrument, Measurement) pairs.
# the Observable Result parameter receives an additional (Instrument, Measurement) pairs
# Here it chose the second form
def register_callback(instruments, callback)
instruments.each do |instrument|
instrument.register_callback(callback)
end
end

def unregister(instruments, callback)
instruments.each do |instrument|
instrument.unregister(callback)
end
end

# @api private
def add_metric_reader(metric_reader)
@instrument_registry.each do |_n, instrument|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def register_synchronous_instrument(instrument)
end
end
end
alias_method :register_asynchronous_instrument, :register_synchronous_instrument

# The type of the Instrument(s) (optional).
# The name of the Instrument(s). OpenTelemetry SDK authors MAY choose to support wildcard characters, with the question mark (?) matching exactly one character and the asterisk character (*) matching zero or more characters.
Expand Down
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ module State
require 'opentelemetry/sdk/metrics/state/metric_data'
require 'opentelemetry/sdk/metrics/state/metric_store'
require 'opentelemetry/sdk/metrics/state/metric_stream'
require 'opentelemetry/sdk/metrics/state/asynchronous_metric_stream'
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module State
# @api private
#
# The MetricStream class provides SDK internal functionality that is not a part of the
# public API.
class AsynchronousMetricStream
attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points

def initialize(
name,
description,
unit,
instrument_kind,
meter_provider,
instrumentation_scope,
aggregation,
callback,
timeout,
attributes
)
@name = name
@description = description
@unit = unit
@instrument_kind = instrument_kind
@meter_provider = meter_provider
@instrumentation_scope = instrumentation_scope
@aggregation = aggregation
@callback = callback
@start_time = now_in_nano
@timeout = timeout
@attributes = attributes

@mutex = Mutex.new
end

# When collect, if there are asynchronous SDK Instruments involved, their callback functions will be triggered.
# Related spec: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#collect
# invoke_callback will update the data_points in aggregation
def collect(start_time, end_time)
invoke_callback(@timeout, @attributes)

@mutex.synchronize do
MetricData.new(
@name,
@description,
@unit,
@instrument_kind,
@meter_provider.resource,
@instrumentation_scope,
@aggregation.collect(start_time, end_time),
@aggregation.aggregation_temporality,
start_time,
end_time
)
end
end

def invoke_callback(timeout, attributes)
@mutex.synchronize do
Timeout.timeout(timeout || 30) do
@callback.each do |cb|
value = cb.call
@aggregation.update(value, attributes)
end
end
end
end

def to_s
instrument_info = String.new
instrument_info << "name=#{@name}"
instrument_info << " description=#{@description}" if @description
instrument_info << " unit=#{@unit}" if @unit
@data_points.map do |attributes, value|
metric_stream_string = String.new
metric_stream_string << instrument_info
metric_stream_string << " attributes=#{attributes}" if attributes
metric_stream_string << " #{value}"
metric_stream_string
end.join("\n")
end

def now_in_nano
(Time.now.to_r * 1_000_000_000).to_i
end
end
end
end
end
end
Loading
Loading