Skip to content

Commit

Permalink
feat: add basic periodic exporting metric_reader (#1603)
Browse files Browse the repository at this point in the history
* feat: add basic periodic-reader

* feat: lint

* feat: make thread properly close

* Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb

Co-authored-by: Kayla Reopelle (she/her) <[email protected]>

* Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb

Co-authored-by: Kayla Reopelle (she/her) <[email protected]>

* Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb

Co-authored-by: Kayla Reopelle (she/her) <[email protected]>

* Update metrics_sdk/test/integration/periodic_metric_reader_test.rb

Co-authored-by: Kayla Reopelle (she/her) <[email protected]>

* feat: periodic reader - revision

* feat: change interval and timeout name

* feat: change back to millis

* revision

* lint

---------

Co-authored-by: Kayla Reopelle (she/her) <[email protected]>
Co-authored-by: Matthew Wear <[email protected]>
  • Loading branch information
3 people authored Aug 21, 2024
1 parent 0109aa7 commit 62bb150
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 1 deletion.
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ module Export
require 'opentelemetry/sdk/metrics/export/metric_reader'
require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/periodic_metric_reader'
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def pull
export(collect)
end

def export(metrics)
def export(metrics, timeout: nil)
@mutex.synchronize do
@metric_snapshots << metrics
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
# PeriodicMetricReader provides a minimal example implementation.
class PeriodicMetricReader < MetricReader
# Returns a new instance of the {PeriodicMetricReader}.
#
# @param [Integer] export_interval_millis the maximum interval time.
# Defaults to the value of the OTEL_METRIC_EXPORT_INTERVAL environment
# variable, if set, or 60_000.
# @param [Integer] export_timeout_millis the maximum export timeout.
# Defaults to the value of the OTEL_METRIC_EXPORT_TIMEOUT environment
# variable, if set, or 30_000.
# @param [MetricReader] exporter the (duck type) MetricReader to where the
# recorded metrics are pushed after certain interval.
#
# @return a new instance of the {PeriodicMetricReader}.
def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)),
export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)),
exporter: nil)
super()

@export_interval = export_interval_millis / 1000.0
@export_timeout = export_timeout_millis / 1000.0
@exporter = exporter
@thread = nil
@continue = false
@mutex = Mutex.new
@export_mutex = Mutex.new

start
end

def shutdown(timeout: nil)
thread = lock do
@continue = false # force termination in next iteration
@thread
end
thread&.join(@export_interval)
@exporter.force_flush if @exporter.respond_to?(:force_flush)
@exporter.shutdown
Export::SUCCESS
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Fail to shutdown PeriodicMetricReader.')
Export::FAILURE
end

def force_flush(timeout: nil)
export(timeout: timeout)
Export::SUCCESS
rescue StandardError
Export::FAILURE
end

private

def start
@continue = true
if @exporter.nil?
OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.'
elsif @thread&.alive?
OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please shutdown it if it needs to restart.'
else
@thread = Thread.new do
while @continue
sleep(@export_interval)
begin
Timeout.timeout(@export_timeout) do
export(timeout: @export_timeout)
end
rescue Timeout::Error => e
OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.')
end
end
end
end
end

def export(timeout: nil)
@export_mutex.synchronize do
collected_metrics = collect
@exporter.export(collected_metrics, timeout: timeout || @export_timeout) unless collected_metrics.empty?
end
end

def lock(&block)
@mutex.synchronize(&block)
end
end
end
end
end
end
85 changes: 85 additions & 0 deletions metrics_sdk/test/integration/periodic_metric_reader_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::SDK do
describe '#periodic_metric_reader' do
before { reset_metrics_sdk }

it 'emits 2 metrics after 10 seconds' do
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(8)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(2)

first_snapshot = snapshot[0]
_(first_snapshot[0].name).must_equal('counter')
_(first_snapshot[0].unit).must_equal('smidgen')
_(first_snapshot[0].description).must_equal('a small amount of something')

_(first_snapshot[0].instrumentation_scope.name).must_equal('test')

_(first_snapshot[0].data_points[0].value).must_equal(1)
_(first_snapshot[0].data_points[0].attributes).must_equal({})

_(first_snapshot[0].data_points[1].value).must_equal(4)
_(first_snapshot[0].data_points[1].attributes).must_equal('a' => 'b')

_(first_snapshot[0].data_points[2].value).must_equal(3)
_(first_snapshot[0].data_points[2].attributes).must_equal('b' => 'c')

_(first_snapshot[0].data_points[3].value).must_equal(4)
_(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e')

_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end

it 'emits 1 metric after 1 second when interval is > 1 second' do
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(1)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(1)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end
end
end

0 comments on commit 62bb150

Please sign in to comment.