From cc6c926b4281e35ce57d394dd4b4cf3c2463c2af Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Fri, 6 Dec 2024 18:07:04 -0500 Subject: [PATCH] refactor: separate metrics reader and metrics exporter --- .../aggregation/explicit_bucket_histogram.rb | 2 + .../sdk/metrics/aggregation/sum.rb | 2 + .../export/console_metric_pull_exporter.rb | 2 +- .../export/in_memory_metric_pull_exporter.rb | 6 +- .../sdk/metrics/export/metric_exporter.rb | 36 ++++++++++++ .../sdk/metrics/export/metric_reader.rb | 58 ++++++++++++++++--- .../sdk/metrics/state/metric_store.rb | 3 +- .../sdk/metrics/state/metric_stream.rb | 1 + 8 files changed, 96 insertions(+), 14 deletions(-) create mode 100644 metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_exporter.rb diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb index 0d357e1d4..842db30af 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb @@ -16,6 +16,8 @@ class ExplicitBucketHistogram attr_reader :aggregation_temporality + # attr_accessor :aggregation_temporality # approach 2 + # The default value for boundaries represents the following buckets: # (-inf, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], # (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb index c2771b38e..06e1924ec 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb @@ -13,6 +13,8 @@ module Aggregation class Sum attr_reader :aggregation_temporality + # attr_accessor :aggregation_temporality # approach 2 + def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta)) # TODO: the default should be :cumulative, see issue #1555 @aggregation_temporality = aggregation_temporality.to_sym diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb index 8264c8e2e..b8f0ba2fb 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/console_metric_pull_exporter.rb @@ -11,7 +11,7 @@ module Export # Outputs {MetricData} to the console # # Potentially useful for exploratory purposes. - class ConsoleMetricPullExporter < MetricReader + class ConsoleMetricPullExporter < MetricExporter def initialize super @stopped = false diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb index d0d8ccb90..e33f4f55f 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb @@ -10,7 +10,7 @@ module Metrics module Export # The InMemoryMetricPullExporter behaves as a Metric Reader and Exporter. # To be used for testing purposes, not production. - class InMemoryMetricPullExporter < MetricReader + class InMemoryMetricPullExporter < MetricExporter attr_reader :metric_snapshots def initialize @@ -35,10 +35,6 @@ def reset @metric_snapshots.clear end end - - def shutdown - SUCCESS - end end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_exporter.rb new file mode 100644 index 000000000..d18be79cc --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_exporter.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module Export + # Exporter provides a minimal example implementation. + # It is not required to subclass this class to provide an implementation + # of MetricReader, provided the interface is satisfied. + class MetricExporter + attr_reader :metric_store + + def initialize + @metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new + end + + def collect + @metric_store.collect + end + + def shutdown(timeout: nil) + Export::SUCCESS + end + + def force_flush(timeout: nil) + Export::SUCCESS + end + end + end + end + end +end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb index c261f4f10..75dd5fa5c 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb @@ -8,26 +8,70 @@ module OpenTelemetry module SDK module Metrics module Export - # MetricReader provides a minimal example implementation. - # It is not required to subclass this class to provide an implementation - # of MetricReader, provided the interface is satisfied. + # MetricReader class MetricReader - attr_reader :metric_store + attr_reader :exporters - def initialize - @metric_store = OpenTelemetry::SDK::Metrics::State::MetricStore.new + def initialize(exporter: nil) + register_exporter(exporter) + @mutex = Mutex.new + @exporters = [] end + # The metrics Reader implementation supports registering metric Exporters + def register_exporter(exporter: nil) + return unless exporter.respond_to?(:pull) + + @mutex.synchronize do + @exporters << exporter + end + end + + # Each exporter pull will trigger its metric_store call collect; + # and metric_store will collect all metrics data and send for export. def collect - @metric_store.collect + @exporters.each { |exporter| exporter.pull if exporter.respond_to?(:pull) } + end + alias pull collect + + # The metrics Reader implementation supports configuring the + # default aggregation on the basis of instrument kind. + def aggregator(aggregator: nil, instrument_kind: nil) + return if aggregator.nil? + + @exporters.each do |exporter| + exporter.metric_store.metric_streams.each do |ms| + ms.default_aggregation = aggregator if instrument_kind.nil? || ms.instrument_kind == instrument_kind + end + end + end + + # The metrics Reader implementation supports configuring the + # default temporality on the basis of instrument kind. + def temporality(temporality: nil, instrument_kind: nil) + return if temporality.nil? + + @exporters.each do |exporter| + exporter.metric_store.metric_streams.each do |ms| + ms.default_aggregation.aggregation_temporality = temporality if instrument_kind.nil? || ms.instrument_kind == instrument_kind + end + end end + # shutdown all exporters def shutdown(timeout: nil) + @exporters.each { |exporter| exporter.shutdown(timeout: timeout) if exporter.respond_to?(:shutdown) } Export::SUCCESS + rescue StandardError + Export::FAILURE end + # force flush all exporters def force_flush(timeout: nil) + @exporters.each { |exporter| exporter.force_flush(timeout: timeout) if exporter.respond_to?(:force_flush) } Export::SUCCESS + rescue StandardError + Export::FAILURE end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb index b89eb0916..6d7ac5bb5 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb @@ -13,6 +13,8 @@ module State # The MetricStore module provides SDK internal functionality that is not a part of the # public API. class MetricStore + attr_reader :metric_streams + def initialize @mutex = Mutex.new @epoch_start_time = now_in_nano @@ -23,7 +25,6 @@ def initialize def collect @mutex.synchronize do @epoch_end_time = now_in_nano - # snapshot = @metric_streams.map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) } snapshot = @metric_streams.flat_map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) } @epoch_start_time = @epoch_end_time snapshot diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb index 05033f522..d5aaf4741 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb @@ -14,6 +14,7 @@ module State # public API. class MetricStream attr_reader :name, :description, :unit, :instrument_kind, :instrumentation_scope, :data_points + attr_accessor :default_aggregation def initialize( name,