Skip to content

Commit

Permalink
wrap timed streams and iterators in tracing::Spans
Browse files Browse the repository at this point in the history
  • Loading branch information
zvkemp committed Sep 6, 2024
1 parent 10bec32 commit 83a4f71
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 21 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ mac_address = "1.1.5"
rust_decimal = { version = "1.26.1", default-features = false, features = ["std"] }
time = { version = "0.3.36", features = ["formatting", "parsing", "macros"] }
uuid = "1.1.2"
tracing-futures = { version = "0.2.5", features = ["futures-03", "std-future"] }

# Common utility crates
dotenvy = { version = "0.15.0", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions sqlx-core/src/any/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl Database for Any {
type Statement<'q> = AnyStatement<'q>;

const NAME: &'static str = "Any";
const NAME_LOWERCASE: &'static str = "any";

const URL_SCHEMES: &'static [&'static str] = &[];
}
Expand Down
4 changes: 4 additions & 0 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ impl LogSettings {
self.slow_statements_level = level;
self.slow_statements_duration = duration;
}

pub fn tracing_span_level(&self) -> LevelFilter {
std::cmp::max(self.slow_statements_level, self.statements_level)
}
}

pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
Expand Down
1 change: 1 addition & 0 deletions sqlx-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub trait Database: 'static + Sized + Send + Debug {

/// The display name for this database driver.
const NAME: &'static str;
const NAME_LOWERCASE: &'static str;

/// The schemes for database URLs that should match this driver.
const URL_SCHEMES: &'static [&'static str];
Expand Down
61 changes: 47 additions & 14 deletions sqlx-core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ macro_rules! private_tracing_dynamic_event {
}};
}

#[doc(hidden)]
#[macro_export]
macro_rules! private_tracing_dynamic_span {
(target: $target:expr, $level:expr, $($args:tt)*) => {{
use ::tracing::Level;

match $level {
Level::ERROR => ::tracing::span!(target: $target, Level::ERROR, $($args)*),
Level::WARN => ::tracing::span!(target: $target, Level::WARN, $($args)*),
Level::INFO => ::tracing::span!(target: $target, Level::INFO, $($args)*),
Level::DEBUG => ::tracing::span!(target: $target, Level::DEBUG, $($args)*),
Level::TRACE => ::tracing::span!(target: $target, Level::TRACE, $($args)*),
}
}};
}


#[doc(hidden)]
pub fn private_level_filter_to_levels(
filter: log::LevelFilter,
Expand Down Expand Up @@ -68,16 +85,33 @@ pub struct QueryLogger<'q> {
rows_affected: u64,
start: Instant,
settings: LogSettings,
pub span: tracing::Span,
}

impl<'q> QueryLogger<'q> {
pub fn new(sql: &'q str, settings: LogSettings) -> Self {
pub fn new(sql: &'q str, db_system: &'q str, settings: LogSettings) -> Self {
use tracing::field::Empty;

let level = private_level_filter_to_trace_level(settings.tracing_span_level()).unwrap_or(tracing::Level::TRACE);

let span = private_tracing_dynamic_span!(
target: "sqlx::query",
level,
"sqlx::query",
summary = Empty,
db.statement = Empty,
db.system = db_system,
rows_affected = Empty,
rows_returned = Empty,
);

Self {
sql,
rows_returned: 0,
rows_affected: 0,
start: Instant::now(),
settings,
span,
}
}

Expand All @@ -100,6 +134,7 @@ impl<'q> QueryLogger<'q> {
self.settings.statements_level
};

// only create an event if the level filter is set
if let Some((tracing_level, log_level)) = private_level_filter_to_levels(lvl) {
// The enabled level could be set from either tracing world or log world, so check both
// to see if logging should be enabled for our level
Expand All @@ -122,37 +157,35 @@ impl<'q> QueryLogger<'q> {
String::new()
};

if was_slow {
self.span.record("summary", summary);
self.span.record("db.statement", sql);
self.span.record("rows_affected", self.rows_affected);
self.span.record("rows_returned", self.rows_returned);

let _e = self.span.enter();
if was_slow {
private_tracing_dynamic_event!(
target: "sqlx::query",
tracing_level,
summary,
db.statement = sql,
rows_affected = self.rows_affected,
rows_returned = self.rows_returned,
// When logging to JSON, one can trigger alerts from the presence of this field.
slow_threshold=?self.settings.slow_statements_duration,
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
?elapsed,
// Search friendly - numeric
elapsed_secs = elapsed.as_secs_f64(),
// When logging to JSON, one can trigger alerts from the presence of this field.
slow_threshold=?self.settings.slow_statements_duration,
// Make sure to use "slow" in the message as that's likely
// what people will grep for.
"slow statement: execution time exceeded alert threshold"
);
)
} else {
private_tracing_dynamic_event!(
target: "sqlx::query",
tracing_level,
summary,
db.statement = sql,
rows_affected = self.rows_affected,
rows_returned = self.rows_returned,
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
?elapsed,
// Search friendly - numeric
elapsed_secs = elapsed.as_secs_f64(),
);
)
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions};
use crate::private_tracing_dynamic_span;
use crossbeam_queue::ArrayQueue;
use tracing::Instrument;

use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};

Expand Down Expand Up @@ -245,6 +247,10 @@ impl<DB: Database> PoolInner<DB> {
}
}

pub fn tracing_span_level(&self) -> Level {
std::cmp::max(self.acquire_slow_level, self.acquire_time_level).unwrap_or(Level::TRACE)
}

pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
Expand All @@ -253,6 +259,13 @@ impl<DB: Database> PoolInner<DB> {
let acquire_started_at = Instant::now();
let deadline = acquire_started_at + self.options.acquire_timeout;

let span = private_tracing_dynamic_span!(
target: "sqlx::pool::acquire",
self.tracing_span_level(),
"sqlx::pool::acquire",
db.system = DB::NAME_LOWERCASE,
);

let acquired = crate::rt::timeout(
self.options.acquire_timeout,
async {
Expand Down Expand Up @@ -295,6 +308,7 @@ impl<DB: Database> PoolInner<DB> {
}
}
)
.instrument(span.clone())
.await
.map_err(|_| Error::PoolTimedOut)??;

Expand All @@ -304,6 +318,8 @@ impl<DB: Database> PoolInner<DB> {
.acquire_slow_level
.filter(|_| acquired_after > self.options.acquire_slow_threshold);

let _e = span.enter();

if let Some(level) = acquire_slow_level {
private_tracing_dynamic_event!(
target: "sqlx::pool::acquire",
Expand Down
1 change: 1 addition & 0 deletions sqlx-mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ smallvec = "1.7.0"
stringprep = "0.1.2"
thiserror = "1.0.35"
tracing = { version = "0.1.37", features = ["log"] }
tracing-futures.workspace = true
whoami = { version = "1.2.1", default-features = false }

serde = { version = "1.0.144", optional = true }
Expand Down
7 changes: 5 additions & 2 deletions sqlx-mysql/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::{pin_mut, TryStreamExt};
use sqlx_core::database::Database;
use tracing_futures::Instrument;
use std::{borrow::Cow, sync::Arc};

impl MySqlConnection {
Expand Down Expand Up @@ -106,7 +108,8 @@ impl MySqlConnection {
persistent: bool,
) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
{
let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());
let mut logger = QueryLogger::new(sql, MySql::NAME_LOWERCASE, self.inner.log_settings.clone());
let span_handle = logger.span.clone();

self.inner.stream.wait_until_ready().await?;
self.inner.stream.waiting.push_back(Waiting::Result);
Expand Down Expand Up @@ -240,7 +243,7 @@ impl MySqlConnection {
r#yield!(v);
}
}
}))
}).instrument(span_handle))
}
}

Expand Down
1 change: 1 addition & 0 deletions sqlx-mysql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl Database for MySql {
type Statement<'q> = MySqlStatement<'q>;

const NAME: &'static str = "MySQL";
const NAME_LOWERCASE: &'static str = "mysql";

const URL_SCHEMES: &'static [&'static str] = &["mysql", "mariadb"];
}
Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures-channel = { version = "0.3.19", default-features = false, features = ["s
futures-core = { version = "0.3.19", default-features = false }
futures-io = "0.3.24"
futures-util = { version = "0.3.19", default-features = false, features = ["alloc", "sink", "io"] }
tracing-futures.workspace = true

# Cryptographic Primitives
crc = "3.0.0"
Expand Down
7 changes: 5 additions & 2 deletions sqlx-postgres/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::{pin_mut, TryStreamExt};
use sqlx_core::arguments::Arguments;
use sqlx_core::database::Database;
use sqlx_core::Either;
use tracing_futures::Instrument;
use std::{borrow::Cow, sync::Arc};

async fn prepare(
Expand Down Expand Up @@ -195,7 +197,8 @@ impl PgConnection {
persistent: bool,
metadata_opt: Option<Arc<PgStatementMetadata>>,
) -> Result<impl Stream<Item = Result<Either<PgQueryResult, PgRow>, Error>> + 'e, Error> {
let mut logger = QueryLogger::new(query, self.log_settings.clone());
let mut logger = QueryLogger::new(query, Postgres::NAME_LOWERCASE, self.log_settings.clone());
let span_handle = logger.span.clone();

// before we continue, wait until we are "ready" to accept more queries
self.wait_until_ready().await?;
Expand Down Expand Up @@ -362,7 +365,7 @@ impl PgConnection {
}

Ok(())
})
}.instrument(span_handle))
}
}

Expand Down
4 changes: 3 additions & 1 deletion sqlx-postgres/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures_channel::mpsc::UnboundedSender;
use futures_util::SinkExt;
use log::Level;
use sqlx_core::bytes::Buf;
use sqlx_core::database::Database;

use crate::connection::tls::MaybeUpgradeTls;
use crate::error::Error;
Expand All @@ -14,7 +15,7 @@ use crate::message::{
ParameterStatus, ReceivedMessage,
};
use crate::net::{self, BufferedSocket, Socket};
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity};
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity, Postgres};

// the stream is a separate type from the connection to uphold the invariant where an instantiated
// [PgConnection] is a **valid** connection to postgres
Expand Down Expand Up @@ -185,6 +186,7 @@ impl PgStream {
sqlx_core::private_tracing_dynamic_event!(
target: "sqlx::postgres::notice",
tracing_level,
db.system = Postgres::NAME_LOWERCASE,
message = notice.message()
);
}
Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl Database for Postgres {
type Statement<'q> = PgStatement<'q>;

const NAME: &'static str = "PostgreSQL";
const NAME_LOWERCASE: &'static str = "postgresql";

const URL_SCHEMES: &'static [&'static str] = &["postgres", "postgresql"];
}
Expand Down
11 changes: 9 additions & 2 deletions sqlx-sqlite/src/connection/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::connection::{ConnectionHandle, ConnectionState};
use crate::error::Error;
use crate::logger::QueryLogger;
use crate::statement::{StatementHandle, VirtualStatement};
use crate::{SqliteArguments, SqliteQueryResult, SqliteRow};
use crate::{Sqlite, SqliteArguments, SqliteQueryResult, SqliteRow};
use sqlx_core::database::Database;
use sqlx_core::Either;
use tracing::Span;

pub struct ExecuteIter<'a> {
handle: &'a mut ConnectionHandle,
Expand All @@ -16,6 +18,7 @@ pub struct ExecuteIter<'a> {
args_used: usize,

goto_next: bool,
span: Span,
}

pub(crate) fn iter<'a>(
Expand All @@ -27,7 +30,8 @@ pub(crate) fn iter<'a>(
// fetch the cached statement or allocate a new one
let statement = conn.statements.get(query, persistent)?;

let logger = QueryLogger::new(query, conn.log_settings.clone());
let logger = QueryLogger::new(query, Sqlite::NAME_LOWERCASE, conn.log_settings.clone());
let span = logger.span.clone();

Ok(ExecuteIter {
handle: &mut conn.handle,
Expand All @@ -36,6 +40,7 @@ pub(crate) fn iter<'a>(
args,
args_used: 0,
goto_next: true,
span
})
}

Expand Down Expand Up @@ -67,6 +72,8 @@ impl Iterator for ExecuteIter<'_> {
type Item = Result<Either<SqliteQueryResult, SqliteRow>, Error>;

fn next(&mut self) -> Option<Self::Item> {
let _e = self.span.enter();

let statement = if self.goto_next {
let statement = match self.statement.prepare_next(self.handle) {
Ok(Some(statement)) => statement,
Expand Down
Loading

0 comments on commit 83a4f71

Please sign in to comment.