Skip to content

Commit

Permalink
chore: make tracing optional
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Jul 30, 2024
1 parent cf95990 commit 58a916c
Show file tree
Hide file tree
Showing 29 changed files with 364 additions and 295 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ jobs:
- name: Check with unstable flag
run: cargo check --features unstable

- name: Check with tracing feature
run: cargo check --features tracing

- name: Run lib tests and doc tests
run: cargo test

Expand Down
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ rust-version = "1.63"
# Enables `futures::Stream` implementations for various types.
stream = []

# Enables tracing.
tracing = ["dep:tracing"]

# Enables **unstable** APIs. Any API exposed by this feature has no backwards
# compatibility guarantees. In other words, you should not use this feature for
# anything besides experimentation. Definitely **do not** publish a crate that
Expand All @@ -46,12 +49,14 @@ tokio-util = { version = "0.7.1", features = ["codec", "io"] }
tokio = { version = "1", features = ["io-util"] }
bytes = "1"
http = "1"
tracing = { version = "0.1.35", default-features = false, features = ["std"] }
tracing = { version = "0.1.35", default-features = false, features = ["std"], optional = true }
fnv = "1.0.5"
slab = "0.4.2"
indexmap = { version = "2", features = ["std"] }

[dev-dependencies]
# Test
tracing = { version = "0.1.35", default-features = false, features = ["std"] }

# Fuzzing
quickcheck = { version = "1.0.3", default-features = false }
Expand Down
21 changes: 14 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
use crate::proto::{self, Error};
use crate::{FlowControl, PingPong, RecvStream, SendStream};

#[cfg(feature = "tracing")]
use ::tracing::Instrument;
use bytes::{Buf, Bytes};
use http::{uri, HeaderMap, Method, Request, Response, Version};
use std::fmt;
Expand All @@ -149,7 +151,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::Instrument;

/// Initializes new HTTP/2 streams on a connection by sending a request.
///
Expand Down Expand Up @@ -1275,10 +1276,15 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
let builder = Builder::new();
builder

#[cfg(feature = "tracing")]
return builder
.handshake(io)
.instrument(tracing::trace_span!("client_handshake"))
.await
.instrument(::tracing::trace_span!("client_handshake"))
.await;

#[cfg(not(feature = "tracing"))]
return builder.handshake(io).await;
}

// ===== impl Connection =====
Expand All @@ -1287,12 +1293,12 @@ async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
where
T: AsyncRead + AsyncWrite + Unpin,
{
tracing::debug!("binding client connection");
debug!("binding client connection");

let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
io.write_all(msg).await.map_err(crate::Error::from_io)?;

tracing::debug!("client connection bound");
debug!("client connection bound");

Ok(())
}
Expand Down Expand Up @@ -1439,7 +1445,7 @@ where
self.inner.maybe_close_connection_if_no_streams();
let result = self.inner.poll(cx).map_err(Into::into);
if result.is_pending() && !self.inner.has_streams_or_other_references() {
tracing::trace!("last stream closed during poll, wake again");
trace!("last stream closed during poll, wake again");
cx.waker().wake_by_ref();
}
result
Expand Down Expand Up @@ -1646,6 +1652,7 @@ impl Peer {
impl proto::Peer for Peer {
type Poll = Response<()>;

#[cfg(feature = "tracing")]
const NAME: &'static str = "Client";

fn r#dyn() -> proto::DynPeer {
Expand Down
60 changes: 29 additions & 31 deletions src/codec/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ fn decode_frame(
partial_inout: &mut Option<Partial>,
mut bytes: BytesMut,
) -> Result<Option<Frame>, Error> {
let span = tracing::trace_span!("FramedRead::decode_frame", offset = bytes.len());
let _e = span.enter();
let _span = trace_span!("FramedRead::decode_frame", offset = bytes.len());

tracing::trace!("decoding frame from {}B", bytes.len());
trace!("decoding frame from {}B", bytes.len());

// Parse the head
let head = frame::Head::parse(&bytes);
Expand All @@ -141,7 +140,7 @@ fn decode_frame(

let kind = head.kind();

tracing::trace!(frame.kind = ?kind);
trace!(frame.kind = ?kind);

macro_rules! header_block {
($frame:ident, $head:ident, $bytes:ident) => ({
Expand All @@ -159,8 +158,8 @@ fn decode_frame(
// `PROTOCOL_ERROR`.
return Err(Error::library_reset($head.stream_id(), Reason::PROTOCOL_ERROR));
},
Err(e) => {
proto_err!(conn: "failed to load frame; err={:?}", e);
Err(_e) => {
proto_err!(conn: "failed to load frame; err={:?}", _e);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
};
Expand All @@ -176,16 +175,16 @@ fn decode_frame(
proto_err!(stream: "malformed header block; stream={:?}", id);
return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
},
Err(e) => {
proto_err!(conn: "failed HPACK decoding; err={:?}", e);
Err(_e) => {
proto_err!(conn: "failed HPACK decoding; err={:?}", _e);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
}

if is_end_headers {
frame.into()
} else {
tracing::trace!("loaded partial header block");
trace!("loaded partial header block");
// Defer returning the frame
*partial_inout = Some(Partial {
frame: Continuable::$frame(frame),
Expand All @@ -202,26 +201,26 @@ fn decode_frame(
Kind::Settings => {
let res = frame::Settings::load(head, &bytes[frame::HEADER_LEN..]);

res.map_err(|e| {
proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
res.map_err(|_e| {
proto_err!(conn: "failed to load SETTINGS frame; err={:?}", _e);
Error::library_go_away(Reason::PROTOCOL_ERROR)
})?
.into()
}
Kind::Ping => {
let res = frame::Ping::load(head, &bytes[frame::HEADER_LEN..]);

res.map_err(|e| {
proto_err!(conn: "failed to load PING frame; err={:?}", e);
res.map_err(|_e| {
proto_err!(conn: "failed to load PING frame; err={:?}", _e);
Error::library_go_away(Reason::PROTOCOL_ERROR)
})?
.into()
}
Kind::WindowUpdate => {
let res = frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]);

res.map_err(|e| {
proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
res.map_err(|_e| {
proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", _e);
Error::library_go_away(Reason::PROTOCOL_ERROR)
})?
.into()
Expand All @@ -231,25 +230,25 @@ fn decode_frame(
let res = frame::Data::load(head, bytes.freeze());

// TODO: Should this always be connection level? Probably not...
res.map_err(|e| {
proto_err!(conn: "failed to load DATA frame; err={:?}", e);
res.map_err(|_e| {
proto_err!(conn: "failed to load DATA frame; err={:?}", _e);
Error::library_go_away(Reason::PROTOCOL_ERROR)
})?
.into()
}
Kind::Headers => header_block!(Headers, head, bytes),
Kind::Reset => {
let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]);
res.map_err(|e| {
proto_err!(conn: "failed to load RESET frame; err={:?}", e);
res.map_err(|_e| {
proto_err!(conn: "failed to load RESET frame; err={:?}", _e);
Error::library_go_away(Reason::PROTOCOL_ERROR)
})?
.into()
}
Kind::GoAway => {
let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]);
res.map_err(|e| {
proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
res.map_err(|_e| {
proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", _e);
Error::library_go_away(Reason::PROTOCOL_ERROR)
})?
.into()
Expand All @@ -272,8 +271,8 @@ fn decode_frame(
proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
}
Err(e) => {
proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
Err(_e) => {
proto_err!(conn: "failed to load PRIORITY frame; err={:?};", _e);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
}
Expand Down Expand Up @@ -301,7 +300,7 @@ fn decode_frame(
} else {
let cnt = partial.continuation_frames_count + 1;
if cnt > max_continuation_frames {
tracing::debug!("too_many_continuations, max = {}", max_continuation_frames);
debug!("too_many_continuations, max = {}", max_continuation_frames);
return Err(Error::library_go_away_data(
Reason::ENHANCE_YOUR_CALM,
"too_many_continuations",
Expand Down Expand Up @@ -348,8 +347,8 @@ fn decode_frame(
proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
return Err(Error::library_reset(id, Reason::PROTOCOL_ERROR));
}
Err(e) => {
proto_err!(conn: "failed HPACK decoding; err={:?}", e);
Err(_e) => {
proto_err!(conn: "failed HPACK decoding; err={:?}", _e);
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
}
}
Expand Down Expand Up @@ -377,17 +376,16 @@ where
type Item = Result<Frame, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let span = tracing::trace_span!("FramedRead::poll_next");
let _e = span.enter();
let _span = trace_span!("FramedRead::poll_next");
loop {
tracing::trace!("poll");
trace!("poll");
let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Some(Ok(bytes)) => bytes,
Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))),
None => return Poll::Ready(None),
};

tracing::trace!(read.bytes = bytes.len());
trace!(read.bytes = bytes.len());
let Self {
ref mut hpack,
max_header_list_size,
Expand All @@ -402,7 +400,7 @@ where
partial,
bytes,
)? {
tracing::debug!(?frame, "received");
debug!(?frame, "received");
return Poll::Ready(Some(Ok(frame)));
}
}
Expand Down
30 changes: 14 additions & 16 deletions src/codec/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,18 @@ where

/// Flush buffered data to the wire
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
let span = tracing::trace_span!("FramedWrite::flush");
let _e = span.enter();
let _span = trace_span!("FramedWrite::flush");

loop {
while !self.encoder.is_empty() {
match self.encoder.next {
Some(Next::Data(ref mut frame)) => {
tracing::trace!(queued_data_frame = true);
trace!(queued_data_frame = true);
let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?
}
_ => {
tracing::trace!(queued_data_frame = false);
trace!(queued_data_frame = false);
ready!(poll_write_buf(
Pin::new(&mut self.inner),
cx,
Expand All @@ -156,7 +155,7 @@ where
}
}

tracing::trace!("flushing buffer");
trace!("flushing buffer");
// Flush the upstream
ready!(Pin::new(&mut self.inner).poll_flush(cx))?;

Expand Down Expand Up @@ -207,10 +206,9 @@ where
fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
// Ensure that we have enough capacity to accept the write.
assert!(self.has_capacity());
let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item);
let _e = span.enter();
let _span = trace_span!("FramedWrite::buffer", frame = ?item);

tracing::debug!(frame = ?item, "send");
debug!(frame = ?item, "send");

match item {
Frame::Data(mut v) => {
Expand Down Expand Up @@ -259,31 +257,31 @@ where
}
Frame::Settings(v) => {
v.encode(self.buf.get_mut());
tracing::trace!(rem = self.buf.remaining(), "encoded settings");
trace!(rem = self.buf.remaining(), "encoded settings");
}
Frame::GoAway(v) => {
v.encode(self.buf.get_mut());
tracing::trace!(rem = self.buf.remaining(), "encoded go_away");
trace!(rem = self.buf.remaining(), "encoded go_away");
}
Frame::Ping(v) => {
v.encode(self.buf.get_mut());
tracing::trace!(rem = self.buf.remaining(), "encoded ping");
trace!(rem = self.buf.remaining(), "encoded ping");
}
Frame::WindowUpdate(v) => {
v.encode(self.buf.get_mut());
tracing::trace!(rem = self.buf.remaining(), "encoded window_update");
trace!(rem = self.buf.remaining(), "encoded window_update");
}

Frame::Priority(_) => {
/*
v.encode(self.buf.get_mut());
tracing::trace!("encoded priority; rem={:?}", self.buf.remaining());
*/
v.encode(self.buf.get_mut());
trace!("encoded priority; rem={:?}", self.buf.remaining());
*/
unimplemented!();
}
Frame::Reset(v) => {
v.encode(self.buf.get_mut());
tracing::trace!(rem = self.buf.remaining(), "encoded reset");
trace!(rem = self.buf.remaining(), "encoded reset");
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frame/go_away.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl GoAway {
}

pub fn encode<B: BufMut>(&self, dst: &mut B) {
tracing::trace!("encoding GO_AWAY; code={:?}", self.error_code);
trace!("encoding GO_AWAY; code={:?}", self.error_code);
let head = Head::new(Kind::GoAway, 0, StreamId::zero());
head.encode(8 + self.debug_data.len(), dst);
dst.put_u32(self.last_stream_id.into());
Expand Down
Loading

0 comments on commit 58a916c

Please sign in to comment.