Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attach metadata to root span and entire propagate in the entire hierarchy #180

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions minitrace/src/collector/global_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use once_cell::sync::Lazy;
use parking_lot::Mutex;

use super::EventRecord;
use super::SpanMetadata;
use crate::collector::command::CollectCommand;
use crate::collector::command::CommitCollect;
use crate::collector::command::DropCollect;
Expand Down Expand Up @@ -169,11 +170,13 @@ enum SpanCollection {
spans: SpanSet,
trace_id: TraceId,
parent_id: SpanId,
metadata: Option<SpanMetadata>,
},
Shared {
spans: Arc<SpanSet>,
trace_id: TraceId,
parent_id: SpanId,
metadata: Option<SpanMetadata>,
},
}

Expand Down Expand Up @@ -205,14 +208,10 @@ impl GlobalCollector {

std::thread::Builder::new()
.name("minitrace-global-collector".to_string())
.spawn(move || {
loop {
let begin_instant = std::time::Instant::now();
GLOBAL_COLLECTOR.lock().handle_commands(false);
std::thread::sleep(
COLLECT_LOOP_INTERVAL.saturating_sub(begin_instant.elapsed()),
);
}
.spawn(move || loop {
let begin_instant = std::time::Instant::now();
GLOBAL_COLLECTOR.lock().handle_commands(false);
std::thread::sleep(COLLECT_LOOP_INTERVAL.saturating_sub(begin_instant.elapsed()));
})
.unwrap();

Expand Down Expand Up @@ -291,7 +290,7 @@ impl GlobalCollector {
debug_assert!(!collect_token.is_empty());

if collect_token.len() == 1 {
let item = collect_token[0];
let item = collect_token[0].clone();
if let Some((buf, span_count)) = self.active_collectors.get_mut(&item.collect_id) {
if *span_count < self.config.max_spans_per_trace.unwrap_or(usize::MAX)
|| item.is_root
Expand All @@ -301,6 +300,7 @@ impl GlobalCollector {
spans,
trace_id: item.trace_id,
parent_id: item.parent_id,
metadata: item.metadata,
});
}
}
Expand All @@ -318,6 +318,7 @@ impl GlobalCollector {
spans: spans.clone(),
trace_id: item.trace_id,
parent_id: item.parent_id,
metadata: item.metadata.clone(),
});
}
}
Expand All @@ -339,6 +340,7 @@ impl GlobalCollector {
spans,
trace_id,
parent_id,
metadata,
} => match spans {
SpanSet::Span(raw_span) => amend_span(
&raw_span,
Expand All @@ -347,6 +349,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
&local_spans,
Expand All @@ -355,6 +358,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
&local_spans,
Expand All @@ -363,12 +367,14 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
},
SpanCollection::Shared {
spans,
trace_id,
parent_id,
metadata,
} => match &*spans {
SpanSet::Span(raw_span) => amend_span(
raw_span,
Expand All @@ -377,6 +383,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::LocalSpansInner(local_spans) => amend_local_span(
local_spans,
Expand All @@ -385,6 +392,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
SpanSet::SharedLocalSpans(local_spans) => amend_local_span(
local_spans,
Expand All @@ -393,6 +401,7 @@ impl GlobalCollector {
committed_records,
dangling_events,
&anchor,
metadata,
),
},
}
Expand Down Expand Up @@ -423,6 +432,7 @@ fn amend_local_span(
spans: &mut Vec<SpanRecord>,
events: &mut HashMap<SpanId, Vec<EventRecord>>,
anchor: &Anchor,
metadata: Option<SpanMetadata>,
) {
for span in local_spans.spans.iter() {
let begin_time_unix_ns = span.begin_instant.as_unix_nanos(anchor);
Expand Down Expand Up @@ -456,6 +466,7 @@ fn amend_local_span(
name: span.name.clone(),
properties: span.properties.clone(),
events: vec![],
metadata: metadata.clone(),
});
}
}
Expand All @@ -467,6 +478,7 @@ fn amend_span(
spans: &mut Vec<SpanRecord>,
events: &mut HashMap<SpanId, Vec<EventRecord>>,
anchor: &Anchor,
metadata: Option<SpanMetadata>,
) {
let begin_time_unix_ns = raw_span.begin_instant.as_unix_nanos(anchor);

Expand All @@ -490,6 +502,7 @@ fn amend_span(
name: raw_span.name.clone(),
properties: raw_span.properties.clone(),
events: vec![],
metadata,
});
}

Expand Down
138 changes: 133 additions & 5 deletions minitrace/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod global_collector;
pub(crate) mod id;
mod test_reporter;

use std::any::Any;
use std::borrow::Cow;
use std::rc::Rc;
use std::sync::Arc;
Expand Down Expand Up @@ -41,6 +42,64 @@ pub enum SpanSet {
SharedLocalSpans(Arc<LocalSpansInner>),
}

/// A struct representing the metadata of a span. It can be used to store arbitrary data
/// during the processing and reporting of the span.
///
/// The metadata is propagated from the root span to all its children, so it can help to identify span traces and their association with a particular flow.
///
/// Metadata is not sent to the collector, and is only available to the reporter as part of the `SpanRecord`.
///
/// # Examples
///
/// ```
/// use minitrace::prelude::*;
///
/// let context = SpanContext::new_with_metadata(TraceId(12), SpanId::default(), SpanMetadata::create::<i32>(123));
/// ```
///
/// In your reporter implementation, you can access the metadata by passing the explicit structure type:
///
/// ```
/// struct MyReporter;
///
/// impl Reporter for MyReporter {
/// fn report(&mut self, spans: &[SpanRecord]) {
/// for span in spans {
/// let metadata: &i32 = span.metadata::<i32>().unwrap();
/// }
/// }
/// }
/// ```
///
/// use minitrace::prelude::*;
#[derive(Debug, Clone)]
pub struct SpanMetadata {
inner: Arc<dyn Any + Send + Sync>,
}

impl SpanMetadata {
pub fn create<T: Send + Sync + 'static>(metadata: T) -> SpanMetadata {
Self {
inner: Arc::new(metadata),
}
}

pub fn data<T>(&self) -> Option<&T>
where
T: 'static,
{
self.inner.downcast_ref::<T>()
}
}

impl PartialEq for SpanMetadata {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
}

impl Eq for SpanMetadata {}

/// A record of a span that includes all the information about the span,
/// such as its identifiers, timing information, name, and associated properties.
#[derive(Clone, Debug, Default)]
Expand All @@ -53,6 +112,16 @@ pub struct SpanRecord {
pub name: Cow<'static, str>,
pub properties: Vec<(Cow<'static, str>, Cow<'static, str>)>,
pub events: Vec<EventRecord>,
metadata: Option<SpanMetadata>,
}

impl SpanRecord {
pub fn metadata<T>(&self) -> Option<&T>
where
T: 'static,
{
self.metadata.as_ref()?.data::<T>()
}
}

/// A record of an event that occurred during the execution of a span.
Expand All @@ -64,26 +133,37 @@ pub struct EventRecord {
}

#[doc(hidden)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CollectTokenItem {
pub trace_id: TraceId,
pub parent_id: SpanId,
pub collect_id: usize,
pub is_root: bool,
pub metadata: Option<SpanMetadata>,
}

impl CollectTokenItem {
pub fn metadata<T>(&self) -> Option<&T>
where
T: 'static,
{
self.metadata.as_ref()?.data::<T>()
}
}

/// A struct representing the context of a span, including its [`TraceId`] and [`SpanId`].
///
/// [`TraceId`]: crate::collector::TraceId
/// [`SpanId`]: crate::collector::SpanId
#[derive(Clone, Copy, Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct SpanContext {
pub trace_id: TraceId,
pub span_id: SpanId,
pub metadata: Option<SpanMetadata>,
}

impl SpanContext {
/// Creates a new `SpanContext` with the given [`TraceId`] and [`SpanId`].
/// Creates a new `SpanContext` with the given [`TraceId`] and [`SpanId`] and [`SpanMetadata`].
///
/// # Examples
///
Expand All @@ -96,7 +176,35 @@ impl SpanContext {
/// [`TraceId`]: crate::collector::TraceId
/// [`SpanId`]: crate::collector::SpanId
pub fn new(trace_id: TraceId, span_id: SpanId) -> Self {
Self { trace_id, span_id }
Self {
trace_id,
span_id,
metadata: None,
}
}

/// Creates a new `SpanContext` with the given [`TraceId`] and [`SpanId`] and optional [`SpanMetadata`].
///
/// # Examples
///
/// ```
/// use minitrace::prelude::*;
///
/// let context = SpanContext::new(TraceId(12), SpanId::default());
/// ```
///
/// [`TraceId`]: crate::collector::TraceId
/// [`SpanId`]: crate::collector::SpanId
pub fn new_with_metadata(
trace_id: TraceId,
span_id: SpanId,
metadata: Option<SpanMetadata>,
) -> Self {
Self {
trace_id,
span_id,
metadata,
}
}

/// Create a new `SpanContext` with a random trace id.
Expand All @@ -112,6 +220,24 @@ impl SpanContext {
Self {
trace_id: TraceId(rand::random()),
span_id: SpanId::default(),
metadata: None,
}
}

/// Create a new `SpanContext` with a random trace id.
///
/// # Examples
///
/// ```
/// use minitrace::prelude::*;
///
/// let root = Span::root("root", SpanContext::random());
/// ```
pub fn random_with_metadata(metadata: SpanMetadata) -> Self {
Self {
trace_id: TraceId(rand::random()),
span_id: SpanId::default(),
metadata: Some(metadata),
}
}

Expand Down Expand Up @@ -142,6 +268,7 @@ impl SpanContext {
Some(Self {
trace_id: collect_token.trace_id,
span_id: collect_token.parent_id,
metadata: collect_token.metadata.clone(),
})
}
}
Expand Down Expand Up @@ -170,11 +297,12 @@ impl SpanContext {
let stack = LOCAL_SPAN_STACK.try_with(Rc::clone).ok()?;

let mut stack = stack.borrow_mut();
let collect_token = stack.current_collect_token()?[0];
let collect_token = stack.current_collect_token()?[0].clone();

Some(Self {
trace_id: collect_token.trace_id,
span_id: collect_token.parent_id,
metadata: collect_token.metadata.clone(),
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion minitrace/src/local/local_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ mod tests {
parent_id: SpanId::default(),
collect_id: 42,
is_root: false,
metadata: None,
};
let collector2 = LocalCollector::new(Some(token2.into()), stack.clone());
let collector2 = LocalCollector::new(Some(token2.clone().into()), stack.clone());
let span2 = stack.borrow_mut().enter_span("span2").unwrap();
let span3 = stack.borrow_mut().enter_span("span3").unwrap();
stack.borrow_mut().exit_span(span3);
Expand Down Expand Up @@ -242,6 +243,7 @@ span1 []
parent_id: SpanId::default(),
collect_id: 42,
is_root: false,
metadata: None,
};
let collector2 = LocalCollector::new(Some(token2.into()), stack.clone());
let span2 = stack.borrow_mut().enter_span("span2").unwrap();
Expand Down
Loading
Loading