diff --git a/Cargo.lock b/Cargo.lock index c527c0e..dea75e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,47 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be6ea09c9b96cb5076af0de2e383bd2bc0c18f827cf1967bdd353e0b910d733" +dependencies = [ + "axum 0.7.4", + "axum-core 0.4.3", + "bytes", + "futures-util", + "headers", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "serde", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-tracing-opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6672eee77fec7036fe83cf2111e25c4f9ef06b35b50d656d8d8127c2347e3b4a" +dependencies = [ + "axum 0.7.4", + "futures-core", + "futures-util", + "http 1.1.0", + "opentelemetry", + "pin-project-lite", + "tower", + "tracing", + "tracing-opentelemetry", + "tracing-opentelemetry-instrumentation-sdk", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -736,6 +777,8 @@ dependencies = [ "async-graphql", "async-graphql-axum", "axum 0.7.4", + "axum-extra", + "axum-tracing-opentelemetry", "built", "chrono", "clap", @@ -1111,6 +1154,30 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64", + "bytes", + "headers-core", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.4.1" @@ -3257,6 +3324,18 @@ dependencies = [ "web-time", ] +[[package]] +name = "tracing-opentelemetry-instrumentation-sdk" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36785b61526c60c97c5aaa3e588d07099e1fb7b51dc96ff1508d4abe80389b36" +dependencies = [ + "http 1.1.0", + "opentelemetry", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" diff --git a/charts/datasets/charts/datasets/Chart.yaml b/charts/datasets/charts/datasets/Chart.yaml index 6ffa353..8e575aa 100644 --- a/charts/datasets/charts/datasets/Chart.yaml +++ b/charts/datasets/charts/datasets/Chart.yaml @@ -5,4 +5,4 @@ type: application version: 0.1.0 -appVersion: 0.1.0-rc1 +appVersion: 0.1.0-rc2 diff --git a/datasets/Cargo.toml b/datasets/Cargo.toml index 98213c1..f0a8473 100644 --- a/datasets/Cargo.toml +++ b/datasets/Cargo.toml @@ -14,6 +14,8 @@ async-graphql = { version = "7.0.2", default-features = false, features = [ ] } async-graphql-axum = { version = "7.0.2" } axum = { version = "0.7.4", features = ["ws"] } +axum-extra = { version = "0.9.3", features = ["typed-header"] } +axum-tracing-opentelemetry = { version = "0.18.0" } chrono = { version = "0.4.35" } clap = { version = "4.5.2", features = ["derive", "env"] } dotenvy = { version = "0.15.7" } diff --git a/datasets/src/main.rs b/datasets/src/main.rs index c50eb9b..0be6d2c 100644 --- a/datasets/src/main.rs +++ b/datasets/src/main.rs @@ -7,12 +7,14 @@ mod built_info; /// GraphQL resolvers mod graphql; +mod route_handlers; -use async_graphql::{extensions::Tracing, http::GraphiQLSource, SDLExportOptions}; -use async_graphql_axum::{GraphQL, GraphQLSubscription}; +use async_graphql::{http::GraphiQLSource, SDLExportOptions}; use axum::{response::Html, routing::get, Router}; +use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; use clap::Parser; use graphql::{root_schema_builder, RootSchema}; +use route_handlers::GraphQLHandler; use opentelemetry_otlp::WithExportConfig; use sea_orm::{ConnectOptions, Database, DatabaseConnection, DbErr, TransactionError}; use std::{ @@ -23,7 +25,7 @@ use std::{ time::Duration, }; use tokio::net::TcpListener; -use tracing::instrument; +use tracing::{info, instrument}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; @@ -65,8 +67,12 @@ struct SchemaArgs { /// Creates a connection pool to access the database #[instrument(skip(database_url))] async fn setup_database(database_url: Url) -> Result> { - let connection_options = ConnectOptions::new(database_url.to_string()); + info!("Connecting to database at {database_url}"); + let connection_options = ConnectOptions::new(database_url.to_string()) + .sqlx_logging_level(tracing::log::LevelFilter::Debug) + .to_owned(); let connection = Database::connect(connection_options).await?; + info!("Database connection established: {connection:?}"); Ok(connection) } @@ -74,28 +80,24 @@ async fn setup_database(database_url: Url) -> Result Router { #[allow(clippy::missing_docs_in_private_items)] const GRAPHQL_ENDPOINT: &str = "/"; - #[allow(clippy::missing_docs_in_private_items)] - const SUBSCRIPTION_ENDPOINT: &str = "/ws"; Router::new() .route( GRAPHQL_ENDPOINT, get(Html( - GraphiQLSource::build() - .endpoint(GRAPHQL_ENDPOINT) - .subscription_endpoint(SUBSCRIPTION_ENDPOINT) - .finish(), + GraphiQLSource::build().endpoint(GRAPHQL_ENDPOINT).finish(), )) - .post_service(GraphQL::new(schema.clone())), + .post(GraphQLHandler::new(schema)), ) - .route_service(SUBSCRIPTION_ENDPOINT, GraphQLSubscription::new(schema)) + .layer(OtelInResponseLayer) + .layer(OtelAxumLayer::default()) } /// Serves the endpoints on the specified port forever async fn serve(router: Router, port: u16) -> Result<(), std::io::Error> { let socket_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)); let listener = TcpListener::bind(socket_addr).await?; - println!("GraphiQL IDE: {}", socket_addr); + println!("Serving API & GraphQL UI at {}", socket_addr); axum::serve(listener, router.into_make_service()).await?; Ok(()) } @@ -118,6 +120,9 @@ fn setup_telemetry( ), ]); let (metrics_layer, tracing_layer) = if let Some(otel_collector_url) = otel_collector_url { + opentelemetry::global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::default(), + ); ( Some(tracing_opentelemetry::MetricsLayer::new( opentelemetry_otlp::new_pipeline() @@ -171,7 +176,6 @@ async fn main() { setup_telemetry(args.log_level, args.otel_collector_url).unwrap(); let database = setup_database(args.database_url).await.unwrap(); let schema = root_schema_builder() - .extension(Tracing) .data(database) .finish(); let router = setup_router(schema); diff --git a/datasets/src/route_handlers.rs b/datasets/src/route_handlers.rs new file mode 100644 index 0000000..135be2d --- /dev/null +++ b/datasets/src/route_handlers.rs @@ -0,0 +1,55 @@ +use async_graphql::Executor; +use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; +use axum::{ + extract::Request, + handler::Handler, + http::StatusCode, + response::{IntoResponse, Response}, + RequestExt, +}; +use axum_extra::{ + headers::{authorization::Bearer, Authorization}, + TypedHeader, +}; +use std::{future::Future, pin::Pin}; + +/// An [`Handler`] which executes an [`Executor`] including the [`Authorization`] in the [`async_graphql::Context`] +#[derive(Debug, Clone)] +pub struct GraphQLHandler { + /// The GraphQL executor used to process the request + executor: E, +} + +impl GraphQLHandler { + /// Constructs an instance of the handler with the provided schema. + pub fn new(executor: E) -> Self { + Self { executor } + } +} + +impl Handler<((),), S> for GraphQLHandler +where + E: Executor, +{ + type Future = Pin + Send + 'static>>; + + fn call(self, mut req: Request, _state: S) -> Self::Future { + Box::pin(async move { + let token = req + .extract_parts::>>() + .await + .ok() + .map(|token| token.0); + let request = req.extract::().await; + match request { + Ok(request) => GraphQLResponse::from( + self.executor + .execute(request.into_inner().data(token)) + .await, + ) + .into_response(), + Err(err) => (StatusCode::BAD_REQUEST, err.0.to_string()).into_response(), + } + }) + } +}