Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic periodic exporting metric_reader #1603

Merged
merged 20 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
32a6582
feat: add basic periodic-reader
xuan-cao-swi Feb 21, 2024
3566cd2
feat: lint
xuan-cao-swi Feb 21, 2024
cb84455
feat: make thread properly close
xuan-cao-swi Feb 22, 2024
0ecb543
Merge branch 'main' of github.com:xuan-cao-swi/opentelemetry-ruby int…
xuan-cao-swi Mar 26, 2024
3731304
Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metr…
xuan-cao-swi Apr 5, 2024
2aef8bf
Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metr…
xuan-cao-swi Apr 5, 2024
8e90ebb
Update metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metr…
xuan-cao-swi Apr 5, 2024
23d9a1f
Update metrics_sdk/test/integration/periodic_metric_reader_test.rb
xuan-cao-swi Apr 5, 2024
02055b7
Merge branch 'main' into periodic-reader
xuan-cao-swi Apr 5, 2024
2d31c72
Merge branch 'periodic-reader' of github.com:xuan-cao-swi/opentelemet…
xuan-cao-swi Apr 5, 2024
512f959
feat: periodic reader - revision
xuan-cao-swi Apr 6, 2024
da09c44
Merge branch 'main' into periodic-reader
xuan-cao-swi Apr 10, 2024
ed9029c
Merge branch 'main' into periodic-reader
xuan-cao-swi Apr 23, 2024
63405d5
feat: change interval and timeout name
xuan-cao-swi Apr 23, 2024
9fad788
Merge branch 'main' into periodic-reader
xuan-cao-swi May 15, 2024
2662097
feat: change back to millis
xuan-cao-swi May 22, 2024
4fd4e2f
Merge branch 'main' into periodic-reader
xuan-cao-swi Aug 7, 2024
81edbc5
revision
xuan-cao-swi Aug 7, 2024
cfa4974
lint
xuan-cao-swi Aug 7, 2024
0cb373c
Merge branch 'main' into periodic-reader
mwear Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ module Export
require 'opentelemetry/sdk/metrics/export/metric_reader'
require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter'
require 'opentelemetry/sdk/metrics/export/periodic_metric_reader'
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def pull
export(collect)
end

def export(metrics)
def export(metrics, timeout: nil)
@mutex.synchronize do
@metric_snapshots << metrics
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
# PeriodicMetricReader provides a minimal example implementation.
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
class PeriodicMetricReader < MetricReader
# Returns a new instance of the {PeriodicMetricReader}.
#
# @param [Integer] export_interval_millis the maximum interval time.
# Defaults to the value of the OTEL_METRIC_EXPORT_INTERVAL environment
# variable, if set, or 60_000.
# @param [Integer] export_timeout_millis the maximum export timeout.
# Defaults to the value of the OTEL_METRIC_EXPORT_TIMEOUT environment
# variable, if set, or 30_000.
# @param [MetricReader] exporter the (duck type) MetricReader to where the
# recorded metrics are pushed after certain interval.
#
# @return a new instance of the {PeriodicMetricReader}.
def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)),
export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)),
exporter: nil)
super()

@export_interval = export_interval_millis / 1000.0
@export_timeout = export_timeout_millis / 1000.0
@exporter = exporter
@thread = nil
@continue = false
@mutex = Mutex.new
@export_mutex = Mutex.new

start
end

def shutdown(timeout: nil)
thread = lock do
@continue = false # force termination in next iteration
@thread
end
thread&.join(@export_interval)
@exporter.force_flush if @exporter.respond_to?(:force_flush)
@exporter.shutdown
Export::SUCCESS
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'Fail to shutdown PeriodicMetricReader.')
Export::FAILURE
end

def force_flush(timeout: nil)
export(timeout: timeout)
Export::SUCCESS
rescue StandardError
Export::FAILURE
end

private

def start
@continue = true
if @exporter.nil?
OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.'
elsif @thread&.alive?
OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please shutdown it if it needs to restart.'
else
@thread = Thread.new do
while @continue
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
sleep(@export_interval)
begin
Timeout.timeout(@export_timeout) do
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
export(timeout: @export_timeout)
end
rescue Timeout::Error => e
OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') evaluating to FAILURE ? if not then we should return FAILURE explicitly here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think metric_reader shouldn't return anything (similar to metric_reader); only exporter will return either fail or success from export

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, you are absolutely right.

end
end
end
end
end

def export(timeout: nil)
@export_mutex.synchronize do
collected_metrics = collect
@exporter.export(collected_metrics, timeout: timeout || @export_timeout) unless collected_metrics.empty?
end
end

def lock(&block)
@mutex.synchronize(&block)
end
end
end
end
end
end
85 changes: 85 additions & 0 deletions metrics_sdk/test/integration/periodic_metric_reader_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::SDK do
describe '#periodic_metric_reader' do
before { reset_metrics_sdk }

it 'emits 2 metrics after 10 seconds' do
OpenTelemetry::SDK.configure
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(8)
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(2)

first_snapshot = snapshot[0]
_(first_snapshot[0].name).must_equal('counter')
_(first_snapshot[0].unit).must_equal('smidgen')
_(first_snapshot[0].description).must_equal('a small amount of something')

_(first_snapshot[0].instrumentation_scope.name).must_equal('test')

_(first_snapshot[0].data_points[0].value).must_equal(1)
_(first_snapshot[0].data_points[0].attributes).must_equal({})

_(first_snapshot[0].data_points[1].value).must_equal(4)
_(first_snapshot[0].data_points[1].attributes).must_equal('a' => 'b')

_(first_snapshot[0].data_points[2].value).must_equal(3)
_(first_snapshot[0].data_points[2].attributes).must_equal('b' => 'c')

_(first_snapshot[0].data_points[3].value).must_equal(4)
_(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e')

_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end

it 'emits 1 metric after 1 second when interval is > 1 second' do
OpenTelemetry::SDK.configure

metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter)

OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader)

meter = OpenTelemetry.meter_provider.meter('test')
counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something')

counter.add(1)
counter.add(2, attributes: { 'a' => 'b' })
counter.add(2, attributes: { 'a' => 'b' })
counter.add(3, attributes: { 'b' => 'c' })
counter.add(4, attributes: { 'd' => 'e' })

sleep(1)

periodic_metric_reader.shutdown
snapshot = metric_exporter.metric_snapshots

_(snapshot.size).must_equal(1)
_(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false
end
end
end
Loading