diff --git a/Cargo.lock b/Cargo.lock index dcbc6d1519..5679a3dd97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3645,6 +3645,7 @@ dependencies = [ "thiserror", "time", "tracing", + "tracing-futures", "uuid", "whoami", ] @@ -3691,6 +3692,7 @@ dependencies = [ "thiserror", "time", "tracing", + "tracing-futures", "uuid", "whoami", ] @@ -4090,6 +4092,18 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "futures", + "futures-task", + "pin-project", + "tracing", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 79e8ff3882..fc9e592426 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,6 +142,7 @@ mac_address = "1.1.5" rust_decimal = { version = "1.26.1", default-features = false, features = ["std"] } time = { version = "0.3.36", features = ["formatting", "parsing", "macros"] } uuid = "1.1.2" +tracing-futures = { version = "0.2.5", features = ["futures-03", "std-future"] } # Common utility crates dotenvy = { version = "0.15.0", default-features = false } diff --git a/sqlx-core/src/any/database.rs b/sqlx-core/src/any/database.rs index 9c3f15bb1f..32adfbf675 100644 --- a/sqlx-core/src/any/database.rs +++ b/sqlx-core/src/any/database.rs @@ -31,6 +31,7 @@ impl Database for Any { type Statement<'q> = AnyStatement<'q>; const NAME: &'static str = "Any"; + const NAME_LOWERCASE: &'static str = "any"; const URL_SCHEMES: &'static [&'static str] = &[]; } diff --git a/sqlx-core/src/any/options.rs b/sqlx-core/src/any/options.rs index bb29d817c9..3fe66a6c4d 100644 --- a/sqlx-core/src/any/options.rs +++ b/sqlx-core/src/any/options.rs @@ -62,4 +62,9 @@ impl ConnectOptions for AnyConnectOptions { self.log_settings.slow_statements_duration = duration; self } + + fn set_span_level(mut self, level: LevelFilter) -> Self { + self.log_settings.span_level = level; + self + } } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index ce2aa6c629..b975b3bd34 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -161,6 +161,7 @@ pub struct LogSettings { pub statements_level: LevelFilter, pub slow_statements_level: LevelFilter, pub slow_statements_duration: Duration, + pub span_level: LevelFilter, } impl Default for LogSettings { @@ -169,6 +170,7 @@ impl Default for LogSettings { statements_level: LevelFilter::Debug, slow_statements_level: LevelFilter::Warn, slow_statements_duration: Duration::from_secs(1), + span_level: LevelFilter::Info, } } } @@ -181,6 +183,10 @@ impl LogSettings { self.slow_statements_level = level; self.slow_statements_duration = duration; } + + pub fn set_span_level(&mut self, level: LevelFilter) { + self.span_level = level; + } } pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug + Clone { @@ -234,5 +240,8 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug + fn disable_statement_logging(self) -> Self { self.log_statements(LevelFilter::Off) .log_slow_statements(LevelFilter::Off, Duration::default()) + .set_span_level(LevelFilter::Off) } + + fn set_span_level(self, level: LevelFilter) -> Self; } diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs index e44c3d88ac..a0992e8ed2 100644 --- a/sqlx-core/src/database.rs +++ b/sqlx-core/src/database.rs @@ -105,6 +105,7 @@ pub trait Database: 'static + Sized + Send + Debug { /// The display name for this database driver. const NAME: &'static str; + const NAME_LOWERCASE: &'static str; /// The schemes for database URLs that should match this driver. const URL_SCHEMES: &'static [&'static str]; diff --git a/sqlx-core/src/logger.rs b/sqlx-core/src/logger.rs index 41725df99b..7cbc047753 100644 --- a/sqlx-core/src/logger.rs +++ b/sqlx-core/src/logger.rs @@ -38,6 +38,22 @@ macro_rules! private_tracing_dynamic_event { }}; } +#[doc(hidden)] +#[macro_export] +macro_rules! private_tracing_dynamic_span { + (target: $target:expr, $level:expr, $($args:tt)*) => {{ + use ::tracing::Level; + + match $level { + Level::ERROR => ::tracing::span!(target: $target, Level::ERROR, $($args)*), + Level::WARN => ::tracing::span!(target: $target, Level::WARN, $($args)*), + Level::INFO => ::tracing::span!(target: $target, Level::INFO, $($args)*), + Level::DEBUG => ::tracing::span!(target: $target, Level::DEBUG, $($args)*), + Level::TRACE => ::tracing::span!(target: $target, Level::TRACE, $($args)*), + } + }}; +} + #[doc(hidden)] pub fn private_level_filter_to_levels( filter: log::LevelFilter, @@ -68,19 +84,47 @@ pub struct QueryLogger<'q> { rows_affected: u64, start: Instant, settings: LogSettings, + pub span: tracing::Span, } impl<'q> QueryLogger<'q> { - pub fn new(sql: &'q str, settings: LogSettings) -> Self { + pub fn new(sql: &'q str, db_system: &'q str, settings: LogSettings) -> Self { + let span = Self::new_tracing_span(db_system, &settings); + Self { sql, rows_returned: 0, rows_affected: 0, start: Instant::now(), settings, + span, } } + fn new_tracing_span(db_system: &str, settings: &LogSettings) -> tracing::Span { + // only create a usable span if the span level is set (i.e. not 'OFF') and + // the filter would output it + if let Some(level) = private_level_filter_to_trace_level(settings.span_level) { + if private_tracing_dynamic_enabled!(target: "sqlx::query", level) { + use tracing::field::Empty; + + return private_tracing_dynamic_span!( + target: "sqlx::query", + level, + "sqlx::query", + summary = Empty, + db.statement = Empty, + db.system = db_system, + rows_affected = Empty, + rows_returned = Empty, + ); + } + } + + // No-op span preferred over Option + tracing::Span::none() + } + pub fn increment_rows_returned(&mut self) { self.rows_returned += 1; } @@ -100,27 +144,50 @@ impl<'q> QueryLogger<'q> { self.settings.statements_level }; - if let Some((tracing_level, log_level)) = private_level_filter_to_levels(lvl) { - // The enabled level could be set from either tracing world or log world, so check both - // to see if logging should be enabled for our level - let log_is_enabled = log::log_enabled!(target: "sqlx::query", log_level) - || private_tracing_dynamic_enabled!(target: "sqlx::query", tracing_level); - if log_is_enabled { - let mut summary = parse_query_summary(self.sql); - - let sql = if summary != self.sql { - summary.push_str(" …"); - format!( - "\n\n{}\n", - sqlformat::format( - self.sql, - &sqlformat::QueryParams::None, - sqlformat::FormatOptions::default() - ) + let span_level = private_level_filter_to_trace_level(self.settings.span_level); + let log_levels = private_level_filter_to_levels(lvl); + + let span_is_enabled = span_level + .map(|level| private_tracing_dynamic_enabled!(target: "sqlx::query", level)) + .unwrap_or(false); + + let log_is_enabled = log_levels + .map(|(tracing_level, log_level)| { + // The enabled level could be set from either tracing world or log world, so check both + // to see if logging should be enabled for our level + log::log_enabled!(target: "sqlx::query", log_level) + || private_tracing_dynamic_enabled!(target: "sqlx::query", tracing_level) + }) + .unwrap_or(false); + + // only do these potentially expensive operations if the span or log will record them + if span_is_enabled || log_is_enabled { + let mut summary = parse_query_summary(self.sql); + + let sql = if summary != self.sql { + summary.push_str(" …"); + format!( + "\n\n{}\n", + sqlformat::format( + self.sql, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions::default() ) - } else { - String::new() - }; + ) + } else { + String::new() + }; + + // in the case where span_is_enabled is false, these will no-op + self.span.record("summary", &summary); + self.span.record("db.statement", &sql); + self.span.record("rows_affected", self.rows_affected); + self.span.record("rows_returned", self.rows_returned); + + let _e = self.span.enter(); + + if log_is_enabled { + let (tracing_level, _) = log_levels.unwrap(); // previously checked to be some if was_slow { private_tracing_dynamic_event!( diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index bbcc43134e..f3160f30a0 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -4,7 +4,9 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions}; +use crate::private_tracing_dynamic_span; use crossbeam_queue::ArrayQueue; +use tracing::Instrument; use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; @@ -245,6 +247,10 @@ impl PoolInner { } } + pub fn tracing_span_level(&self) -> Level { + std::cmp::max(self.acquire_slow_level, self.acquire_time_level).unwrap_or(Level::TRACE) + } + pub(super) async fn acquire(self: &Arc) -> Result>, Error> { if self.is_closed() { return Err(Error::PoolClosed); @@ -253,6 +259,13 @@ impl PoolInner { let acquire_started_at = Instant::now(); let deadline = acquire_started_at + self.options.acquire_timeout; + let span = private_tracing_dynamic_span!( + target: "sqlx::pool::acquire", + self.tracing_span_level(), + "sqlx::pool::acquire", + db.system = DB::NAME_LOWERCASE, + ); + let acquired = crate::rt::timeout( self.options.acquire_timeout, async { @@ -295,6 +308,7 @@ impl PoolInner { } } ) + .instrument(span.clone()) .await .map_err(|_| Error::PoolTimedOut)??; @@ -304,6 +318,8 @@ impl PoolInner { .acquire_slow_level .filter(|_| acquired_after > self.options.acquire_slow_threshold); + let _e = span.enter(); + if let Some(level) = acquire_slow_level { private_tracing_dynamic_event!( target: "sqlx::pool::acquire", diff --git a/sqlx-mysql/Cargo.toml b/sqlx-mysql/Cargo.toml index a904bc0eef..c5d37d4f8d 100644 --- a/sqlx-mysql/Cargo.toml +++ b/sqlx-mysql/Cargo.toml @@ -68,6 +68,7 @@ smallvec = "1.7.0" stringprep = "0.1.2" thiserror = "1.0.35" tracing = { version = "0.1.37", features = ["log"] } +tracing-futures.workspace = true whoami = { version = "1.2.1", default-features = false } serde = { version = "1.0.144", optional = true } diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs index 07c7979b08..9ebeea4f10 100644 --- a/sqlx-mysql/src/connection/executor.rs +++ b/sqlx-mysql/src/connection/executor.rs @@ -22,6 +22,8 @@ use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_core::Stream; use futures_util::{pin_mut, TryStreamExt}; +use sqlx_core::database::Database; +use tracing_futures::Instrument; use std::{borrow::Cow, sync::Arc}; impl MySqlConnection { @@ -106,7 +108,8 @@ impl MySqlConnection { persistent: bool, ) -> Result, Error>> + 'e, Error> { - let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone()); + let mut logger = QueryLogger::new(sql, MySql::NAME_LOWERCASE, self.inner.log_settings.clone()); + let span_handle = logger.span.clone(); self.inner.stream.wait_until_ready().await?; self.inner.stream.waiting.push_back(Waiting::Result); @@ -240,7 +243,7 @@ impl MySqlConnection { r#yield!(v); } } - })) + }).instrument(span_handle)) } } diff --git a/sqlx-mysql/src/database.rs b/sqlx-mysql/src/database.rs index d03a567284..7bf12edca8 100644 --- a/sqlx-mysql/src/database.rs +++ b/sqlx-mysql/src/database.rs @@ -31,6 +31,7 @@ impl Database for MySql { type Statement<'q> = MySqlStatement<'q>; const NAME: &'static str = "MySQL"; + const NAME_LOWERCASE: &'static str = "mysql"; const URL_SCHEMES: &'static [&'static str] = &["mysql", "mariadb"]; } diff --git a/sqlx-mysql/src/options/connect.rs b/sqlx-mysql/src/options/connect.rs index 116a49ccad..24024ad510 100644 --- a/sqlx-mysql/src/options/connect.rs +++ b/sqlx-mysql/src/options/connect.rs @@ -94,4 +94,9 @@ impl ConnectOptions for MySqlConnectOptions { self.log_settings.log_slow_statements(level, duration); self } + + fn set_span_level(mut self, level: LevelFilter) -> Self { + self.log_settings.span_level = level; + self + } } diff --git a/sqlx-postgres/Cargo.toml b/sqlx-postgres/Cargo.toml index 55a94eceb1..ef56e9e749 100644 --- a/sqlx-postgres/Cargo.toml +++ b/sqlx-postgres/Cargo.toml @@ -31,6 +31,7 @@ futures-channel = { version = "0.3.19", default-features = false, features = ["s futures-core = { version = "0.3.19", default-features = false } futures-io = "0.3.24" futures-util = { version = "0.3.19", default-features = false, features = ["alloc", "sink", "io"] } +tracing-futures.workspace = true # Cryptographic Primitives crc = "3.0.0" diff --git a/sqlx-postgres/src/connection/executor.rs b/sqlx-postgres/src/connection/executor.rs index d24dc83762..1ee728079f 100644 --- a/sqlx-postgres/src/connection/executor.rs +++ b/sqlx-postgres/src/connection/executor.rs @@ -17,7 +17,9 @@ use futures_core::stream::BoxStream; use futures_core::Stream; use futures_util::{pin_mut, TryStreamExt}; use sqlx_core::arguments::Arguments; +use sqlx_core::database::Database; use sqlx_core::Either; +use tracing_futures::Instrument; use std::{borrow::Cow, sync::Arc}; async fn prepare( @@ -195,7 +197,8 @@ impl PgConnection { persistent: bool, metadata_opt: Option>, ) -> Result, Error>> + 'e, Error> { - let mut logger = QueryLogger::new(query, self.log_settings.clone()); + let mut logger = QueryLogger::new(query, Postgres::NAME_LOWERCASE, self.log_settings.clone()); + let span_handle = logger.span.clone(); // before we continue, wait until we are "ready" to accept more queries self.wait_until_ready().await?; @@ -362,7 +365,7 @@ impl PgConnection { } Ok(()) - }) + }.instrument(span_handle)) } } diff --git a/sqlx-postgres/src/connection/stream.rs b/sqlx-postgres/src/connection/stream.rs index f165899248..dca487dc3d 100644 --- a/sqlx-postgres/src/connection/stream.rs +++ b/sqlx-postgres/src/connection/stream.rs @@ -6,6 +6,7 @@ use futures_channel::mpsc::UnboundedSender; use futures_util::SinkExt; use log::Level; use sqlx_core::bytes::Buf; +use sqlx_core::database::Database; use crate::connection::tls::MaybeUpgradeTls; use crate::error::Error; @@ -14,7 +15,7 @@ use crate::message::{ ParameterStatus, ReceivedMessage, }; use crate::net::{self, BufferedSocket, Socket}; -use crate::{PgConnectOptions, PgDatabaseError, PgSeverity}; +use crate::{PgConnectOptions, PgDatabaseError, PgSeverity, Postgres}; // the stream is a separate type from the connection to uphold the invariant where an instantiated // [PgConnection] is a **valid** connection to postgres @@ -185,6 +186,7 @@ impl PgStream { sqlx_core::private_tracing_dynamic_event!( target: "sqlx::postgres::notice", tracing_level, + db.system = Postgres::NAME_LOWERCASE, message = notice.message() ); } diff --git a/sqlx-postgres/src/database.rs b/sqlx-postgres/src/database.rs index 876e295899..bc4f2d54e2 100644 --- a/sqlx-postgres/src/database.rs +++ b/sqlx-postgres/src/database.rs @@ -33,6 +33,7 @@ impl Database for Postgres { type Statement<'q> = PgStatement<'q>; const NAME: &'static str = "PostgreSQL"; + const NAME_LOWERCASE: &'static str = "postgresql"; const URL_SCHEMES: &'static [&'static str] = &["postgres", "postgresql"]; } diff --git a/sqlx-postgres/src/options/connect.rs b/sqlx-postgres/src/options/connect.rs index bc6e4adce9..0f27905b94 100644 --- a/sqlx-postgres/src/options/connect.rs +++ b/sqlx-postgres/src/options/connect.rs @@ -33,4 +33,9 @@ impl ConnectOptions for PgConnectOptions { self.log_settings.log_slow_statements(level, duration); self } + + fn set_span_level(mut self, level: LevelFilter) -> Self { + self.log_settings.span_level = level; + self + } } diff --git a/sqlx-sqlite/src/connection/execute.rs b/sqlx-sqlite/src/connection/execute.rs index 8a76236977..36b26055c3 100644 --- a/sqlx-sqlite/src/connection/execute.rs +++ b/sqlx-sqlite/src/connection/execute.rs @@ -2,8 +2,10 @@ use crate::connection::{ConnectionHandle, ConnectionState}; use crate::error::Error; use crate::logger::QueryLogger; use crate::statement::{StatementHandle, VirtualStatement}; -use crate::{SqliteArguments, SqliteQueryResult, SqliteRow}; +use crate::{Sqlite, SqliteArguments, SqliteQueryResult, SqliteRow}; +use sqlx_core::database::Database; use sqlx_core::Either; +use tracing::Span; pub struct ExecuteIter<'a> { handle: &'a mut ConnectionHandle, @@ -16,6 +18,7 @@ pub struct ExecuteIter<'a> { args_used: usize, goto_next: bool, + span: Span, } pub(crate) fn iter<'a>( @@ -27,7 +30,8 @@ pub(crate) fn iter<'a>( // fetch the cached statement or allocate a new one let statement = conn.statements.get(query, persistent)?; - let logger = QueryLogger::new(query, conn.log_settings.clone()); + let logger = QueryLogger::new(query, Sqlite::NAME_LOWERCASE, conn.log_settings.clone()); + let span = logger.span.clone(); Ok(ExecuteIter { handle: &mut conn.handle, @@ -36,6 +40,7 @@ pub(crate) fn iter<'a>( args, args_used: 0, goto_next: true, + span }) } @@ -67,6 +72,8 @@ impl Iterator for ExecuteIter<'_> { type Item = Result, Error>; fn next(&mut self) -> Option { + let _e = self.span.enter(); + let statement = if self.goto_next { let statement = match self.statement.prepare_next(self.handle) { Ok(Some(statement)) => statement, diff --git a/sqlx-sqlite/src/database.rs b/sqlx-sqlite/src/database.rs index c89c7b8322..c91d55ad86 100644 --- a/sqlx-sqlite/src/database.rs +++ b/sqlx-sqlite/src/database.rs @@ -32,6 +32,7 @@ impl Database for Sqlite { type Statement<'q> = SqliteStatement<'q>; const NAME: &'static str = "SQLite"; + const NAME_LOWERCASE: &'static str = "sqlite"; const URL_SCHEMES: &'static [&'static str] = &["sqlite"]; } diff --git a/sqlx-sqlite/src/logger.rs b/sqlx-sqlite/src/logger.rs index 40fabd48ed..20a0af06aa 100644 --- a/sqlx-sqlite/src/logger.rs +++ b/sqlx-sqlite/src/logger.rs @@ -7,10 +7,12 @@ )] use crate::connection::intmap::IntMap; +use crate::Sqlite; use std::collections::HashSet; use std::fmt::Debug; use std::hash::Hash; +use sqlx_core::database::Database; pub(crate) use sqlx_core::logger::*; #[derive(Debug)] @@ -429,6 +431,7 @@ impl<'q, R: Debug, S: Debug + DebugDiff, P: Debug> QueryPlanLogger<'q, R, S, P> sqlx_core::private_tracing_dynamic_event!( target: "sqlx::explain", tracing::Level::TRACE, + db.system = Sqlite::NAME_LOWERCASE, "{}; program:\n{}\n\n{:?}", summary, self, sql ); } diff --git a/sqlx-sqlite/src/options/connect.rs b/sqlx-sqlite/src/options/connect.rs index 309f2430e0..17dbb1b1d8 100644 --- a/sqlx-sqlite/src/options/connect.rs +++ b/sqlx-sqlite/src/options/connect.rs @@ -59,6 +59,11 @@ impl ConnectOptions for SqliteConnectOptions { self.log_settings.log_slow_statements(level, duration); self } + + fn set_span_level(mut self, level: LevelFilter) -> Self { + self.log_settings.span_level = level; + self + } } impl SqliteConnectOptions {