Skip to content

Commit

Permalink
Tracing propogation
Browse files Browse the repository at this point in the history
  • Loading branch information
iamvigneshwars committed Apr 2, 2024
1 parent 587638f commit f7697ed
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 15 deletions.
79 changes: 79 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/datasets/charts/datasets/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ type: application

version: 0.1.0

appVersion: 0.1.0-rc1
appVersion: 0.1.0-rc2
2 changes: 2 additions & 0 deletions datasets/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ async-graphql = { version = "7.0.2", default-features = false, features = [
] }
async-graphql-axum = { version = "7.0.2" }
axum = { version = "0.7.4", features = ["ws"] }
axum-extra = { version = "0.9.3", features = ["typed-header"] }
axum-tracing-opentelemetry = { version = "0.18.0" }
chrono = { version = "0.4.35" }
clap = { version = "4.5.2", features = ["derive", "env"] }
dotenvy = { version = "0.15.7" }
Expand Down
32 changes: 18 additions & 14 deletions datasets/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
mod built_info;
/// GraphQL resolvers
mod graphql;
mod route_handlers;

use async_graphql::{extensions::Tracing, http::GraphiQLSource, SDLExportOptions};
use async_graphql_axum::{GraphQL, GraphQLSubscription};
use async_graphql::{http::GraphiQLSource, SDLExportOptions};
use axum::{response::Html, routing::get, Router};
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
use clap::Parser;
use graphql::{root_schema_builder, RootSchema};
use route_handlers::GraphQLHandler;
use opentelemetry_otlp::WithExportConfig;
use sea_orm::{ConnectOptions, Database, DatabaseConnection, DbErr, TransactionError};
use std::{
Expand All @@ -23,7 +25,7 @@ use std::{
time::Duration,
};
use tokio::net::TcpListener;
use tracing::instrument;
use tracing::{info, instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

Expand Down Expand Up @@ -65,37 +67,37 @@ struct SchemaArgs {
/// Creates a connection pool to access the database
#[instrument(skip(database_url))]
async fn setup_database(database_url: Url) -> Result<DatabaseConnection, TransactionError<DbErr>> {
let connection_options = ConnectOptions::new(database_url.to_string());
info!("Connecting to database at {database_url}");
let connection_options = ConnectOptions::new(database_url.to_string())
.sqlx_logging_level(tracing::log::LevelFilter::Debug)
.to_owned();
let connection = Database::connect(connection_options).await?;
info!("Database connection established: {connection:?}");
Ok(connection)
}

/// Creates an [`axum::Router`] serving GraphiQL, synchronous GraphQL and GraphQL subscriptions
fn setup_router(schema: RootSchema) -> Router {
#[allow(clippy::missing_docs_in_private_items)]
const GRAPHQL_ENDPOINT: &str = "/";
#[allow(clippy::missing_docs_in_private_items)]
const SUBSCRIPTION_ENDPOINT: &str = "/ws";

Router::new()
.route(
GRAPHQL_ENDPOINT,
get(Html(
GraphiQLSource::build()
.endpoint(GRAPHQL_ENDPOINT)
.subscription_endpoint(SUBSCRIPTION_ENDPOINT)
.finish(),
GraphiQLSource::build().endpoint(GRAPHQL_ENDPOINT).finish(),
))
.post_service(GraphQL::new(schema.clone())),
.post(GraphQLHandler::new(schema)),
)
.route_service(SUBSCRIPTION_ENDPOINT, GraphQLSubscription::new(schema))
.layer(OtelInResponseLayer)
.layer(OtelAxumLayer::default())
}

/// Serves the endpoints on the specified port forever
async fn serve(router: Router, port: u16) -> Result<(), std::io::Error> {
let socket_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port));
let listener = TcpListener::bind(socket_addr).await?;
println!("GraphiQL IDE: {}", socket_addr);
println!("Serving API & GraphQL UI at {}", socket_addr);
axum::serve(listener, router.into_make_service()).await?;
Ok(())
}
Expand All @@ -118,6 +120,9 @@ fn setup_telemetry(
),
]);
let (metrics_layer, tracing_layer) = if let Some(otel_collector_url) = otel_collector_url {
opentelemetry::global::set_text_map_propagator(
opentelemetry_sdk::propagation::TraceContextPropagator::default(),
);
(
Some(tracing_opentelemetry::MetricsLayer::new(
opentelemetry_otlp::new_pipeline()
Expand Down Expand Up @@ -171,7 +176,6 @@ async fn main() {
setup_telemetry(args.log_level, args.otel_collector_url).unwrap();
let database = setup_database(args.database_url).await.unwrap();
let schema = root_schema_builder()
.extension(Tracing)
.data(database)
.finish();
let router = setup_router(schema);
Expand Down
55 changes: 55 additions & 0 deletions datasets/src/route_handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use async_graphql::Executor;
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{
extract::Request,
handler::Handler,
http::StatusCode,
response::{IntoResponse, Response},
RequestExt,
};
use axum_extra::{
headers::{authorization::Bearer, Authorization},
TypedHeader,
};
use std::{future::Future, pin::Pin};

/// An [`Handler`] which executes an [`Executor`] including the [`Authorization<Bearer>`] in the [`async_graphql::Context`]
#[derive(Debug, Clone)]
pub struct GraphQLHandler<E: Executor> {
/// The GraphQL executor used to process the request
executor: E,
}

impl<E: Executor> GraphQLHandler<E> {
/// Constructs an instance of the handler with the provided schema.
pub fn new(executor: E) -> Self {
Self { executor }
}
}

impl<S, E> Handler<((),), S> for GraphQLHandler<E>
where
E: Executor,
{
type Future = Pin<Box<dyn Future<Output = Response> + Send + 'static>>;

fn call(self, mut req: Request, _state: S) -> Self::Future {
Box::pin(async move {
let token = req
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await
.ok()
.map(|token| token.0);
let request = req.extract::<GraphQLRequest, _>().await;
match request {
Ok(request) => GraphQLResponse::from(
self.executor
.execute(request.into_inner().data(token))
.await,
)
.into_response(),
Err(err) => (StatusCode::BAD_REQUEST, err.0.to_string()).into_response(),
}
})
}
}

0 comments on commit f7697ed

Please sign in to comment.