Skip to content

Commit

Permalink
implement minitrace-futures
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Feb 7, 2024
1 parent 36f3263 commit c559b22
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 9 deletions.
7 changes: 7 additions & 0 deletions minitrace-futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ categories = ["development-tools::debugging"]
keywords = ["tracing", "span", "datadog", "jaeger", "opentelemetry"]

[dependencies]
futures = "0.3"
minitrace = { version = "0.6.3", path = "../minitrace" }
pin-project-lite = "0.2.13"

[dev-dependencies]
async-stream = "0.3.5"
tokio = { version = "1", features = ["rt", "time", "macros"] }
164 changes: 155 additions & 9 deletions minitrace-futures/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,160 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
#![doc = include_str!("../README.md")]

use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use futures::Sink;
use futures::Stream;
use minitrace::Span;
use pin_project_lite::pin_project;

// There is no boundary in order to support types that
// implement both stream and sink.
impl<T> Instrumented for T {}

/// Instrument [`futures::Stream`]s and [`futures::Sink`]s.
pub trait Instrumented: Sized {
/// For [`Stream`]s :
///
/// Binds a [`Span`] to the [`Stream`] that continues to record until the stream is **finished**.
///
/// In addition, it sets the span as the local parent at every poll so that [`minitrace::local::LocalSpan`]
/// becomes available within the future. Internally, it calls [`Span::set_local_parent`] when
/// the executor polls it.
///
/// # Examples:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use async_stream::stream;
/// use futures::StreamExt;
/// use minitrace::prelude::*;
/// use minitrace_futures::*;
///
/// let root = Span::root("Root", SpanContext::random());
/// let s = stream! {
/// for i in 0..2 {
/// yield i;
/// }
/// }
/// .in_span(Span::enter_with_parent("Task", &root));
///
/// tokio::pin!(s);
///
/// assert_eq!(s.next().await.unwrap(), 0);
/// assert_eq!(s.next().await.unwrap(), 1);
/// assert_eq!(s.next().await, None);
/// // span ends here.
///
/// # }
/// ```
///
/// For [`Sink`]s :
///
/// Binds a [`Span`] to the [`Sink`] that continues to record until the sink is **closed**.
///
/// In addition, it sets the span as the local parent at every poll so that [`minitrace::local::LocalSpan`]
/// becomes available within the future. Internally, it calls [`Span::set_local_parent`] when
/// the executor polls it.
///
/// # Examples:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use futures::sink;
/// use futures::sink::SinkExt;
/// use minitrace::prelude::*;
/// use minitrace_futures::*;
///
/// let root = Span::root("Root", SpanContext::random());
///
/// let mut drain = sink::drain().in_span(Span::enter_with_parent("Task", &root));
///
/// drain.send(1).await.unwrap();
/// drain.send(2).await.unwrap();
/// drain.close().await.unwrap();
/// // span ends here.
///
/// # }
/// ```
fn in_span(self, span: Span) -> InSpan<Self> {
InSpan {
inner: self,
span: Some(span),
}
}
}

pin_project! {
pub struct InSpan<T> {
#[pin]
inner: T,
span: Option<Span>,
}
}

impl<T> Stream for InSpan<T>
where T: Stream
{
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

let _guard = this.span.as_ref().map(|s| s.set_local_parent());
let res = this.inner.poll_next(cx);

match res {
r @ Poll::Pending => r,
r @ Poll::Ready(None) => {
// finished
this.span.take();
r
}
other => other,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
impl<T, I> Sink<I> for InSpan<T>
where T: Sink<I>
{
type Error = T::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.span.as_ref().map(|s| s.set_local_parent());
this.inner.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.span.as_ref().map(|s| s.set_local_parent());
this.inner.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.span.as_ref().map(|s| s.set_local_parent());
this.inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();

let _guard = this.span.as_ref().map(|s| s.set_local_parent());
let res = this.inner.poll_close(cx);

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
match res {
r @ Poll::Pending => r,
other => {
// closed
this.span.take();
other
}
}
}
}

0 comments on commit c559b22

Please sign in to comment.