Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wrap timed streams and iterators in tracing::Spans; follow Opentelemetry conventions #3313

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ mac_address = "1.1.5"
rust_decimal = { version = "1.26.1", default-features = false, features = ["std"] }
time = { version = "0.3.36", features = ["formatting", "parsing", "macros"] }
uuid = "1.1.2"
tracing-futures = { version = "0.2.5", features = ["futures-03", "std-future"] }

# Common utility crates
dotenvy = { version = "0.15.0", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions sqlx-core/src/any/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl Database for Any {
type Statement<'q> = AnyStatement<'q>;

const NAME: &'static str = "Any";
const NAME_LOWERCASE: &'static str = "any";

const URL_SCHEMES: &'static [&'static str] = &[];
}
Expand Down
4 changes: 4 additions & 0 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ impl LogSettings {
self.slow_statements_level = level;
self.slow_statements_duration = duration;
}

pub fn tracing_span_level(&self) -> LevelFilter {
std::cmp::max(self.slow_statements_level, self.statements_level)
}
}

pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
Expand Down
1 change: 1 addition & 0 deletions sqlx-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub trait Database: 'static + Sized + Send + Debug {

/// The display name for this database driver.
const NAME: &'static str;
const NAME_LOWERCASE: &'static str;

/// The schemes for database URLs that should match this driver.
const URL_SCHEMES: &'static [&'static str];
Expand Down
61 changes: 47 additions & 14 deletions sqlx-core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ macro_rules! private_tracing_dynamic_event {
}};
}

#[doc(hidden)]
#[macro_export]
macro_rules! private_tracing_dynamic_span {
(target: $target:expr, $level:expr, $($args:tt)*) => {{
use ::tracing::Level;

match $level {
Level::ERROR => ::tracing::span!(target: $target, Level::ERROR, $($args)*),
Level::WARN => ::tracing::span!(target: $target, Level::WARN, $($args)*),
Level::INFO => ::tracing::span!(target: $target, Level::INFO, $($args)*),
Level::DEBUG => ::tracing::span!(target: $target, Level::DEBUG, $($args)*),
Level::TRACE => ::tracing::span!(target: $target, Level::TRACE, $($args)*),
}
}};
}


#[doc(hidden)]
pub fn private_level_filter_to_levels(
filter: log::LevelFilter,
Expand Down Expand Up @@ -68,16 +85,33 @@ pub struct QueryLogger<'q> {
rows_affected: u64,
start: Instant,
settings: LogSettings,
pub span: tracing::Span,
}

impl<'q> QueryLogger<'q> {
pub fn new(sql: &'q str, settings: LogSettings) -> Self {
pub fn new(sql: &'q str, db_system: &'q str, settings: LogSettings) -> Self {
use tracing::field::Empty;

let level = private_level_filter_to_trace_level(settings.tracing_span_level()).unwrap_or(tracing::Level::TRACE);

let span = private_tracing_dynamic_span!(
target: "sqlx::query",
level,
"sqlx::query",
summary = Empty,
db.statement = Empty,
db.system = db_system,
rows_affected = Empty,
rows_returned = Empty,
);

Self {
sql,
rows_returned: 0,
rows_affected: 0,
start: Instant::now(),
settings,
span,
}
}

Expand All @@ -100,6 +134,7 @@ impl<'q> QueryLogger<'q> {
self.settings.statements_level
};

// only create an event if the level filter is set
if let Some((tracing_level, log_level)) = private_level_filter_to_levels(lvl) {
// The enabled level could be set from either tracing world or log world, so check both
// to see if logging should be enabled for our level
Expand All @@ -122,37 +157,35 @@ impl<'q> QueryLogger<'q> {
String::new()
};

if was_slow {
self.span.record("summary", summary);
self.span.record("db.statement", sql);
Copy link
Contributor

@iamjpotts iamjpotts Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something else to keep in mind, and may need adjustment here, is that logging the query isn't free.

Its not uncommon for select statements to exceed 25KB or 50KB or more, and 100% logging of those will have a materially negative impact on both the running application outputting the logs, and the log observer attempting to capture them.

This is why the current implementation only logs the query when the query is slow.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm taking that into account. The revised implementation will allow the span to be leveled individually, and the query will only be recorded if any of the events or spans are within the configured level.

self.span.record("rows_affected", self.rows_affected);
self.span.record("rows_returned", self.rows_returned);

let _e = self.span.enter();
if was_slow {
private_tracing_dynamic_event!(
target: "sqlx::query",
tracing_level,
summary,
db.statement = sql,
rows_affected = self.rows_affected,
rows_returned = self.rows_returned,
// When logging to JSON, one can trigger alerts from the presence of this field.
slow_threshold=?self.settings.slow_statements_duration,
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
?elapsed,
// Search friendly - numeric
elapsed_secs = elapsed.as_secs_f64(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing elapsed and elapsed_secs is a breaking change to the instrumentation and reporting that others have built on top of existing sqlx logging.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good ole' Hyrum's Law https://www.hyrumslaw.com/

Strictly speaking, we never made the exact format of log messages part of our API contract, but I'd also be pretty annoyed if this changed out from under me.

It'd also make me a hypocrite, given how the query macros came about.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was hoping this would be rolled into the breaking-changes-allowed 8.x release, as it's a little superfluous to track timings in addition to the tracing span's own timing, but I certainly see how it's useful. Restored and rebased.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

A standards based approach would certainly be better in the long run.

I have my own logging breaking change pr open - but its much smaller in scope and at @abonander's suggestion we tagged a few users who were interested to see if that break will affect them, as an alternative to maintaining two outputs.

The break that was in this PR (removing the elapses) would affect people.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was hoping this would be rolled into the breaking-changes-allowed 8.x release

Yeah, to be perfectly honest, this PR required more thinking than I wanted to do while I was trying to wrap up 0.8.0.

At a certain point, I have to stop myself from looking at new PRs, or else I'll sit there forever trying to get every one of them merged before I kick out a release.

If we're happy calling this a breaking change, we can plan to land this for 0.9.0.

Copy link
Author

@zvkemp zvkemp Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At a certain point, I have to stop myself from looking at new PRs, or else I'll sit there forever trying to get every one of them merged before I kick out a release.

Completely understand; hope you didn't take my comment as a criticism. Currently re-working this to not be a breaking change; the tracing span should be opt-in if it lands in 0.8, and whether or not that would change in 0.9 is an open question.

// When logging to JSON, one can trigger alerts from the presence of this field.
slow_threshold=?self.settings.slow_statements_duration,
// Make sure to use "slow" in the message as that's likely
// what people will grep for.
"slow statement: execution time exceeded alert threshold"
);
)
} else {
private_tracing_dynamic_event!(
target: "sqlx::query",
tracing_level,
summary,
db.statement = sql,
rows_affected = self.rows_affected,
rows_returned = self.rows_returned,
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
?elapsed,
// Search friendly - numeric
elapsed_secs = elapsed.as_secs_f64(),
);
)
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions};
use crate::private_tracing_dynamic_span;
use crossbeam_queue::ArrayQueue;
use tracing::Instrument;

use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};

Expand Down Expand Up @@ -245,6 +247,10 @@ impl<DB: Database> PoolInner<DB> {
}
}

pub fn tracing_span_level(&self) -> Level {
std::cmp::max(self.acquire_slow_level, self.acquire_time_level).unwrap_or(Level::TRACE)
}

pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
Expand All @@ -253,6 +259,13 @@ impl<DB: Database> PoolInner<DB> {
let acquire_started_at = Instant::now();
let deadline = acquire_started_at + self.options.acquire_timeout;

let span = private_tracing_dynamic_span!(
target: "sqlx::pool::acquire",
self.tracing_span_level(),
"sqlx::pool::acquire",
db.system = DB::NAME_LOWERCASE,
);

let acquired = crate::rt::timeout(
self.options.acquire_timeout,
async {
Expand Down Expand Up @@ -295,6 +308,7 @@ impl<DB: Database> PoolInner<DB> {
}
}
)
.instrument(span.clone())
.await
.map_err(|_| Error::PoolTimedOut)??;

Expand All @@ -304,6 +318,8 @@ impl<DB: Database> PoolInner<DB> {
.acquire_slow_level
.filter(|_| acquired_after > self.options.acquire_slow_threshold);

let _e = span.enter();

if let Some(level) = acquire_slow_level {
private_tracing_dynamic_event!(
target: "sqlx::pool::acquire",
Expand Down
1 change: 1 addition & 0 deletions sqlx-mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ smallvec = "1.7.0"
stringprep = "0.1.2"
thiserror = "1.0.35"
tracing = { version = "0.1.37", features = ["log"] }
tracing-futures.workspace = true
whoami = { version = "1.2.1", default-features = false }

serde = { version = "1.0.144", optional = true }
Expand Down
7 changes: 5 additions & 2 deletions sqlx-mysql/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::{pin_mut, TryStreamExt};
use sqlx_core::database::Database;
use tracing_futures::Instrument;
use std::{borrow::Cow, sync::Arc};

impl MySqlConnection {
Expand Down Expand Up @@ -106,7 +108,8 @@ impl MySqlConnection {
persistent: bool,
) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
{
let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());
let mut logger = QueryLogger::new(sql, MySql::NAME_LOWERCASE, self.inner.log_settings.clone());
let span_handle = logger.span.clone();

self.inner.stream.wait_until_ready().await?;
self.inner.stream.waiting.push_back(Waiting::Result);
Expand Down Expand Up @@ -240,7 +243,7 @@ impl MySqlConnection {
r#yield!(v);
}
}
}))
}).instrument(span_handle))
}
}

Expand Down
1 change: 1 addition & 0 deletions sqlx-mysql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl Database for MySql {
type Statement<'q> = MySqlStatement<'q>;

const NAME: &'static str = "MySQL";
const NAME_LOWERCASE: &'static str = "mysql";

const URL_SCHEMES: &'static [&'static str] = &["mysql", "mariadb"];
}
Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures-channel = { version = "0.3.19", default-features = false, features = ["s
futures-core = { version = "0.3.19", default-features = false }
futures-io = "0.3.24"
futures-util = { version = "0.3.19", default-features = false, features = ["alloc", "sink", "io"] }
tracing-futures.workspace = true

# Cryptographic Primitives
crc = "3.0.0"
Expand Down
7 changes: 5 additions & 2 deletions sqlx-postgres/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use futures_core::stream::BoxStream;
use futures_core::Stream;
use futures_util::{pin_mut, TryStreamExt};
use sqlx_core::arguments::Arguments;
use sqlx_core::database::Database;
use sqlx_core::Either;
use tracing_futures::Instrument;
use std::{borrow::Cow, sync::Arc};

async fn prepare(
Expand Down Expand Up @@ -195,7 +197,8 @@ impl PgConnection {
persistent: bool,
metadata_opt: Option<Arc<PgStatementMetadata>>,
) -> Result<impl Stream<Item = Result<Either<PgQueryResult, PgRow>, Error>> + 'e, Error> {
let mut logger = QueryLogger::new(query, self.log_settings.clone());
let mut logger = QueryLogger::new(query, Postgres::NAME_LOWERCASE, self.log_settings.clone());
let span_handle = logger.span.clone();

// before we continue, wait until we are "ready" to accept more queries
self.wait_until_ready().await?;
Expand Down Expand Up @@ -362,7 +365,7 @@ impl PgConnection {
}

Ok(())
})
}.instrument(span_handle))
}
}

Expand Down
4 changes: 3 additions & 1 deletion sqlx-postgres/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures_channel::mpsc::UnboundedSender;
use futures_util::SinkExt;
use log::Level;
use sqlx_core::bytes::Buf;
use sqlx_core::database::Database;

use crate::connection::tls::MaybeUpgradeTls;
use crate::error::Error;
Expand All @@ -14,7 +15,7 @@ use crate::message::{
ParameterStatus, ReceivedMessage,
};
use crate::net::{self, BufferedSocket, Socket};
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity};
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity, Postgres};

// the stream is a separate type from the connection to uphold the invariant where an instantiated
// [PgConnection] is a **valid** connection to postgres
Expand Down Expand Up @@ -185,6 +186,7 @@ impl PgStream {
sqlx_core::private_tracing_dynamic_event!(
target: "sqlx::postgres::notice",
tracing_level,
db.system = Postgres::NAME_LOWERCASE,
message = notice.message()
);
}
Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl Database for Postgres {
type Statement<'q> = PgStatement<'q>;

const NAME: &'static str = "PostgreSQL";
const NAME_LOWERCASE: &'static str = "postgresql";

const URL_SCHEMES: &'static [&'static str] = &["postgres", "postgresql"];
}
Expand Down
11 changes: 9 additions & 2 deletions sqlx-sqlite/src/connection/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::connection::{ConnectionHandle, ConnectionState};
use crate::error::Error;
use crate::logger::QueryLogger;
use crate::statement::{StatementHandle, VirtualStatement};
use crate::{SqliteArguments, SqliteQueryResult, SqliteRow};
use crate::{Sqlite, SqliteArguments, SqliteQueryResult, SqliteRow};
use sqlx_core::database::Database;
use sqlx_core::Either;
use tracing::Span;

pub struct ExecuteIter<'a> {
handle: &'a mut ConnectionHandle,
Expand All @@ -16,6 +18,7 @@ pub struct ExecuteIter<'a> {
args_used: usize,

goto_next: bool,
span: Span,
}

pub(crate) fn iter<'a>(
Expand All @@ -27,7 +30,8 @@ pub(crate) fn iter<'a>(
// fetch the cached statement or allocate a new one
let statement = conn.statements.get(query, persistent)?;

let logger = QueryLogger::new(query, conn.log_settings.clone());
let logger = QueryLogger::new(query, Sqlite::NAME_LOWERCASE, conn.log_settings.clone());
let span = logger.span.clone();

Ok(ExecuteIter {
handle: &mut conn.handle,
Expand All @@ -36,6 +40,7 @@ pub(crate) fn iter<'a>(
args,
args_used: 0,
goto_next: true,
span
})
}

Expand Down Expand Up @@ -67,6 +72,8 @@ impl Iterator for ExecuteIter<'_> {
type Item = Result<Either<SqliteQueryResult, SqliteRow>, Error>;

fn next(&mut self) -> Option<Self::Item> {
let _e = self.span.enter();

let statement = if self.goto_next {
let statement = match self.statement.prepare_next(self.handle) {
Ok(Some(statement)) => statement,
Expand Down
Loading