diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb index 64a038f819..782a75aae4 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb @@ -26,3 +26,4 @@ module Export require 'opentelemetry/sdk/metrics/export/metric_reader' require 'opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter' require 'opentelemetry/sdk/metrics/export/console_metric_pull_exporter' +require 'opentelemetry/sdk/metrics/export/periodic_metric_reader' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb index 35c2dfe575..8291ed9fe7 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb @@ -23,7 +23,7 @@ def pull export(collect) end - def export(metrics) + def export(metrics, timeout: nil) @mutex.synchronize do @metric_snapshots << metrics end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb new file mode 100644 index 0000000000..948f98976c --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -0,0 +1,100 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module Export + # PeriodicMetricReader provides a minimal example implementation. + class PeriodicMetricReader < MetricReader + # Returns a new instance of the {PeriodicMetricReader}. + # + # @param [Integer] export_interval_millis the maximum interval time. + # Defaults to the value of the OTEL_METRIC_EXPORT_INTERVAL environment + # variable, if set, or 60_000. + # @param [Integer] export_timeout_millis the maximum export timeout. + # Defaults to the value of the OTEL_METRIC_EXPORT_TIMEOUT environment + # variable, if set, or 30_000. + # @param [MetricReader] exporter the (duck type) MetricReader to where the + # recorded metrics are pushed after certain interval. + # + # @return a new instance of the {PeriodicMetricReader}. + def initialize(export_interval_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_INTERVAL', 60_000)), + export_timeout_millis: Float(ENV.fetch('OTEL_METRIC_EXPORT_TIMEOUT', 30_000)), + exporter: nil) + super() + + @export_interval = export_interval_millis / 1000.0 + @export_timeout = export_timeout_millis / 1000.0 + @exporter = exporter + @thread = nil + @continue = false + @mutex = Mutex.new + @export_mutex = Mutex.new + + start + end + + def shutdown(timeout: nil) + thread = lock do + @continue = false # force termination in next iteration + @thread + end + thread&.join(@export_interval) + @exporter.force_flush if @exporter.respond_to?(:force_flush) + @exporter.shutdown + Export::SUCCESS + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'Fail to shutdown PeriodicMetricReader.') + Export::FAILURE + end + + def force_flush(timeout: nil) + export(timeout: timeout) + Export::SUCCESS + rescue StandardError + Export::FAILURE + end + + private + + def start + @continue = true + if @exporter.nil? + OpenTelemetry.logger.warn 'Missing exporter in PeriodicMetricReader.' + elsif @thread&.alive? + OpenTelemetry.logger.warn 'PeriodicMetricReader is still running. Please shutdown it if it needs to restart.' + else + @thread = Thread.new do + while @continue + sleep(@export_interval) + begin + Timeout.timeout(@export_timeout) do + export(timeout: @export_timeout) + end + rescue Timeout::Error => e + OpenTelemetry.handle_error(exception: e, message: 'PeriodicMetricReader timeout.') + end + end + end + end + end + + def export(timeout: nil) + @export_mutex.synchronize do + collected_metrics = collect + @exporter.export(collected_metrics, timeout: timeout || @export_timeout) unless collected_metrics.empty? + end + end + + def lock(&block) + @mutex.synchronize(&block) + end + end + end + end + end +end diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb new file mode 100644 index 0000000000..7bf00a08fc --- /dev/null +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK do + describe '#periodic_metric_reader' do + before { reset_metrics_sdk } + + it 'emits 2 metrics after 10 seconds' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(3, attributes: { 'b' => 'c' }) + counter.add(4, attributes: { 'd' => 'e' }) + + sleep(8) + + periodic_metric_reader.shutdown + snapshot = metric_exporter.metric_snapshots + + _(snapshot.size).must_equal(2) + + first_snapshot = snapshot[0] + _(first_snapshot[0].name).must_equal('counter') + _(first_snapshot[0].unit).must_equal('smidgen') + _(first_snapshot[0].description).must_equal('a small amount of something') + + _(first_snapshot[0].instrumentation_scope.name).must_equal('test') + + _(first_snapshot[0].data_points[0].value).must_equal(1) + _(first_snapshot[0].data_points[0].attributes).must_equal({}) + + _(first_snapshot[0].data_points[1].value).must_equal(4) + _(first_snapshot[0].data_points[1].attributes).must_equal('a' => 'b') + + _(first_snapshot[0].data_points[2].value).must_equal(3) + _(first_snapshot[0].data_points[2].attributes).must_equal('b' => 'c') + + _(first_snapshot[0].data_points[3].value).must_equal(4) + _(first_snapshot[0].data_points[3].attributes).must_equal('d' => 'e') + + _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false + end + + it 'emits 1 metric after 1 second when interval is > 1 second' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(3, attributes: { 'b' => 'c' }) + counter.add(4, attributes: { 'd' => 'e' }) + + sleep(1) + + periodic_metric_reader.shutdown + snapshot = metric_exporter.metric_snapshots + + _(snapshot.size).must_equal(1) + _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false + end + end +end