diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 0237e4a769..0f6023f19b 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -15,10 +15,10 @@ use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use opentelemetry::logs::LogResult; use opentelemetry::{InstrumentationScope, KeyValue}; use opentelemetry_appender_tracing::layer as tracing_layer; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider}; use opentelemetry_sdk::Resource; use pprof::criterion::{Output, PProfProfiler}; diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 6b07a8a72d..bbd897c8d1 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -210,12 +210,12 @@ const fn severity_of_level(level: &Level) -> Severity { mod tests { use crate::layer; use async_trait::async_trait; - use opentelemetry::logs::{LogResult, Severity}; + use opentelemetry::logs::Severity; use opentelemetry::trace::TracerProvider as _; use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer}; use opentelemetry::{logs::AnyValue, Key}; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; - use opentelemetry_sdk::logs::{LogRecord, LoggerProvider}; + use opentelemetry_sdk::logs::{LogRecord, LogResult, LoggerProvider}; use opentelemetry_sdk::testing::logs::InMemoryLogExporter; use opentelemetry_sdk::trace; use opentelemetry_sdk::trace::{Sampler, TracerProvider}; @@ -334,7 +334,7 @@ mod tests { .expect("Atleast one log is expected to be present."); // Validate common fields - assert_eq!(log.instrumentation.name, "opentelemetry-appender-tracing"); + assert_eq!(log.instrumentation.name(), "opentelemetry-appender-tracing"); 
assert_eq!(log.record.severity_number, Some(Severity::Error)); // Validate trace context is none. @@ -428,7 +428,7 @@ mod tests { .expect("Atleast one log is expected to be present."); // validate common fields. - assert_eq!(log.instrumentation.name, "opentelemetry-appender-tracing"); + assert_eq!(log.instrumentation.name(), "opentelemetry-appender-tracing"); assert_eq!(log.record.severity_number, Some(Severity::Error)); // validate trace context. @@ -526,7 +526,7 @@ mod tests { .expect("Atleast one log is expected to be present."); // Validate common fields - assert_eq!(log.instrumentation.name, "opentelemetry-appender-tracing"); + assert_eq!(log.instrumentation.name(), "opentelemetry-appender-tracing"); assert_eq!(log.record.severity_number, Some(Severity::Error)); // Validate trace context is none. @@ -605,7 +605,7 @@ mod tests { .expect("Atleast one log is expected to be present."); // validate common fields. - assert_eq!(log.instrumentation.name, "opentelemetry-appender-tracing"); + assert_eq!(log.instrumentation.name(), "opentelemetry-appender-tracing"); assert_eq!(log.record.severity_number, Some(Severity::Error)); // validate trace context. 
diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index c67d6b21c5..10365606f0 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -2,7 +2,6 @@ use once_cell::sync::Lazy; use opentelemetry::{ global, - metrics::MetricError, trace::{TraceContextExt, TraceError, Tracer}, InstrumentationScope, KeyValue, }; @@ -11,7 +10,7 @@ use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter}; use opentelemetry_sdk::{ logs::LoggerProvider, - metrics::{PeriodicReader, SdkMeterProvider}, + metrics::{MetricError, PeriodicReader, SdkMeterProvider}, runtime, trace::{self as sdktrace, Config, TracerProvider}, }; @@ -31,7 +30,7 @@ static RESOURCE: Lazy = Lazy::new(|| { )]) }); -fn init_logs() -> Result { +fn init_logs() -> Result { let exporter = LogExporter::builder() .with_http() .with_endpoint("http://localhost:4318/v1/logs") diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index 0f99248bfd..96242458a3 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -1,12 +1,12 @@ use once_cell::sync::Lazy; -use opentelemetry::logs::LogError; -use opentelemetry::metrics::MetricError; use opentelemetry::trace::{TraceContextExt, TraceError, Tracer}; use opentelemetry::KeyValue; use opentelemetry::{global, InstrumentationScope}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig}; +use opentelemetry_sdk::logs::LogError; use opentelemetry_sdk::logs::LoggerProvider; +use opentelemetry_sdk::metrics::MetricError; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; use opentelemetry_sdk::trace::Config; use 
opentelemetry_sdk::{runtime, trace as sdktrace, Resource}; diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index db1932868b..985edc96ee 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; -use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::{LogError, LogResult}; use super::OtlpHttpClient; diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index b047cd4f32..5ba84b30e6 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; -use opentelemetry::metrics::{MetricError, MetricResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::{MetricError, MetricResult}; use crate::{metric::MetricsClient, Error}; diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 154871887b..1b18686a3e 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -223,7 +223,7 @@ impl HttpExporterBuilder { /// Create a log exporter with the current configuration #[cfg(feature = "logs")] - pub fn build_log_exporter(mut self) -> opentelemetry::logs::LogResult { + pub fn build_log_exporter(mut self) -> opentelemetry_sdk::logs::LogResult { use crate::{ OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, OTEL_EXPORTER_OTLP_LOGS_HEADERS, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, @@ -244,7 +244,7 @@ impl HttpExporterBuilder { pub fn build_metrics_exporter( mut self, temporality: opentelemetry_sdk::metrics::data::Temporality, - ) -> 
opentelemetry::metrics::MetricResult { + ) -> opentelemetry_sdk::metrics::MetricResult { use crate::{ OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, @@ -315,7 +315,7 @@ impl OtlpHttpClient { fn build_logs_export_body( &self, logs: LogBatch<'_>, - ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { + ) -> opentelemetry_sdk::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); let req = ExportLogsServiceRequest { resource_logs }; @@ -324,7 +324,7 @@ impl OtlpHttpClient { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { Ok(json) => Ok((json.into(), "application/json")), - Err(e) => Err(opentelemetry::logs::LogError::from(e.to_string())), + Err(e) => Err(opentelemetry_sdk::logs::LogError::from(e.to_string())), }, _ => Ok((req.encode_to_vec(), "application/x-protobuf")), } @@ -334,7 +334,7 @@ impl OtlpHttpClient { fn build_metrics_export_body( &self, metrics: &mut opentelemetry_sdk::metrics::data::ResourceMetrics, - ) -> opentelemetry::metrics::MetricResult<(Vec, &'static str)> { + ) -> opentelemetry_sdk::metrics::MetricResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; let req: ExportMetricsServiceRequest = (&*metrics).into(); @@ -343,7 +343,9 @@ impl OtlpHttpClient { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { Ok(json) => Ok((json.into(), "application/json")), - Err(e) => Err(opentelemetry::metrics::MetricError::Other(e.to_string())), + Err(e) => Err(opentelemetry_sdk::metrics::MetricError::Other( + e.to_string(), + )), }, _ => Ok((req.encode_to_vec(), "application/x-protobuf")), } diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 
bf9b6c9ed3..149c4b51f0 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; use core::fmt; -use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::{LogError, LogResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index b8a59858eb..4159222cd5 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -2,11 +2,11 @@ use core::fmt; use std::sync::Mutex; use async_trait::async_trait; -use opentelemetry::metrics::{MetricError, MetricResult}; use opentelemetry_proto::tonic::collector::metrics::v1::{ metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, }; use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::{MetricError, MetricResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 7089ca045a..e55d13f526 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -254,7 +254,7 @@ impl TonicExporterBuilder { #[cfg(feature = "logs")] pub(crate) fn build_log_exporter( self, - ) -> Result { + ) -> Result { use crate::exporter::tonic::logs::TonicLogsClient; let (channel, interceptor, compression) = self.build_channel( @@ -274,7 +274,7 @@ impl TonicExporterBuilder { pub(crate) 
fn build_metrics_exporter( self, temporality: opentelemetry_sdk::metrics::data::Temporality, - ) -> opentelemetry::metrics::MetricResult { + ) -> opentelemetry_sdk::metrics::MetricResult { use crate::MetricExporter; use metrics::TonicMetricsClient; diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 4429f6fda9..c8d7c0308a 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -391,6 +391,12 @@ impl ExportError for Error { } } +impl opentelemetry::trace::ExportError for Error { + fn exporter_name(&self) -> &'static str { + "otlp" + } +} + /// The communication protocol to use when exporting data. #[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] #[derive(Clone, Copy, Debug, Eq, PartialEq)] diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 42727a8d74..a0c09ab28d 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use std::fmt::Debug; -use opentelemetry::logs::LogResult; +use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::export::logs::LogBatch; @@ -62,14 +62,14 @@ impl LogExporterBuilder { #[cfg(feature = "grpc-tonic")] impl LogExporterBuilder { - pub fn build(self) -> Result { + pub fn build(self) -> Result { self.client.0.build_log_exporter() } } #[cfg(any(feature = "http-proto", feature = "http-json"))] impl LogExporterBuilder { - pub fn build(self) -> Result { + pub fn build(self) -> Result { self.client.0.build_log_exporter() } } diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index e2b3408174..3d5f9b981f 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -16,7 +16,7 @@ use crate::NoExporterBuilderSet; use async_trait::async_trait; use core::fmt; -use opentelemetry::metrics::MetricResult; +use opentelemetry_sdk::metrics::MetricResult; use opentelemetry_sdk::metrics::{ data::{ResourceMetrics, 
Temporality}, diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 4e437bb069..13068d0938 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -2,21 +2,17 @@ use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter}; use log::{info, Level}; -use opentelemetry::logs::LogError; use opentelemetry::KeyValue; use opentelemetry_appender_log::OpenTelemetryLogBridge; -use opentelemetry_otlp::{LogExporter, WithExportConfig}; -use opentelemetry_sdk::logs::LoggerProvider; +use opentelemetry_otlp::LogExporter; +use opentelemetry_sdk::logs::{LogError, LoggerProvider}; use opentelemetry_sdk::{logs as sdklogs, runtime, Resource}; use std::error::Error; use std::fs::File; use std::os::unix::fs::MetadataExt; fn init_logs() -> Result { - let exporter = LogExporter::builder() - .with_tonic() - .with_endpoint("0.0.0.0:4317") - .build()?; + let exporter = LogExporter::builder().with_tonic().build()?; Ok(LoggerProvider::builder() .with_batch_exporter(exporter, runtime::Tokio) diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index 0821165c89..37efc27199 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -64,9 +64,9 @@ pub mod tonic { } } else { InstrumentationScope { - name: library.name.into_owned(), - version: library.version.map(Cow::into_owned).unwrap_or_default(), - attributes: Attributes::from(library.attributes).0, + name: library.name().to_owned(), + version: library.version().map(ToOwned::to_owned).unwrap_or_default(), + attributes: Attributes::from(library.attributes().cloned()).0, ..Default::default() } } @@ -95,13 +95,9 @@ pub mod tonic { } } else { InstrumentationScope { - name: library.name.to_string(), - version: library - .version - .as_ref() - .map(ToString::to_string) - 
.unwrap_or_default(), - attributes: Attributes::from(library.attributes.clone()).0, + name: library.name().to_owned(), + version: library.version().map(ToOwned::to_owned).unwrap_or_default(), + attributes: Attributes::from(library.attributes().cloned()).0, ..Default::default() } } @@ -112,8 +108,8 @@ pub mod tonic { #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); - impl From> for Attributes { - fn from(kvs: Vec) -> Self { + impl> From for Attributes { + fn from(kvs: I) -> Self { Attributes( kvs.into_iter() .map(|api_kv| KeyValue { diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 52fd3cace4..a85b2ed7a7 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -167,9 +167,8 @@ pub mod tonic { schema_url: resource.schema_url.clone().unwrap_or_default(), scope_logs: vec![ScopeLogs { schema_url: instrumentation - .schema_url - .clone() - .map(Into::into) + .schema_url() + .map(ToOwned::to_owned) .unwrap_or_default(), scope: Some((instrumentation, log_record.target.clone()).into()), log_records: vec![log_record.into()], @@ -196,7 +195,7 @@ pub mod tonic { let key = log_record .target .clone() - .unwrap_or_else(|| Cow::Owned(instrumentation.name.clone().into_owned())); + .unwrap_or_else(|| Cow::Owned(instrumentation.name().to_owned())); scope_map .entry(key) .or_default() diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs index df3e105803..2e9b2c1402 100644 --- a/opentelemetry-proto/src/transform/metrics.rs +++ b/opentelemetry-proto/src/transform/metrics.rs @@ -137,9 +137,8 @@ pub mod tonic { metrics: sm.metrics.iter().map(Into::into).collect(), schema_url: sm .scope - .schema_url - .as_ref() - .map(ToString::to_string) + .schema_url() + .map(ToOwned::to_owned) .unwrap_or_default(), } } diff --git a/opentelemetry-proto/src/transform/trace.rs b/opentelemetry-proto/src/transform/trace.rs index 
27018d624a..fc031c5e32 100644 --- a/opentelemetry-proto/src/transform/trace.rs +++ b/opentelemetry-proto/src/transform/trace.rs @@ -102,9 +102,8 @@ pub mod tonic { scope_spans: vec![ScopeSpans { schema_url: source_span .instrumentation_scope - .schema_url - .as_ref() - .map(ToString::to_string) + .schema_url() + .map(ToOwned::to_owned) .unwrap_or_default(), scope: Some((source_span.instrumentation_scope, None).into()), spans: vec![Span { diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 96967fe421..812fe80672 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -34,6 +34,18 @@ - Pin url version to `2.5.2`. The higher version breaks the build refer: [servo/rust-url#992.](https://github.com/servo/rust-url/issues/992) The `url` crate is used when `jaeger_remote_sampler` feature is enabled. +- **BREAKING**: [#2266](https://github.com/open-telemetry/opentelemetry-rust/pull/2266) + - Moved `ExportError` trait from `opentelemetry::ExportError` to `opentelemetry_sdk::export::ExportError` + - Moved `LogError` enum from `opentelemetry::logs::LogError` to `opentelemetry_sdk::logs::LogError` + - Moved `LogResult` type alias from `opentelemetry::logs::LogResult` to `opentelemetry_sdk::logs::LogResult` + - Renamed `opentelemetry::metrics::Result` type alias to `opentelemetry::metrics::MetricResult` + - Renamed `opentelemetry::metrics::MetricsError` enum to `opentelemetry::metrics::MetricError` + - Moved `MetricError` enum from `opentelemetry::metrics::MetricError` to `opentelemetry_sdk::metrics::MetricError` + - Moved `MetricResult` type alias from `opentelemetry::metrics::MetricResult` to `opentelemetry_sdk::metrics::MetricResult` + + - Users calling public APIs that return these constructs (e.g., LoggerProvider::shutdown(), MeterProvider::force_flush()) should now import them from the SDK instead of the API. 
+ - Developers creating custom exporters should ensure they import these constructs from the SDK, not the API. + ## v0.26.0 Released 2024-Sep-30 diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index ca8a76a414..27403aa9d7 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -20,13 +20,11 @@ use std::time::SystemTime; use criterion::{criterion_group, criterion_main, Criterion}; -use opentelemetry::logs::{ - AnyValue, LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity, -}; +use opentelemetry::logs::{AnyValue, LogRecord as _, Logger as _, LoggerProvider as _, Severity}; use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider as _; use opentelemetry::{InstrumentationScope, Key}; -use opentelemetry_sdk::logs::{LogProcessor, LogRecord, Logger, LoggerProvider}; +use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, Logger, LoggerProvider}; use opentelemetry_sdk::trace; use opentelemetry_sdk::trace::{Sampler, TracerProvider}; diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index c3c027f3e1..c2ecb78ce9 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -16,7 +16,8 @@ use std::time::SystemTime; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; +use opentelemetry::logs::{LogRecord as _, Logger as _, LoggerProvider as _, Severity}; +use opentelemetry_sdk::logs::LogResult; use opentelemetry::InstrumentationScope; use opentelemetry_sdk::export::logs::LogBatch; diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index 03cd71549c..87cd242d0d 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -19,10 +19,10 @@ use std::{ 
use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::{ - logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}, + logs::{LogRecord as _, Logger as _, LoggerProvider as _, Severity}, InstrumentationScope, }; -use opentelemetry_sdk::logs::{LogProcessor, LogRecord, Logger, LoggerProvider}; +use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, Logger, LoggerProvider}; // Run this benchmark with: // cargo bench --bench log_processor diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 0e36debb50..941bcdda14 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Weak}; use criterion::{criterion_group, criterion_main, Bencher, Criterion}; use opentelemetry::{ - metrics::{Counter, Histogram, MeterProvider as _, MetricResult}, + metrics::{Counter, Histogram, MeterProvider as _}, Key, KeyValue, }; use opentelemetry_sdk::{ @@ -11,8 +11,8 @@ use opentelemetry_sdk::{ data::{ResourceMetrics, Temporality}, new_view, reader::MetricReader, - Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, - View, + Aggregation, Instrument, InstrumentKind, ManualReader, MetricResult, Pipeline, + SdkMeterProvider, Stream, View, }, Resource, }; @@ -132,19 +132,13 @@ fn bench_counter(view: Option>, temporality: &str) -> (SharedReade fn counters(c: &mut Criterion) { let (_, cntr) = bench_counter(None, "cumulative"); - let (_, cntr2) = bench_counter(None, "delta"); - let (_, cntr3) = bench_counter(None, "cumulative"); + let (_, cntr_max) = bench_counter(None, "cumulative"); let mut group = c.benchmark_group("Counter"); group.bench_function("AddNoAttrs", |b| b.iter(|| cntr.add(1, &[]))); - group.bench_function("AddNoAttrsDelta", |b| b.iter(|| cntr2.add(1, &[]))); - group.bench_function("AddOneAttr", |b| { b.iter(|| cntr.add(1, &[KeyValue::new("K", "V")])) }); - 
group.bench_function("AddOneAttrDelta", |b| { - b.iter(|| cntr2.add(1, &[KeyValue::new("K1", "V1")])) - }); group.bench_function("AddThreeAttr", |b| { b.iter(|| { cntr.add( @@ -157,18 +151,6 @@ fn counters(c: &mut Criterion) { ) }) }); - group.bench_function("AddThreeAttrDelta", |b| { - b.iter(|| { - cntr2.add( - 1, - &[ - KeyValue::new("K2", "V2"), - KeyValue::new("K3", "V3"), - KeyValue::new("K4", "V4"), - ], - ) - }) - }); group.bench_function("AddFiveAttr", |b| { b.iter(|| { cntr.add( @@ -183,20 +165,6 @@ fn counters(c: &mut Criterion) { ) }) }); - group.bench_function("AddFiveAttrDelta", |b| { - b.iter(|| { - cntr2.add( - 1, - &[ - KeyValue::new("K5", "V5"), - KeyValue::new("K6", "V6"), - KeyValue::new("K7", "V7"), - KeyValue::new("K8", "V8"), - KeyValue::new("K9", "V9"), - ], - ) - }) - }); group.bench_function("AddTenAttr", |b| { b.iter(|| { cntr.add( @@ -216,25 +184,6 @@ fn counters(c: &mut Criterion) { ) }) }); - group.bench_function("AddTenAttrDelta", |b| { - b.iter(|| { - cntr2.add( - 1, - &[ - KeyValue::new("K10", "V10"), - KeyValue::new("K11", "V11"), - KeyValue::new("K12", "V12"), - KeyValue::new("K13", "V13"), - KeyValue::new("K14", "V14"), - KeyValue::new("K15", "V15"), - KeyValue::new("K16", "V16"), - KeyValue::new("K17", "V17"), - KeyValue::new("K18", "V18"), - KeyValue::new("K19", "V19"), - ], - ) - }) - }); const MAX_DATA_POINTS: i64 = 2000; let mut max_attributes: Vec = Vec::new(); @@ -244,14 +193,16 @@ fn counters(c: &mut Criterion) { } group.bench_function("AddOneTillMaxAttr", |b| { - b.iter(|| cntr3.add(1, &max_attributes)) + b.iter(|| cntr_max.add(1, &max_attributes)) }); for i in MAX_DATA_POINTS..MAX_DATA_POINTS * 2 { max_attributes.push(KeyValue::new(i.to_string(), i)) } - group.bench_function("AddMaxAttr", |b| b.iter(|| cntr3.add(1, &max_attributes))); + group.bench_function("AddMaxAttr", |b| { + b.iter(|| cntr_max.add(1, &max_attributes)) + }); group.bench_function("AddInvalidAttr", |b| { b.iter(|| cntr.add(1, &[KeyValue::new("", "V"), 
KeyValue::new("K", "V")])) diff --git a/opentelemetry/src/global/error_handler.rs b/opentelemetry-sdk/src/error.rs similarity index 88% rename from opentelemetry/src/global/error_handler.rs rename to opentelemetry-sdk/src/error.rs index 391fa25f63..115da17b78 100644 --- a/opentelemetry/src/global/error_handler.rs +++ b/opentelemetry-sdk/src/error.rs @@ -1,12 +1,13 @@ +//! Wrapper for error from trace, logs and metrics part of open telemetry. use std::sync::PoisonError; #[cfg(feature = "logs")] use crate::logs::LogError; #[cfg(feature = "metrics")] use crate::metrics::MetricError; -use crate::propagation::PropagationError; +use opentelemetry::propagation::PropagationError; #[cfg(feature = "trace")] -use crate::trace::TraceError; +use opentelemetry::trace::TraceError; /// Wrapper for error from both tracing and metrics part of open telemetry. #[derive(thiserror::Error, Debug)] diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 485dc49c09..db6b3b1e4d 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -1,11 +1,11 @@ //! 
Log exporters use crate::logs::LogRecord; +use crate::logs::{LogError, LogResult}; use crate::Resource; use async_trait::async_trait; use futures_util::future::BoxFuture; #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; -use opentelemetry::logs::{LogError, LogResult}; use opentelemetry::InstrumentationScope; use std::fmt::Debug; diff --git a/opentelemetry-sdk/src/export/mod.rs b/opentelemetry-sdk/src/export/mod.rs index c59a7028a5..21dc2b570c 100644 --- a/opentelemetry-sdk/src/export/mod.rs +++ b/opentelemetry-sdk/src/export/mod.rs @@ -8,4 +8,8 @@ pub mod logs; #[cfg_attr(docsrs, doc(cfg(feature = "trace")))] pub mod trace; -pub use opentelemetry::ExportError; +/// Trait for errors returned by exporters +pub trait ExportError: std::error::Error + Send + Sync + 'static { + /// The name of exporter that returned this error + fn exporter_name(&self) -> &'static str; +} diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 273cb183c8..6f351c2459 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -148,3 +148,5 @@ pub mod util; #[doc(inline)] pub use resource::Resource; + +pub mod error; diff --git a/opentelemetry-sdk/src/logs/error.rs b/opentelemetry-sdk/src/logs/error.rs new file mode 100644 index 0000000000..1cbcd29e2e --- /dev/null +++ b/opentelemetry-sdk/src/logs/error.rs @@ -0,0 +1,63 @@ +use crate::export::ExportError; + +use std::{sync::PoisonError, time::Duration}; +use thiserror::Error; + +/// Describe the result of operations in log SDK. +pub type LogResult = Result; + +#[derive(Error, Debug)] +#[non_exhaustive] +/// Errors returned by the log SDK. +pub enum LogError { + /// Export failed with the error returned by the exporter. + #[error("Exporter {} encountered the following errors: {0}", .0.exporter_name())] + ExportFailed(Box), + + /// Export failed to finish after certain period and processor stopped the export. 
+ #[error("Exporter timed out after {} seconds", .0.as_secs())] + ExportTimedOut(Duration), + + /// Processor is already shutdown + #[error("{0} already shutdown")] + AlreadyShutdown(String), + + /// Mutex lock poisoning + #[error("mutex lock poisioning for {0}")] + MutexPoisoned(String), + + /// Other errors propagated from log SDK that weren't covered above. + #[error(transparent)] + Other(#[from] Box), +} + +impl From for LogError +where + T: ExportError, +{ + fn from(err: T) -> Self { + LogError::ExportFailed(Box::new(err)) + } +} + +impl From for LogError { + fn from(err_msg: String) -> Self { + LogError::Other(Box::new(Custom(err_msg))) + } +} + +impl From<&'static str> for LogError { + fn from(err_msg: &'static str) -> Self { + LogError::Other(Box::new(Custom(err_msg.into()))) + } +} + +impl From> for LogError { + fn from(err: PoisonError) -> Self { + LogError::Other(err.to_string().into()) + } +} +/// Wrap type for string +#[derive(Error, Debug)] +#[error("{0}")] +struct Custom(String); diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 7bc9026d35..600e02c49e 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,11 +1,7 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; -use opentelemetry::{ - logs::{LogError, LogResult}, - otel_debug, - trace::TraceContextExt, - Context, InstrumentationScope, -}; +use crate::{logs::LogError, logs::LogResult}; +use opentelemetry::{otel_debug, trace::TraceContextExt, Context, InstrumentationScope}; #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; @@ -296,7 +292,7 @@ impl opentelemetry::logs::Logger for Logger { || processor.event_enabled( level, target, - self.instrumentation_scope().name.as_ref(), + self.instrumentation_scope().name().as_ref(), ); } enabled diff --git 
a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 8d657fe04c..ee65ed3a74 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,6 +1,6 @@ use crate::{ export::logs::{ExportResult, LogBatch, LogExporter}, - logs::LogRecord, + logs::{LogError, LogRecord, LogResult}, runtime::{RuntimeChannel, TrySend}, Resource, }; @@ -11,10 +11,7 @@ use futures_util::{ }; #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; -use opentelemetry::{ - logs::{LogError, LogResult}, - otel_debug, otel_error, otel_warn, InstrumentationScope, -}; +use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; use std::sync::atomic::AtomicBool; use std::{cmp::min, env, sync::Mutex}; @@ -534,6 +531,7 @@ mod tests { }; use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; + use crate::logs::LogResult; use crate::testing::logs::InMemoryLogExporterBuilder; use crate::{ logs::{ @@ -551,7 +549,7 @@ mod tests { use opentelemetry::logs::AnyValue; use opentelemetry::logs::LogRecord as _; use opentelemetry::logs::{Logger, LoggerProvider as _}; - use opentelemetry::{logs::LogResult, KeyValue}; + use opentelemetry::KeyValue; use opentelemetry::{InstrumentationScope, Key}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index c4dd5ec44d..2dba806056 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -1,8 +1,10 @@ //! 
# OpenTelemetry Log SDK +mod error; mod log_emitter; mod log_processor; pub(crate) mod record; +pub use error::{LogError, LogResult}; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, @@ -87,7 +89,7 @@ mod tests { let log = exported_logs .first() .expect("Atleast one log is expected to be present."); - assert_eq!(log.instrumentation.name, "test-logger"); + assert_eq!(log.instrumentation.name(), "test-logger"); assert_eq!(log.record.severity_number, Some(Severity::Error)); assert_eq!(log.record.attributes_len(), 10); for i in 1..=10 { @@ -111,14 +113,13 @@ mod tests { let logger = provider.logger_with_scope(scope); let instrumentation_scope = logger.instrumentation_scope(); - let attributes = &instrumentation_scope.attributes; - assert_eq!(instrumentation_scope.name, "test_logger"); + assert_eq!(instrumentation_scope.name(), "test_logger"); assert_eq!( - instrumentation_scope.schema_url, - Some("https://opentelemetry.io/schema/1.0.0".into()) + instrumentation_scope.schema_url(), + Some("https://opentelemetry.io/schema/1.0.0") ); - assert_eq!(attributes.len(), 1); - assert_eq!(attributes[0].key, "test_k".into()); - assert_eq!(attributes[0].value, "test_v".into()); + assert!(instrumentation_scope + .attributes() + .eq(&[KeyValue::new("test_k", "test_v")])); } } diff --git a/opentelemetry-sdk/src/metrics/aggregation.rs b/opentelemetry-sdk/src/metrics/aggregation.rs index 6ccd564870..1788b6b264 100644 --- a/opentelemetry-sdk/src/metrics/aggregation.rs +++ b/opentelemetry-sdk/src/metrics/aggregation.rs @@ -1,7 +1,7 @@ use std::fmt; use crate::metrics::internal::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; -use opentelemetry::metrics::{MetricError, MetricResult}; +use crate::metrics::{MetricError, MetricResult}; /// The way recorded measurements are summarized. 
#[derive(Clone, Debug, PartialEq)] @@ -153,7 +153,7 @@ mod tests { internal::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}, Aggregation, }; - use opentelemetry::metrics::{MetricError, MetricResult}; + use crate::metrics::{MetricError, MetricResult}; #[test] fn validate_aggregation() { diff --git a/opentelemetry-sdk/src/metrics/error.rs b/opentelemetry-sdk/src/metrics/error.rs new file mode 100644 index 0000000000..13e0e7b752 --- /dev/null +++ b/opentelemetry-sdk/src/metrics/error.rs @@ -0,0 +1,40 @@ +use std::result; +use std::sync::PoisonError; +use thiserror::Error; + +use crate::export::ExportError; + +/// A specialized `Result` type for metric operations. +pub type MetricResult = result::Result; + +/// Errors returned by the metrics API. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum MetricError { + /// Other errors not covered by specific cases. + #[error("Metrics error: {0}")] + Other(String), + /// Invalid configuration + #[error("Config error {0}")] + Config(String), + /// Fail to export metrics + #[error("Metrics exporter {} failed with {0}", .0.exporter_name())] + ExportErr(Box), + /// Invalid instrument configuration such invalid instrument name, invalid instrument description, invalid instrument unit, etc. + /// See [spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#general-characteristics) + /// for full list of requirements. + #[error("Invalid instrument configuration: {0}")] + InvalidInstrumentConfiguration(&'static str), +} + +impl From for MetricError { + fn from(err: T) -> Self { + MetricError::ExportErr(Box::new(err)) + } +} + +impl From> for MetricError { + fn from(err: PoisonError) -> Self { + MetricError::Other(err.to_string()) + } +} diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index f30685be76..a4e192c414 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -1,7 +1,7 @@ //! 
Interfaces for exporting metrics use async_trait::async_trait; -use opentelemetry::metrics::MetricResult; +use crate::metrics::MetricResult; use crate::metrics::data::ResourceMetrics; diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index c2b3ce5219..f73246a2ce 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -160,13 +160,11 @@ impl Instrument { } pub(crate) fn matches_scope(&self, other: &Instrument) -> bool { - (self.scope.name.is_empty() || self.scope.name.as_ref() == other.scope.name.as_ref()) - && (self.scope.version.is_none() - || self.scope.version.as_ref().map(AsRef::as_ref) - == other.scope.version.as_ref().map(AsRef::as_ref)) - && (self.scope.schema_url.is_none() - || self.scope.schema_url.as_ref().map(AsRef::as_ref) - == other.scope.schema_url.as_ref().map(AsRef::as_ref)) + (self.scope.name().is_empty() || self.scope.name() == other.scope.name()) + && (self.scope.version().is_none() + || self.scope.version().as_ref() == other.scope.version().as_ref()) + && (self.scope.schema_url().is_none() + || self.scope.schema_url().as_ref() == other.scope.schema_url().as_ref()) } } diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 849684fcbd..4456d36645 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::mem::replace; +use std::ops::DerefMut; use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::HistogramDataPoint; @@ -37,6 +36,14 @@ where buckets: Mutex::new(Buckets::::new(*count)), } } + + fn clone_and_reset(&self, count: &usize) -> Self { + let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner()); + let cloned = replace(current.deref_mut(), Buckets::new(*count)); 
+ Self { + buckets: Mutex::new(cloned), + } + } } #[derive(Default)] @@ -73,16 +80,6 @@ impl Buckets { self.max = value } } - - fn reset(&mut self) { - for item in &mut self.counts { - *item = 0; - } - self.count = Default::default(); - self.total = Default::default(); - self.min = T::max(); - self.max = T::min(); - } } /// Summarizes a set of measurements as a histogram with explicitly defined @@ -139,11 +136,6 @@ impl Histogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -155,24 +147,22 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - h.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: vec![], - start_time: start, + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + + self.value_map + .collect_and_reset(&mut h.data_points, |attributes, aggr| { + let b = aggr + .buckets + .into_inner() + .unwrap_or_else(|err| err.into_inner()); + HistogramDataPoint { + attributes, + start_time: prev_start, time: t, count: b.count, bounds: self.bounds.clone(), @@ -193,54 +183,8 @@ impl Histogram { None }, exemplars: vec![], - }); - - b.reset(); - } - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - if let Ok(b) = tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: attrs.clone(), - start_time: start, - time: t, - count: b.count, - bounds: self.bounds.clone(), - bucket_counts: b.counts.clone(), - sum: if self.record_sum { - b.total - } else { - T::default() - }, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - exemplars: vec![], - }); } - } - } - - // The delta collection cycle resets. 
- if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); + }); (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } @@ -250,11 +194,6 @@ impl Histogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -266,24 +205,19 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - h.data_points.clear(); - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } + let prev_start = self + .start + .lock() + .map(|s| *s) + .unwrap_or_else(|_| SystemTime::now()); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: vec![], - start_time: start, + self.value_map + .collect_readonly(&mut h.data_points, |attributes, aggr| { + let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner()); + HistogramDataPoint { + attributes, + start_time: prev_start, time: t, count: b.count, bounds: self.bounds.clone(), @@ -304,50 +238,8 @@ impl Histogram { None }, exemplars: vec![], - }); - } - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - // TODO: This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. 
Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - if let Ok(b) = tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: attrs.clone(), - start_time: start, - time: t, - count: b.count, - bounds: self.bounds.clone(), - bucket_counts: b.counts.clone(), - sum: if self.record_sum { - b.total - } else { - T::default() - }, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - exemplars: vec![], - }); } - } - } + }); (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index aa6aef0bd3..05fedc1489 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - sync::{atomic::Ordering, Arc, Mutex}, - time::SystemTime, -}; +use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; use crate::metrics::data::DataPoint; use opentelemetry::KeyValue; @@ -33,6 +29,12 @@ where fn update(&self, value: T) { self.value.store(value) } + + fn clone_and_reset(&self, _: &()) -> Self { + Self { + value: T::new_atomic_tracker(self.value.get_and_reset_value()), + } + } } /// Summarizes a set of measurements as the last one made. @@ -56,102 +58,31 @@ impl LastValue { pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - dest.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > dest.capacity() { - dest.reserve_exact(n - dest.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - dest.push(DataPoint { - attributes: vec![], + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + self.value_map + .collect_and_reset(dest, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self - .value_map - .no_attribute_tracker - .value - .get_and_reset_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - _ => return, - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); } pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec>) { let t = SystemTime::now(); let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - dest.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > dest.capacity() { - dest.reserve_exact(n - dest.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - dest.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(dest, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - _ => return, - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - dest.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } } } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index d10ceab34f..8b6136d7ce 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -6,8 +6,9 @@ mod precomputed_sum; mod sum; use core::fmt; -use std::collections::HashMap; -use std::ops::{Add, AddAssign, Sub}; +use std::collections::{HashMap, HashSet}; +use std::mem::take; +use std::ops::{Add, AddAssign, DerefMut, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -37,6 +38,9 @@ pub(crate) trait Aggregator { /// Called for each measurement. fn update(&self, value: Self::PreComputedValue); + + /// Return current value and reset this instance + fn clone_and_reset(&self, init: &Self::InitConfig) -> Self; } /// The storage for sums. @@ -130,6 +134,68 @@ where ); } } + + /// Iterate through all attribute sets and populate `DataPoints` in readonly mode. + /// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared. 
+ pub(crate) fn collect_readonly(&self, dest: &mut Vec, mut map_fn: MapFn) + where + MapFn: FnMut(Vec, &A) -> Res, + { + prepare_data(dest, self.count.load(Ordering::SeqCst)); + if self.has_no_attribute_value.load(Ordering::Acquire) { + dest.push(map_fn(vec![], &self.no_attribute_tracker)); + } + + let Ok(trackers) = self.trackers.read() else { + return; + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.iter() { + if seen.insert(Arc::as_ptr(tracker)) { + dest.push(map_fn(attrs.clone(), tracker)); + } + } + } + + /// Iterate through all attribute sets, populate `DataPoints` and reset. + /// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection. + pub(crate) fn collect_and_reset(&self, dest: &mut Vec, mut map_fn: MapFn) + where + MapFn: FnMut(Vec, A) -> Res, + { + prepare_data(dest, self.count.load(Ordering::SeqCst)); + if self.has_no_attribute_value.swap(false, Ordering::AcqRel) { + dest.push(map_fn( + vec![], + self.no_attribute_tracker.clone_and_reset(&self.config), + )); + } + + let trackers = match self.trackers.write() { + Ok(mut trackers) => { + self.count.store(0, Ordering::SeqCst); + take(trackers.deref_mut()) + } + Err(_) => todo!(), + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.into_iter() { + if seen.insert(Arc::as_ptr(&tracker)) { + dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + } + } + } +} + +/// Clear and allocate exactly required amount of space for all attribute-sets +fn prepare_data(data: &mut Vec, list_len: usize) { + data.clear(); + let total_len = list_len + 2; // to account for no_attributes case + overflow state + if total_len > data.capacity() { + data.reserve_exact(total_len - data.capacity()); + } } /// Marks a type that can have a value added and retrieved atomically. 
Required since diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 977c7d4a88..7bd843f4cd 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -3,11 +3,7 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; -use std::{ - collections::{HashMap, HashSet}, - sync::{atomic::Ordering, Arc, Mutex}, - time::SystemTime, -}; +use std::{collections::HashMap, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { @@ -37,7 +33,6 @@ impl PrecomputedSum { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { @@ -50,68 +45,34 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let mut new_reported = HashMap::with_capacity(n); + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + let mut reported = match self.reported.lock() { Ok(r) => r, Err(_) => return (0, None), }; - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - let value = self.value_map.no_attribute_tracker.value.get_value(); - let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); - new_reported.insert(vec![], value); - - s_data.data_points.push(DataPoint { - attributes: vec![], - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.value.get_value(); - let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); - new_reported.insert(attrs.clone(), value); - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), + let mut new_reported = HashMap::with_capacity(reported.len()); + + self.value_map + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| { + let value = aggr.value.get_value(); + new_reported.insert(attributes.clone(), value); + let delta = value - *reported.get(&attributes).unwrap_or(&T::default()); + DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), value: delta, exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. 
- if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); + } + }); *reported = new_reported; drop(reported); // drop before values guard is dropped @@ -127,7 +88,6 @@ impl PrecomputedSum { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { @@ -140,50 +100,19 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } ( s_data.data_points.len(), 
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 34044020b8..40b95a5e60 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::mem::replace; +use std::ops::DerefMut; use std::vec; use std::{sync::Mutex, time::SystemTime}; @@ -33,6 +32,12 @@ where fn update(&self, value: T) { self.value.add(value) } + + fn clone_and_reset(&self, _: &()) -> Self { + Self { + value: T::new_atomic_tracker(self.value.get_and_reset_value()), + } + } } /// Summarizes a set of measurements made as their arithmetic sum. @@ -80,59 +85,20 @@ impl Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + self.value_map + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self - .value_map - .no_attribute_tracker - .value - .get_and_reset_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); ( s_data.data_points.len(), @@ -159,54 +125,17 @@ impl Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - // TODO: This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. 
- let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } ( s_data.data_points.len(), diff --git a/opentelemetry-sdk/src/metrics/manual_reader.rs b/opentelemetry-sdk/src/metrics/manual_reader.rs index f64c61b91d..21bd2a3976 100644 --- a/opentelemetry-sdk/src/metrics/manual_reader.rs +++ b/opentelemetry-sdk/src/metrics/manual_reader.rs @@ -3,10 +3,9 @@ use std::{ sync::{Mutex, Weak}, }; -use opentelemetry::{ - metrics::{MetricError, MetricResult}, - otel_debug, -}; +use opentelemetry::otel_debug; + +use crate::metrics::{MetricError, MetricResult}; use super::{ data::{ResourceMetrics, Temporality}, diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index c552109362..3c3ad37352 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -4,8 +4,8 @@ use std::{borrow::Cow, sync::Arc}; use opentelemetry::{ metrics::{ AsyncInstrumentBuilder, Counter, Gauge, Histogram, HistogramBuilder, InstrumentBuilder, - InstrumentProvider, MetricError, MetricResult, ObservableCounter, ObservableGauge, - ObservableUpDownCounter, UpDownCounter, + InstrumentProvider, ObservableCounter, ObservableGauge, ObservableUpDownCounter, + UpDownCounter, }, otel_error, InstrumentationScope, }; @@ -14,6 +14,7 @@ use crate::metrics::{ instrument::{Instrument, InstrumentKind, Observable, ResolvedMeasures}, internal::{self, Number}, pipeline::{Pipelines, Resolver}, + MetricError, MetricResult, }; use super::noop::NoopSyncInstrument; @@ -76,7 +77,7 @@ impl SdkMeter { if let Err(err) = validation_result { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = 
"Measurements from this Counter will be ignored.", reason = format!("{}", err) @@ -98,7 +99,7 @@ impl SdkMeter { Err(err) => { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Measurements from this Counter will be ignored.", reason = format!("{}", err) @@ -120,7 +121,7 @@ impl SdkMeter { if let Err(err) = validation_result { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableCounter will not be invoked.", reason = format!("{}", err)); @@ -138,7 +139,7 @@ impl SdkMeter { if ms.is_empty() { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableCounter will not be invoked. Check View Configuration." ); @@ -158,7 +159,7 @@ impl SdkMeter { Err(err) => { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableCounter will not be invoked.", reason = format!("{}", err)); @@ -179,7 +180,7 @@ impl SdkMeter { if let Err(err) = validation_result { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableUpDownCounter will not be invoked.", reason = format!("{}", err)); @@ -197,7 +198,7 @@ impl SdkMeter { if ms.is_empty() { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableUpDownCounter will not be invoked. 
Check View Configuration." ); @@ -217,7 +218,7 @@ impl SdkMeter { Err(err) => { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableUpDownCounter will not be invoked.", reason = format!("{}", err)); @@ -238,7 +239,7 @@ impl SdkMeter { if let Err(err) = validation_result { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableGauge will not be invoked.", reason = format!("{}", err)); @@ -256,7 +257,7 @@ impl SdkMeter { if ms.is_empty() { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableGauge will not be invoked. Check View Configuration." ); @@ -276,7 +277,7 @@ impl SdkMeter { Err(err) => { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Callbacks for this ObservableGauge will not be invoked.", reason = format!("{}", err)); @@ -297,7 +298,7 @@ impl SdkMeter { if let Err(err) = validation_result { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Measurements from this UpDownCounter will be ignored.", reason = format!("{}", err) @@ -319,7 +320,7 @@ impl SdkMeter { Err(err) => { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Measurements from this UpDownCounter will be ignored.", reason = format!("{}", err) @@ -341,7 +342,7 @@ impl SdkMeter { if let 
Err(err) = validation_result { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Measurements from this Gauge will be ignored.", reason = format!("{}", err) @@ -363,7 +364,7 @@ impl SdkMeter { Err(err) => { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Measurements from this Gauge will be ignored.", reason = format!("{}", err) @@ -385,7 +386,7 @@ impl SdkMeter { if let Err(err) = validation_result { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Measurements from this Histogram will be ignored.", reason = format!("{}", err) @@ -407,7 +408,7 @@ impl SdkMeter { Err(err) => { otel_error!( name: "InstrumentCreationFailed", - meter_name = self.scope.name.as_ref(), + meter_name = self.scope.name(), instrument_name = builder.name.as_ref(), message = "Measurements from this Histogram will be ignored.", reason = format!("{}", err) @@ -633,7 +634,7 @@ where mod tests { use std::borrow::Cow; - use opentelemetry::metrics::MetricError; + use crate::metrics::MetricError; use super::{ validate_instrument_name, validate_instrument_unit, INSTRUMENT_NAME_FIRST_ALPHABETIC, diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index 4c53e8d12e..2e9895db1e 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -8,10 +8,11 @@ use std::{ }; use opentelemetry::{ - metrics::{Meter, MeterProvider, MetricError, MetricResult}, + metrics::{Meter, MeterProvider}, otel_debug, otel_error, InstrumentationScope, }; +use crate::metrics::{MetricError, MetricResult}; use crate::Resource; use super::{ @@ -465,18 
+466,16 @@ mod tests { assert_eq!(provider.inner.meters.lock().unwrap().len(), 2); // these are different meters because meter names are case sensitive - let mut library = InstrumentationScope::builder("ABC") - .with_version("1.0.0") - .with_schema_url("http://example.com") - .build(); - - let _meter6 = provider.meter_with_scope(library.clone()); - - library.name = "Abc".into(); - let _meter7 = provider.meter_with_scope(library.clone()); + let make_scope = |name| { + InstrumentationScope::builder(name) + .with_version("1.0.0") + .with_schema_url("http://example.com") + .build() + }; - library.name = "abc".into(); - let _meter8 = provider.meter_with_scope(library); + let _meter6 = provider.meter_with_scope(make_scope("ABC")); + let _meter7 = provider.meter_with_scope(make_scope("Abc")); + let _meter8 = provider.meter_with_scope(make_scope("abc")); assert_eq!(provider.inner.meters.lock().unwrap().len(), 5); } diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 0e7d46acad..f8c440050d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -41,6 +41,7 @@ pub(crate) mod aggregation; pub mod data; +mod error; pub mod exporter; pub(crate) mod instrument; pub(crate) mod internal; @@ -54,6 +55,7 @@ pub mod reader; pub(crate) mod view; pub use aggregation::*; +pub use error::{MetricError, MetricResult}; pub use instrument::*; pub use manual_reader::*; pub use meter_provider::*; @@ -137,7 +139,6 @@ mod tests { use opentelemetry::InstrumentationScope; use opentelemetry::{metrics::MeterProvider as _, KeyValue}; use rand::{rngs, Rng, SeedableRng}; - use std::borrow::Cow; use std::cmp::{max, min}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -638,16 +639,18 @@ mod tests { // Act // Meters are identical except for scope attributes, but scope attributes are not an identifying property. // Hence there should be a single metric stream output for this test. 
- let mut scope = InstrumentationScope::builder("test.meter") - .with_version("v0.1.0") - .with_schema_url("http://example.com") - .with_attributes(vec![KeyValue::new("key", "value1")]) - .build(); - - let meter1 = meter_provider.meter_with_scope(scope.clone()); + let make_scope = |attributes| { + InstrumentationScope::builder("test.meter") + .with_version("v0.1.0") + .with_schema_url("http://example.com") + .with_attributes(attributes) + .build() + }; - scope.attributes = vec![KeyValue::new("key", "value2")]; - let meter2 = meter_provider.meter_with_scope(scope); + let meter1 = + meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")])); + let meter2 = + meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value2")])); let counter1 = meter1 .u64_counter("my_counter") @@ -682,13 +685,13 @@ mod tests { ); let scope = &resource_metrics[0].scope_metrics[0].scope; - assert_eq!(scope.name, "test.meter"); - assert_eq!(scope.version, Some(Cow::Borrowed("v0.1.0"))); - assert_eq!(scope.schema_url, Some(Cow::Borrowed("http://example.com"))); + assert_eq!(scope.name(), "test.meter"); + assert_eq!(scope.version(), Some("v0.1.0")); + assert_eq!(scope.schema_url(), Some("http://example.com")); // This is validating current behavior, but it is not guaranteed to be the case in the future, // as this is a user error and SDK reserves right to change this behavior. 
- assert_eq!(scope.attributes, vec![KeyValue::new("key", "value1")]); + assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")])); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; assert_eq!(metric.name, "my_counter"); @@ -2341,7 +2344,7 @@ mod tests { ) -> Option<&'a ScopeMetrics> { metrics .iter() - .find(|&scope_metric| scope_metric.scope.name == name) + .find(|&scope_metric| scope_metric.scope.name() == name) } struct TestContext { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 1cab53edd1..6484d25126 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -11,14 +11,11 @@ use futures_util::{ stream::{self, FusedStream}, StreamExt, }; -use opentelemetry::{ - metrics::{MetricError, MetricResult}, - otel_debug, otel_error, -}; +use opentelemetry::{otel_debug, otel_error}; use crate::runtime::Runtime; use crate::{ - metrics::{exporter::PushMetricExporter, reader::SdkProducer}, + metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, Resource, }; @@ -407,11 +404,12 @@ impl MetricReader for PeriodicReader { #[cfg(all(test, feature = "testing"))] mod tests { use super::PeriodicReader; + use crate::metrics::MetricError; use crate::{ metrics::data::ResourceMetrics, metrics::reader::MetricReader, metrics::SdkMeterProvider, runtime, testing::metrics::InMemoryMetricExporter, Resource, }; - use opentelemetry::metrics::{MeterProvider, MetricError}; + use opentelemetry::metrics::MeterProvider; use std::sync::mpsc; #[test] diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index acf4605d13..8da41f38fb 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -5,10 +5,7 @@ use std::{ sync::{Arc, Mutex}, }; -use opentelemetry::{ - metrics::{MetricError, MetricResult}, - otel_debug, 
InstrumentationScope, KeyValue, -}; +use opentelemetry::{otel_debug, InstrumentationScope, KeyValue}; use crate::{ metrics::{ @@ -20,6 +17,7 @@ use crate::{ internal::Number, reader::{MetricReader, SdkProducer}, view::View, + MetricError, MetricResult, }, Resource, }; diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index 1a98ffbb03..b700c13230 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -1,7 +1,7 @@ //! Interfaces for reading and producing metrics use std::{fmt, sync::Weak}; -use opentelemetry::metrics::MetricResult; +use crate::metrics::MetricResult; use super::{ data::{ResourceMetrics, Temporality}, diff --git a/opentelemetry-sdk/src/metrics/view.rs b/opentelemetry-sdk/src/metrics/view.rs index 3913412cb7..b4c4a8a4d5 100644 --- a/opentelemetry-sdk/src/metrics/view.rs +++ b/opentelemetry-sdk/src/metrics/view.rs @@ -1,6 +1,6 @@ use super::instrument::{Instrument, Stream}; +use crate::metrics::{MetricError, MetricResult}; use glob::Pattern; -use opentelemetry::metrics::{MetricError, MetricResult}; fn empty_view(_inst: &Instrument) -> Option { None diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 7bcbd996af..cac026c663 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,8 +1,8 @@ use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; +use crate::logs::{LogError, LogResult}; use crate::Resource; use async_trait::async_trait; -use opentelemetry::logs::{LogError, LogResult}; use opentelemetry::InstrumentationScope; use std::borrow::Cow; use std::sync::{Arc, Mutex}; diff --git a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs index d5c8a7494c..223c84fe70 100644 --- 
a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs @@ -1,9 +1,9 @@ use crate::metrics::data; use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics, Temporality}; use crate::metrics::exporter::PushMetricExporter; +use crate::metrics::MetricError; +use crate::metrics::MetricResult; use async_trait::async_trait; -use opentelemetry::metrics::MetricError; -use opentelemetry::metrics::MetricResult; use std::collections::VecDeque; use std::fmt; use std::sync::{Arc, Mutex}; diff --git a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs index a3c3cf6f49..96c3c939ba 100644 --- a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs +++ b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs @@ -1,12 +1,12 @@ use std::sync::{Arc, Mutex, Weak}; +use crate::metrics::MetricResult; use crate::metrics::{ data::{ResourceMetrics, Temporality}, pipeline::Pipeline, reader::MetricReader, InstrumentKind, }; -use opentelemetry::metrics::MetricResult; #[derive(Debug, Clone)] pub struct TestMetricReader { diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index f8a0776318..e9996e3fc8 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -1,8 +1,5 @@ use crate::{ - export::{ - trace::{ExportResult, SpanData, SpanExporter}, - ExportError, - }, + export::trace::{ExportResult, SpanData, SpanExporter}, trace::{SpanEvents, SpanLinks}, }; use futures_util::future::BoxFuture; @@ -80,7 +77,7 @@ pub struct TestExportError(String); impl std::error::Error for TestExportError {} -impl ExportError for TestExportError { +impl opentelemetry::trace::ExportError for TestExportError { fn exporter_name(&self) -> &'static str { "test" } diff --git a/opentelemetry-sdk/src/trace/mod.rs 
b/opentelemetry-sdk/src/trace/mod.rs index 7ce5e7b977..e8dd8922d9 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -83,7 +83,7 @@ mod tests { assert_eq!(exported_spans.len(), 1); let span = &exported_spans[0]; assert_eq!(span.name, "span_name_updated"); - assert_eq!(span.instrumentation_scope.name, "test_tracer"); + assert_eq!(span.instrumentation_scope.name(), "test_tracer"); assert_eq!(span.attributes.len(), 1); assert_eq!(span.events.len(), 1); assert_eq!(span.events[0].name, "test-event"); @@ -118,7 +118,7 @@ mod tests { assert_eq!(exported_spans.len(), 1); let span = &exported_spans[0]; assert_eq!(span.name, "span_name"); - assert_eq!(span.instrumentation_scope.name, "test_tracer"); + assert_eq!(span.instrumentation_scope.name(), "test_tracer"); assert_eq!(span.attributes.len(), 1); assert_eq!(span.events.len(), 1); assert_eq!(span.events[0].name, "test-event"); @@ -155,7 +155,7 @@ mod tests { let span = &exported_spans[0]; assert_eq!(span.name, "span_name"); assert_eq!(span.span_kind, SpanKind::Server); - assert_eq!(span.instrumentation_scope.name, "test_tracer"); + assert_eq!(span.instrumentation_scope.name(), "test_tracer"); assert_eq!(span.attributes.len(), 1); assert_eq!(span.events.len(), 1); assert_eq!(span.events[0].name, "test-event"); @@ -333,9 +333,8 @@ mod tests { let tracer = provider.tracer_with_scope(scope); let instrumentation_scope = tracer.instrumentation_scope(); - let attributes = &instrumentation_scope.attributes; - assert_eq!(attributes.len(), 1); - assert_eq!(attributes[0].key, "test_k".into()); - assert_eq!(attributes[0].value, "test_v".into()); + assert!(instrumentation_scope + .attributes() + .eq(&[KeyValue::new("test_k", "test_v")])); } } diff --git a/opentelemetry-sdk/src/trace/tracer.rs b/opentelemetry-sdk/src/trace/tracer.rs index 2004fdf2cd..cf2a58583d 100644 --- a/opentelemetry-sdk/src/trace/tracer.rs +++ b/opentelemetry-sdk/src/trace/tracer.rs @@ -30,8 +30,8 @@ impl fmt::Debug for 
Tracer { /// Omitting `provider` here is necessary to avoid cycles. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Tracer") - .field("name", &self.scope.name) - .field("version", &self.scope.version) + .field("name", &self.scope.name()) + .field("version", &self.scope.version()) .finish() } } diff --git a/opentelemetry-stdout/src/common.rs b/opentelemetry-stdout/src/common.rs index 83685d3f03..4da706f893 100644 --- a/opentelemetry-stdout/src/common.rs +++ b/opentelemetry-stdout/src/common.rs @@ -236,9 +236,9 @@ pub(crate) struct Scope { impl From for Scope { fn from(value: opentelemetry::InstrumentationScope) -> Self { Scope { - name: value.name, - version: value.version, - attributes: value.attributes.into_iter().map(Into::into).collect(), + name: value.name().to_owned().into(), + version: value.version().map(ToOwned::to_owned).map(Into::into), + attributes: value.attributes().map(Into::into).collect(), dropped_attributes_count: 0, } } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 8a2243df0c..c5edb880bf 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::fmt; -use opentelemetry::logs::LogResult; use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::Resource; use std::sync::atomic; diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index b2b078a51b..d9e191b8aa 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::{f64, fmt}; -use opentelemetry::metrics::{MetricError, MetricResult}; use opentelemetry_sdk::metrics::{ data::{self, ScopeMetrics, Temporality}, exporter::PushMetricExporter, }; +use 
opentelemetry_sdk::metrics::{MetricError, MetricResult}; use std::fmt::Debug; use std::sync::atomic; @@ -72,17 +72,16 @@ impl PushMetricExporter for MetricExporter { fn print_metrics(metrics: &[ScopeMetrics]) { for (i, metric) in metrics.iter().enumerate() { println!("\tInstrumentation Scope #{}", i); - println!("\t\tName : {}", &metric.scope.name); - if let Some(version) = &metric.scope.version { + println!("\t\tName : {}", &metric.scope.name()); + if let Some(version) = &metric.scope.version() { println!("\t\tVersion : {:?}", version); } - if let Some(schema_url) = &metric.scope.schema_url { + if let Some(schema_url) = &metric.scope.schema_url() { println!("\t\tSchemaUrl: {:?}", schema_url); } metric .scope - .attributes - .iter() + .attributes() .enumerate() .for_each(|(index, kv)| { if index == 0 { diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index 798e6b4247..4435e75d32 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -72,16 +72,18 @@ fn print_spans(batch: Vec) { for (i, span) in batch.into_iter().enumerate() { println!("Span #{}", i); println!("\tInstrumentation Scope"); - println!("\t\tName : {:?}", &span.instrumentation_scope.name); - if let Some(version) = &span.instrumentation_scope.version { + println!( + "\t\tName : {:?}", + &span.instrumentation_scope.name() + ); + if let Some(version) = &span.instrumentation_scope.version() { println!("\t\tVersion : {:?}", version); } - if let Some(schema_url) = &span.instrumentation_scope.schema_url { + if let Some(schema_url) = &span.instrumentation_scope.schema_url() { println!("\t\tSchemaUrl: {:?}", schema_url); } span.instrumentation_scope - .attributes - .iter() + .attributes() .enumerate() .for_each(|(index, kv)| { if index == 0 { diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index e7466c6a2d..1ddd6c2b5b 100644 --- 
a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -252,3 +252,9 @@ impl ExportError for Error { "zipkin" } } + +impl opentelemetry::trace::ExportError for Error { + fn exporter_name(&self) -> &'static str { + "zipkin" + } +} diff --git a/opentelemetry-zipkin/src/exporter/model/mod.rs b/opentelemetry-zipkin/src/exporter/model/mod.rs index 15646429c9..a78708a2ae 100644 --- a/opentelemetry-zipkin/src/exporter/model/mod.rs +++ b/opentelemetry-zipkin/src/exporter/model/mod.rs @@ -46,11 +46,14 @@ pub(crate) fn into_zipkin_span(local_endpoint: Endpoint, span_data: SpanData) -> [ ( INSTRUMENTATION_LIBRARY_NAME, - Some(span_data.instrumentation_scope.name), + Some(span_data.instrumentation_scope.name().to_owned()), ), ( INSTRUMENTATION_LIBRARY_VERSION, - span_data.instrumentation_scope.version, + span_data + .instrumentation_scope + .version() + .map(ToOwned::to_owned), ), ] .into_iter() diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 2b181cc8ad..878a10d80f 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -38,6 +38,17 @@ let counter = meter.u64_counter("my_counter").build(); - `global::handle_error` usage inside the opentelemetry crates has been replaced with `global::otel_info`, `otel_warn`, `otel_debug` and `otel_error` macros based on the severity of the internal logs. - The default behavior of `global::handle_error` was to log the error using `eprintln!`. With otel macro, the internal logs get emitted via `tracing` macros of matching severity. Users now need to configure the `tracing` layer to capture these logs. - Refer to this PR description for migration guide. Also refer to [self-diagnostics](https://github.com/open-telemetry/opentelemetry-rust/tree/main/examples/self-diagnostics) example on how to configure the tracing layer for internal logs. 
+- **Breaking change**: replaced `InstrumentationScope` public attributes with getters [#2275](https://github.com/open-telemetry/opentelemetry-rust/pull/2275) + + +- [#2266](https://github.com/open-telemetry/opentelemetry-rust/pull/2266) + - Moved `ExportError` trait from `opentelemetry::ExportError` to `opentelemetry_sdk::export::ExportError` + - Created new trait `opentelemetry::trace::ExportError` for trace API. This will eventually be consolidated with ExportError in the SDK. + - Moved `LogError` enum from `opentelemetry::logs::LogError` to `opentelemetry_sdk::logs::LogError` + - Moved `LogResult` type alias from `opentelemetry::logs::LogResult` to `opentelemetry_sdk::logs::LogResult` + - Moved `MetricError` enum from `opentelemetry::metrics::MetricError` to `opentelemetry_sdk::metrics::MetricError` + - Moved `MetricResult` type alias from `opentelemetry::metrics::MetricResult` to `opentelemetry_sdk::metrics::MetricResult` + These changes shouldn't directly affect the users of the OpenTelemetry crate, as these constructs are used in the SDK and exporters. If you are the author of an SDK component/plug-in, like an exporter, etc., please use these types from the SDK. Refer to [CHANGELOG.md](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/CHANGELOG.md) for more details, under the same version section. ## v0.26.0 Released 2024-Sep-30 diff --git a/opentelemetry/src/common.rs b/opentelemetry/src/common.rs index f152826b23..85fa6e7f9d 100644 --- a/opentelemetry/src/common.rs +++ b/opentelemetry/src/common.rs @@ -399,12 +399,6 @@ impl KeyValue { } } -/// Marker trait for errors returned by exporters -pub trait ExportError: std::error::Error + Send + Sync + 'static { - /// The name of exporter that returned this error - fn exporter_name(&self) -> &'static str; -} - /// Information about a library or crate providing instrumentation.
/// /// An instrumentation scope should be named to follow any naming conventions @@ -419,18 +413,18 @@ pub struct InstrumentationScope { /// The library name. /// /// This should be the name of the crate providing the instrumentation. - pub name: Cow<'static, str>, + name: Cow<'static, str>, /// The library version. - pub version: Option>, + version: Option>, /// [Schema URL] used by this library. /// /// [Schema URL]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url - pub schema_url: Option>, + schema_url: Option>, /// Specifies the instrumentation scope attributes to associate with emitted telemetry. - pub attributes: Vec, + attributes: Vec, } // Uniqueness for InstrumentationScope does not depend on attributes @@ -462,6 +456,32 @@ impl InstrumentationScope { attributes: None, } } + + /// Returns the instrumentation library name. + #[inline] + pub fn name(&self) -> &str { + &self.name + } + + /// Returns the instrumentation library version. + #[inline] + pub fn version(&self) -> Option<&str> { + self.version.as_deref() + } + + /// Returns the [Schema URL] used by this library. + /// + /// [Schema URL]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url + #[inline] + pub fn schema_url(&self) -> Option<&str> { + self.schema_url.as_deref() + } + + /// Returns the instrumentation scope attributes to associate with emitted telemetry. + #[inline] + pub fn attributes(&self) -> impl Iterator { + self.attributes.iter() + } } /// Configuration options for [InstrumentationScope]. 
@@ -478,11 +498,8 @@ impl InstrumentationScope { #[derive(Debug)] pub struct InstrumentationScopeBuilder { name: Cow<'static, str>, - version: Option>, - schema_url: Option>, - attributes: Option>, } diff --git a/opentelemetry/src/global/internal_logging.rs b/opentelemetry/src/global/internal_logging.rs index 4391485619..e103eea867 100644 --- a/opentelemetry/src/global/internal_logging.rs +++ b/opentelemetry/src/global/internal_logging.rs @@ -16,12 +16,17 @@ /// use opentelemetry::otel_info; /// otel_info!(name: "sdk_start", version = "1.0.0", schema_url = "http://example.com"); /// ``` +/// + +// TODO: Remove `name` attribute duplication in logging macros below once `tracing::Fmt` supports displaying `name`. +// See issue: https://github.com/tokio-rs/tracing/issues/2774 + #[macro_export] macro_rules! otel_info { (name: $name:expr $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::info!( name: $name, target: env!("CARGO_PKG_NAME"), ""); + tracing::info!( name: $name, target: env!("CARGO_PKG_NAME"), name = $name, ""); } #[cfg(not(feature = "internal-logs"))] { @@ -31,7 +36,7 @@ macro_rules! otel_info { (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::info!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, ""); + tracing::info!(name: $name, target: env!("CARGO_PKG_NAME"), name = $name, $($key = $value),+, ""); } #[cfg(not(feature = "internal-logs"))] { @@ -56,7 +61,7 @@ macro_rules! otel_warn { (name: $name:expr $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), ""); + tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), name = $name, ""); } #[cfg(not(feature = "internal-logs"))] { @@ -68,6 +73,7 @@ macro_rules! otel_warn { { tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), + name = $name, $($key = { $value }),+, @@ -97,7 +103,7 @@ macro_rules! otel_debug { (name: $name:expr $(,)?) 
=> { #[cfg(feature = "internal-logs")] { - tracing::debug!(name: $name, target: env!("CARGO_PKG_NAME"),""); + tracing::debug!(name: $name, target: env!("CARGO_PKG_NAME"), name = $name, ""); } #[cfg(not(feature = "internal-logs"))] { @@ -107,7 +113,7 @@ macro_rules! otel_debug { (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::debug!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, ""); + tracing::debug!(name: $name, target: env!("CARGO_PKG_NAME"), name = $name, $($key = $value),+, ""); } #[cfg(not(feature = "internal-logs"))] { @@ -132,7 +138,7 @@ macro_rules! otel_error { (name: $name:expr $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), ""); + tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), name = $name, ""); } #[cfg(not(feature = "internal-logs"))] { @@ -144,6 +150,7 @@ macro_rules! otel_error { { tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), + name = $name, $($key = { $value }),+, diff --git a/opentelemetry/src/global/mod.rs b/opentelemetry/src/global/mod.rs index 5f27b0e08b..182364a18c 100644 --- a/opentelemetry/src/global/mod.rs +++ b/opentelemetry/src/global/mod.rs @@ -130,7 +130,6 @@ //! [`MeterProvider`]: crate::metrics::MeterProvider //! 
[`set_meter_provider`]: crate::global::set_meter_provider -mod error_handler; mod internal_logging; #[cfg(feature = "metrics")] mod metrics; @@ -138,7 +137,6 @@ mod metrics; mod propagation; #[cfg(feature = "trace")] mod trace; -pub use error_handler::Error; #[cfg(feature = "metrics")] #[cfg_attr(docsrs, doc(cfg(feature = "metrics")))] diff --git a/opentelemetry/src/lib.rs b/opentelemetry/src/lib.rs index 1c8d07caf7..f760aa1017 100644 --- a/opentelemetry/src/lib.rs +++ b/opentelemetry/src/lib.rs @@ -204,8 +204,7 @@ mod common; pub mod testing; pub use common::{ - Array, ExportError, InstrumentationScope, InstrumentationScopeBuilder, Key, KeyValue, - StringValue, Value, + Array, InstrumentationScope, InstrumentationScopeBuilder, Key, KeyValue, StringValue, Value, }; #[cfg(feature = "metrics")] diff --git a/opentelemetry/src/logs/mod.rs b/opentelemetry/src/logs/mod.rs index 1a27edb2e0..e57684bc0f 100644 --- a/opentelemetry/src/logs/mod.rs +++ b/opentelemetry/src/logs/mod.rs @@ -2,11 +2,6 @@ /// This API is not intended to be called by application developers directly. /// It is provided for logging library authors to build log appenders, that /// bridges existing logging systems with OpenTelemetry. -use crate::ExportError; - -use std::{sync::PoisonError, time::Duration}; -use thiserror::Error; - mod logger; mod noop; mod record; @@ -14,62 +9,3 @@ mod record; pub use logger::{Logger, LoggerProvider}; pub use noop::NoopLoggerProvider; pub use record::{AnyValue, LogRecord, Severity}; - -/// Describe the result of operations in log SDK. -pub type LogResult = Result; - -#[derive(Error, Debug)] -#[non_exhaustive] -/// Errors returned by the log SDK. -pub enum LogError { - /// Export failed with the error returned by the exporter. - #[error("Exporter {} encountered the following errors: {0}", .0.exporter_name())] - ExportFailed(Box), - - /// Export failed to finish after certain period and processor stopped the export. 
- #[error("Exporter timed out after {} seconds", .0.as_secs())] - ExportTimedOut(Duration), - - /// Processor is already shutdown - #[error("{0} already shutdown")] - AlreadyShutdown(String), - - /// Mutex lock poisoning - #[error("mutex lock poisioning for {0}")] - MutexPoisoned(String), - - /// Other errors propagated from log SDK that weren't covered above. - #[error(transparent)] - Other(#[from] Box), -} - -impl From for LogError -where - T: ExportError, -{ - fn from(err: T) -> Self { - LogError::ExportFailed(Box::new(err)) - } -} - -impl From for LogError { - fn from(err_msg: String) -> Self { - LogError::Other(Box::new(Custom(err_msg))) - } -} - -impl From<&'static str> for LogError { - fn from(err_msg: &'static str) -> Self { - LogError::Other(Box::new(Custom(err_msg.into()))) - } -} - -impl From> for LogError { - fn from(err: PoisonError) -> Self { - LogError::Other(err.to_string().into()) - } -} -/// Wrap type for string -#[derive(Error, Debug)] -#[error("{0}")] -struct Custom(String); diff --git a/opentelemetry/src/metrics/mod.rs b/opentelemetry/src/metrics/mod.rs index 6c3a4381fb..075d5cccbe 100644 --- a/opentelemetry/src/metrics/mod.rs +++ b/opentelemetry/src/metrics/mod.rs @@ -1,16 +1,13 @@ //! # OpenTelemetry Metrics API use std::hash::{Hash, Hasher}; -use std::result; use std::sync::Arc; -use std::sync::PoisonError; -use thiserror::Error; mod instruments; mod meter; pub(crate) mod noop; -use crate::{Array, ExportError, KeyValue, Value}; +use crate::{Array, KeyValue, Value}; pub use instruments::{ counter::{Counter, ObservableCounter}, gauge::{Gauge, ObservableGauge}, @@ -21,41 +18,6 @@ pub use instruments::{ }; pub use meter::{Meter, MeterProvider}; -/// A specialized `Result` type for metric operations. -pub type MetricResult = result::Result; - -/// Errors returned by the metrics API. -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum MetricError { - /// Other errors not covered by specific cases. 
- #[error("Metrics error: {0}")] - Other(String), - /// Invalid configuration - #[error("Config error {0}")] - Config(String), - /// Fail to export metrics - #[error("Metrics exporter {} failed with {0}", .0.exporter_name())] - ExportErr(Box), - /// Invalid instrument configuration such invalid instrument name, invalid instrument description, invalid instrument unit, etc. - /// See [spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#general-characteristics) - /// for full list of requirements. - #[error("Invalid instrument configuration: {0}")] - InvalidInstrumentConfiguration(&'static str), -} - -impl From for MetricError { - fn from(err: T) -> Self { - MetricError::ExportErr(Box::new(err)) - } -} - -impl From> for MetricError { - fn from(err: PoisonError) -> Self { - MetricError::Other(err.to_string()) - } -} - struct F64Hashable(f64); impl PartialEq for F64Hashable { diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index 3f0f74e90e..09443c4b4c 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -186,9 +186,17 @@ pub use self::{ tracer::{SamplingDecision, SamplingResult, SpanBuilder, Tracer}, tracer_provider::TracerProvider, }; -use crate::{ExportError, KeyValue}; +use crate::KeyValue; use std::sync::PoisonError; +// TODO - Move ExportError and TraceError to opentelemetry-sdk + +/// Trait for errors returned by exporters +pub trait ExportError: std::error::Error + Send + Sync + 'static { + /// The name of exporter that returned this error + fn exporter_name(&self) -> &'static str; +} + /// Describe the result of operations in tracing API. 
pub type TraceResult = Result; diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 17fcf2833e..1ede37b1ee 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -28,11 +28,11 @@ impl LogProcessor for NoOpLogProcessor { ) { } - fn force_flush(&self) -> opentelemetry::logs::LogResult<()> { + fn force_flush(&self) -> opentelemetry_sdk::logs::LogResult<()> { Ok(()) } - fn shutdown(&self) -> opentelemetry::logs::LogResult<()> { + fn shutdown(&self) -> opentelemetry_sdk::logs::LogResult<()> { Ok(()) } }