From ccb9ff0269fe41823fb4182217ebfd907411cb3c Mon Sep 17 00:00:00 2001 From: Yassin Eldeeb Date: Wed, 14 Feb 2024 13:57:19 +0200 Subject: [PATCH] fix: run `on_upstream_http_request` hook in federation source (#440) Co-authored-by: Dotan Simha --- Cargo.lock | 1 + libs/common/src/lib.rs | 1 + libs/common/src/plugin_manager.rs | 28 ++ libs/engine/src/gateway.rs | 18 +- libs/engine/src/plugin_manager.rs | 26 +- libs/engine/src/source/federation_source.rs | 28 +- libs/federation_query_planner/Cargo.toml | 1 + libs/federation_query_planner/src/executor.rs | 197 +----------- libs/federation_query_planner/src/lib.rs | 287 ++++++++++++++++-- libs/tracing/src/minitrace_mgr.rs | 1 + 10 files changed, 339 insertions(+), 249 deletions(-) create mode 100644 libs/common/src/plugin_manager.rs diff --git a/Cargo.lock b/Cargo.lock index 41726c32..0cd81595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1915,6 +1915,7 @@ dependencies = [ "anyhow", "async-graphql", "async-trait", + "conductor_common", "conductor_tracing", "criterion", "futures", diff --git a/libs/common/src/lib.rs b/libs/common/src/lib.rs index 3b419ce2..f13926a9 100644 --- a/libs/common/src/lib.rs +++ b/libs/common/src/lib.rs @@ -3,6 +3,7 @@ pub mod graphql; pub mod http; pub mod json; pub mod plugin; +pub mod plugin_manager; pub mod serde_utils; pub mod vrl_functions; pub mod vrl_utils; diff --git a/libs/common/src/plugin_manager.rs b/libs/common/src/plugin_manager.rs new file mode 100644 index 00000000..8a28b313 --- /dev/null +++ b/libs/common/src/plugin_manager.rs @@ -0,0 +1,28 @@ +use crate::{ + execute::RequestExecutionContext, + graphql::GraphQLRequest, + http::{ConductorHttpRequest, ConductorHttpResponse}, +}; +use reqwest::Response; + +#[async_trait::async_trait(?Send)] +pub trait PluginManager: std::fmt::Debug + Send + Sync { + async fn on_downstream_http_request(&self, context: &mut RequestExecutionContext); + fn on_downstream_http_response( + &self, + context: &mut RequestExecutionContext, + response: &mut ConductorHttpResponse, + ); + async fn on_downstream_graphql_request(&self, context: &mut RequestExecutionContext); + async fn on_upstream_graphql_request<'a>(&self, req: &mut GraphQLRequest); + async fn on_upstream_http_request<'a>( + &self, + ctx: &mut RequestExecutionContext, + request: &mut ConductorHttpRequest, + ); + async fn on_upstream_http_response<'a>( + &self, + ctx: &mut RequestExecutionContext, + response: &Result, + ); +} diff --git a/libs/engine/src/gateway.rs b/libs/engine/src/gateway.rs index a1fc87c3..3de2956c 100644 --- a/libs/engine/src/gateway.rs +++ b/libs/engine/src/gateway.rs @@ -5,6 +5,7 @@ use conductor_common::{ graphql::{ExtractGraphQLOperationError, GraphQLRequest, GraphQLResponse, ParsedGraphQLRequest}, http::{ConductorHttpRequest, ConductorHttpResponse, Url}, plugin::PluginError, + plugin_manager::PluginManager, }; use conductor_config::{ConductorConfig, EndpointDefinition, SourceDefinition}; use conductor_tracing::{ @@ -17,7 +18,7 @@ use reqwest::{Method, StatusCode}; use tracing::error; use crate::{ - plugin_manager::PluginManager, + plugin_manager::PluginManagerImpl, source::{ federation_source::FederationSourceRuntime, graphql_source::GraphQLSourceRuntime, @@ -30,7 +31,7 @@ use crate::{ pub struct ConductorGatewayRouteData { pub endpoint: String, pub tenant_id: u32, - pub plugin_manager: Arc, + pub plugin_manager: Arc>, pub to: Arc>, } @@ -102,9 +103,10 @@ impl ConductorGateway { .cloned() .collect::>(); - let plugin_manager = PluginManager::new(&Some(combined_plugins), tracing_manager, tenant_id) - .await - .map_err(GatewayError::PluginManagerInitError)?; + let plugin_manager = + PluginManagerImpl::new(&Some(combined_plugins), tracing_manager, tenant_id) + .await + .map_err(GatewayError::PluginManagerInitError)?; let upstream_source: Box = config_object .sources @@ -115,7 +117,7 @@ impl ConductorGateway { let route_data = ConductorGatewayRouteData { endpoint: endpoint_config.path.clone(), to: Arc::new(upstream_source), - plugin_manager: Arc::new(plugin_manager), + plugin_manager: Arc::new(Box::new(plugin_manager)), tenant_id, }; @@ -159,10 +161,10 @@ impl ConductorGateway { plugins: Vec>, request: ConductorHttpRequest, ) -> ConductorHttpResponse { - let plugin_manager = PluginManager::new_from_vec(plugins); + let plugin_manager = PluginManagerImpl::new_from_vec(plugins); let route_data = ConductorGatewayRouteData { endpoint: "/".to_string(), - plugin_manager: Arc::new(plugin_manager), + plugin_manager: Arc::new(Box::new(plugin_manager)), to: source, tenant_id: 0, }; diff --git a/libs/engine/src/plugin_manager.rs b/libs/engine/src/plugin_manager.rs index 974dff79..5b107492 100644 --- a/libs/engine/src/plugin_manager.rs +++ b/libs/engine/src/plugin_manager.rs @@ -3,22 +3,23 @@ use conductor_common::{ graphql::GraphQLRequest, http::{ConductorHttpRequest, ConductorHttpResponse}, plugin::{CreatablePlugin, Plugin, PluginError}, + plugin_manager::PluginManager, }; use conductor_config::PluginDefinition; use conductor_tracing::minitrace_mgr::MinitraceManager; use reqwest::Response; #[derive(Debug, Default)] -pub struct PluginManager { +pub struct PluginManagerImpl { plugins: Vec>, } -impl PluginManager { +impl PluginManagerImpl { pub fn new_from_vec(plugins: Vec>) -> Self { let mut pm = Self { plugins }; // We want to make sure to register default plugins last, in order to ensure it's setting the value correctly - for p in PluginManager::default_plugins() { + for p in PluginManagerImpl::default_plugins() { pm.register_boxed_plugin(p); } @@ -34,7 +35,7 @@ impl PluginManager { tracing_manager: &mut MinitraceManager, tenant_id: u32, ) -> Result { - let mut instance = PluginManager::default(); + let mut instance = PluginManagerImpl::default(); if let Some(config_defs) = plugins_config { for plugin_def in config_defs.iter() { @@ -98,7 +99,7 @@ impl PluginManager { }; // We want to make sure to register these last, in order to ensure it's setting the value correctly - for p in PluginManager::default_plugins() { + for p in PluginManagerImpl::default_plugins() { instance.register_boxed_plugin(p); } @@ -116,14 +117,17 @@ impl PluginManager { pub fn register_plugin(&mut self, plugin: impl Plugin + 'static) { self.plugins.push(Box::new(plugin)); } +} +#[async_trait::async_trait(?Send)] +impl PluginManager for PluginManagerImpl { #[tracing::instrument( level = "debug", skip(self, context), name = "on_downstream_http_request" )] #[inline] - pub async fn on_downstream_http_request(&self, context: &mut RequestExecutionContext) { + async fn on_downstream_http_request(&self, context: &mut RequestExecutionContext) { let p = &self.plugins; for plugin in p.iter() { @@ -141,7 +145,7 @@ impl PluginManager { name = "on_downstream_http_response" )] #[inline] - pub fn on_downstream_http_response( + fn on_downstream_http_response( &self, context: &mut RequestExecutionContext, response: &mut ConductorHttpResponse, @@ -163,7 +167,7 @@ impl PluginManager { name = "on_downstream_graphql_request" )] #[inline] - pub async fn on_downstream_graphql_request(&self, context: &mut RequestExecutionContext) { + async fn on_downstream_graphql_request(&self, context: &mut RequestExecutionContext) { let p = &self.plugins; for plugin in p.iter() { @@ -177,7 +181,7 @@ impl PluginManager { #[tracing::instrument(level = "debug", skip(self, req), name = "on_upstream_graphql_request")] #[inline] - pub async fn on_upstream_graphql_request<'a>(&self, req: &mut GraphQLRequest) { + async fn on_upstream_graphql_request<'a>(&self, req: &mut GraphQLRequest) { let p = &self.plugins; for plugin in p.iter() { @@ -191,7 +195,7 @@ impl PluginManager { name = "on_upstream_http_request" )] #[inline] - pub async fn on_upstream_http_request<'a>( + async fn on_upstream_http_request<'a>( &self, ctx: &mut RequestExecutionContext, request: &mut ConductorHttpRequest, @@ -213,7 +217,7 @@ impl PluginManager { name = "on_upstream_http_response" )] #[inline] - pub async fn on_upstream_http_response<'a>( + async fn on_upstream_http_response<'a>( &self, ctx: &mut RequestExecutionContext, response: &Result, diff --git a/libs/engine/src/source/federation_source.rs b/libs/engine/src/source/federation_source.rs index d5b54ea1..93256666 100644 --- a/libs/engine/src/source/federation_source.rs +++ b/libs/engine/src/source/federation_source.rs @@ -4,10 +4,12 @@ use base64::{engine, Engine}; use conductor_common::execute::RequestExecutionContext; use conductor_common::graphql::GraphQLResponse; use conductor_config::{FederationSourceConfig, SupergraphSourceConfig}; -use federation_query_planner::execute_federation; use federation_query_planner::supergraph::{parse_supergraph, Supergraph}; +use federation_query_planner::FederationExecutor; +use futures::lock::Mutex; use minitrace_reqwest::{traced_reqwest, TracedHttpClient}; use std::collections::HashMap; +use std::sync::Arc; use std::{future::Future, pin::Pin}; #[derive(Debug)] @@ -156,7 +158,7 @@ impl FederationSourceRuntime { } pub async fn update_supergraph(&mut self, new_schema: String) { - let new_supergraph = parse_supergraph(&new_schema).unwrap(); + let new_supergraph: Supergraph = parse_supergraph(&new_schema).unwrap(); self.supergraph = new_supergraph; } @@ -190,7 +192,7 @@ impl SourceRuntime for FederationSourceRuntime { fn execute<'a>( &'a self, - _route_data: &'a ConductorGatewayRouteData, + route_data: &'a ConductorGatewayRouteData, request_context: &'a mut RequestExecutionContext, ) -> Pin> + 'a)>> { Box::pin(wasm_polyfills::call_async(async move { @@ -199,17 +201,17 @@ impl SourceRuntime for FederationSourceRuntime { .take() .expect("GraphQL request isn't available at the time of execution"); - // let source_req = &mut downstream_request.request; - - // TODO: this needs to be called by conductor execution when fetching subgarphs - // route_data - // .plugin_manager - // .on_upstream_graphql_request(source_req) - // .await; - let operation = downstream_request.parsed_operation; - - match execute_federation(&self.client, &self.supergraph, operation).await { + let executor = FederationExecutor { + client: &self.client, + plugin_manager: route_data.plugin_manager.clone(), + supergraph: &self.supergraph, + }; + + match executor + .execute_federation(Arc::new(Mutex::new(request_context)), operation) + .await + { Ok((response_data, query_plan)) => { let mut response = serde_json::from_str::(&response_data).unwrap(); diff --git a/libs/federation_query_planner/Cargo.toml b/libs/federation_query_planner/Cargo.toml index e6587447..a98b960e 100644 --- a/libs/federation_query_planner/Cargo.toml +++ b/libs/federation_query_planner/Cargo.toml @@ -10,6 +10,7 @@ bench = false serde = { workspace = true } wasm_polyfills = { path = "../wasm_polyfills" } conductor_tracing = { path = "../tracing" } +conductor_common = { path = "../common" } serde_json = { workspace = true } async-trait = { workspace = true } anyhow = { workspace = true } diff --git a/libs/federation_query_planner/src/executor.rs b/libs/federation_query_planner/src/executor.rs index 83ead620..8bae66b1 100644 --- a/libs/federation_query_planner/src/executor.rs +++ b/libs/federation_query_planner/src/executor.rs @@ -1,113 +1,12 @@ use std::pin::Pin; -use crate::constants::CONDUCTOR_INTERNAL_SERVICE_RESOLVER; -use crate::query_planner::Parallel; - -use super::query_planner::{QueryPlan, QueryStep}; use super::supergraph::Supergraph; -use anyhow::{anyhow, Result}; use async_graphql::{dynamic::*, Error, Value}; -use futures::future::join_all; use futures::Future; -use minitrace::Span; use serde::{Deserialize, Serialize}; use serde_json::Value as SerdeValue; -pub async fn execute_query_plan( - client: &minitrace_reqwest::TracedHttpClient, - query_plan: &QueryPlan, - supergraph: &Supergraph, -) -> Result>> { - let mut all_futures = Vec::new(); - - for step in &query_plan.parallel_steps { - match step { - Parallel::Sequential(query_steps) => { - let future = execute_sequential(client, query_steps, supergraph); - all_futures.push(future); - } - } - } - - let results: Result, _> = join_all(all_futures).await.into_iter().collect(); - - match results { - Ok(val) => Ok(val), - Err(e) => Err(anyhow!(e)), - } -} - -async fn execute_sequential( - client: &minitrace_reqwest::TracedHttpClient, - query_steps: &Vec, - supergraph: &Supergraph, -) -> Result> { - let mut data_vec = vec![]; - let mut entity_arguments: Option = None; - - for (i, query_step) in query_steps.iter().enumerate() { - let data = execute_query_step(client, query_step, supergraph, entity_arguments.clone()).await; - - match data { - Ok(data) => { - data_vec.push(( - (query_step.service_name.clone(), query_step.query.clone()), - data, - )); - - if i + 1 < query_steps.len() { - let next_step = &query_steps[i + 1]; - match &next_step.entity_query_needs { - Some(needs) => { - data_vec.iter().find(|&data| { - if let Some(x) = data.1.data.as_ref() { - // recursively search and find match - let y = find_objects_matching_criteria( - x, - &needs.__typename, - &needs.fields.clone().into_iter().next().unwrap(), - ); - - if y.is_empty() { - return false; - } else { - entity_arguments = Some(SerdeValue::from(y)); - return true; - } - } - - false - }); - - Some(serde_json::json!({ "representations": entity_arguments })) - } - None => None, - } - } else { - None - }; - } - Err(err) => return Err(err), - } - } - - let x: Vec<((String, String), QueryResponse)> = data_vec - .into_iter() - .map(|(plan_meta, response)| { - let new_response = QueryResponse { - data: response.data, - // Initialize other fields of QueryResponse as needed - errors: response.errors, - extensions: None, - }; - (plan_meta, new_response) - }) - .collect::>(); - - Ok(x) -} - -fn find_objects_matching_criteria( +pub fn find_objects_matching_criteria( json: &SerdeValue, typename: &str, field: &str, @@ -152,7 +51,7 @@ pub struct QueryResponse { pub extensions: Option, } -fn dynamically_build_schema_from_supergraph(supergraph: &Supergraph) -> Schema { +pub fn dynamically_build_schema_from_supergraph(supergraph: &Supergraph) -> Schema { let mut query = Object::new("Query"); // Dynamically create object types and fields @@ -189,95 +88,3 @@ fn dynamically_build_schema_from_supergraph(supergraph: &Supergraph) -> Schema { .finish() .expect("Introspection schema build failed") } - -async fn execute_query_step( - client: &minitrace_reqwest::TracedHttpClient, - query_step: &QueryStep, - supergraph: &Supergraph, - entity_arguments: Option, -) -> Result { - let is_introspection = query_step.service_name == CONDUCTOR_INTERNAL_SERVICE_RESOLVER; - - if is_introspection { - let schema = dynamically_build_schema_from_supergraph(supergraph); - - // Execute the introspection query - // TODO: whenever excuting a query step, we need to take the query out of the step's struct instead of copying it - let request = async_graphql::Request::new(query_step.query.to_string()); - let response = schema.execute(request).await; - - let data = serde_json::to_value(response.data)?; - let errors = response - .errors - .iter() - .map(|e| serde_json::to_value(e).unwrap()) - .collect(); - - Ok(QueryResponse { - data: Some(data), - errors: Some(errors), - extensions: None, - }) - } else { - let _span = Span::enter_with_local_parent(format!("subgraph {}", query_step.service_name)) - .with_properties(|| { - [ - ("service_name", query_step.service_name.clone()), - ("graphql.document", query_step.query.clone()), - ] - }); - let url = supergraph.subgraphs.get(&query_step.service_name).unwrap(); - - let variables_object = if let Some(arguments) = &entity_arguments { - serde_json::json!({ "representations": arguments }) - } else { - SerdeValue::Object(serde_json::Map::new()) - }; - - // TODO: improve this by implementing https://github.com/the-guild-org/conductor-t2/issues/205 - let response = match client - .post(url) - .header("Content-Type", "application/json") - .body( - serde_json::json!({ - "query": query_step.query, - "variables": variables_object - }) - .to_string(), - ) - .send() - .await - { - Ok(resp) => resp, - Err(err) => { - eprintln!("Failed to send request: {}", err); - return Err(anyhow::anyhow!("Failed to send request: {}", err)); - } - }; - - if !response.status().is_success() { - eprintln!("Received error response: {:?}", response.status()); - return Err(anyhow::anyhow!( - "Failed request with status: {}", - response.status() - )); - } - - let response_data = match response.json::().await { - Ok(data) => data, - Err(err) => { - eprintln!("Failed to parse response: {}", err); - return Err(anyhow::anyhow!("Failed to parse response: {}", err)); - } - }; - - // Check if there were any GraphQL errors - if let Some(errors) = &response_data.errors { - for error in errors { - eprintln!("Error: {:?}", error); - } - } - - Ok(response_data) - } -} diff --git a/libs/federation_query_planner/src/lib.rs b/libs/federation_query_planner/src/lib.rs index 2098c3c8..0511c9f1 100644 --- a/libs/federation_query_planner/src/lib.rs +++ b/libs/federation_query_planner/src/lib.rs @@ -1,14 +1,25 @@ -use std::ops::Index; - -use anyhow::{Ok, Result}; +use std::{ops::Index, sync::Arc}; + +use anyhow::{anyhow, Error, Ok as anyhowOk}; +use conductor_common::http::ConductorHttpRequest; +use conductor_common::{execute::RequestExecutionContext, plugin_manager::PluginManager}; +use constants::CONDUCTOR_INTERNAL_SERVICE_RESOLVER; +use executor::{ + dynamically_build_schema_from_supergraph, find_objects_matching_criteria, QueryResponse, +}; +use futures::future::join_all; +use futures::lock::Mutex; use graphql_parser::query::Document; -use query_planner::QueryPlan; +use minitrace::Span; +use query_planner::QueryStep; +use query_planner::{Parallel, QueryPlan}; +use reqwest::header::{HeaderValue, CONTENT_TYPE}; +use reqwest::Method; use serde_json::json; +use serde_json::Value as SerdeValue; use supergraph::Supergraph; -use crate::{ - executor::execute_query_plan, query_planner::plan_for_user_query, user_query::parse_user_query, -}; +use crate::{query_planner::plan_for_user_query, user_query::parse_user_query}; pub mod constants; pub mod executor; @@ -18,25 +29,257 @@ pub mod supergraph; pub mod type_merge; pub mod user_query; -pub async fn execute_federation( - client: &minitrace_reqwest::TracedHttpClient, - supergraph: &Supergraph, - parsed_user_query: Document<'static, String>, -) -> Result<(String, QueryPlan)> { - // println!("parsed_user_query: {:#?}", user_query); - let mut user_query = parse_user_query(parsed_user_query)?; - let query_plan = plan_for_user_query(supergraph, &mut user_query)?; +pub struct FederationExecutor<'a> { + pub client: &'a minitrace_reqwest::TracedHttpClient, + pub plugin_manager: Arc>, + pub supergraph: &'a Supergraph, +} + +impl<'a> FederationExecutor<'a> { + pub async fn execute_federation( + &self, + request_context: Arc>, + parsed_user_query: Document<'static, String>, + ) -> Result<(String, QueryPlan), Error> { + // println!("parsed_user_query: {:#?}", user_query); + let mut user_query = parse_user_query(parsed_user_query)?; + let query_plan = plan_for_user_query(self.supergraph, &mut user_query)?; + + // println!("query plan: {:#?}", query_plan); + + let response_vec = self + .execute_query_plan(&query_plan, request_context) + .await?; + + // println!("response: {:#?}", json!(response_vec).to_string()); + + anyhowOk(( + json!(response_vec.index(0).index(0).1).to_string(), + query_plan, + )) + } + + pub async fn execute_query_plan( + &self, + query_plan: &QueryPlan, + request_context: Arc>, + ) -> Result>, Error> { + let mut all_futures = Vec::new(); + + let parallel_block = query_plan.parallel_steps.first().unwrap(); + + match parallel_block { + Parallel::Sequential(steps) => { + let future = self.execute_sequential(steps, request_context); + all_futures.push(future); + + let results: Result, _> = join_all(all_futures).await.into_iter().collect(); + + match results { + Ok(val) => anyhowOk(val), + Err(e) => Err(anyhow!(e)), + } + } + } + } - // println!("query plan: {:#?}", query_plan); + pub async fn execute_sequential( + &self, + query_steps: &Vec, + request_context: Arc>, + ) -> Result, Error> { + let mut data_vec = vec![]; + let mut entity_arguments: Option = None; + + for (i, query_step) in query_steps.iter().enumerate() { + let data = self + .execute_query_step( + query_step, + entity_arguments.clone(), + request_context.lock().await, + ) + .await; + + match data { + Ok(data) => { + data_vec.push(( + (query_step.service_name.clone(), query_step.query.clone()), + data, + )); + + if i + 1 < query_steps.len() { + let next_step = &query_steps[i + 1]; + match &next_step.entity_query_needs { + Some(needs) => { + data_vec.iter().find(|&data| { + if let Some(x) = data.1.data.as_ref() { + // recursively search and find match + let y = find_objects_matching_criteria( + x, + &needs.__typename, + &needs.fields.clone().into_iter().next().unwrap(), + ); + + if y.is_empty() { + return false; + } else { + entity_arguments = Some(SerdeValue::from(y)); + return true; + } + } - let response_vec = execute_query_plan(client, &query_plan, supergraph).await?; + false + }); - // println!("response: {:#?}", json!(response_vec).to_string()); + Some(serde_json::json!({ "representations": entity_arguments })) + } + None => None, + } + } else { + None + }; + } + Err(err) => return Err(err), + } + } + + let x: Vec<((String, String), QueryResponse)> = data_vec + .into_iter() + .map(|(plan_meta, response)| { + let new_response = QueryResponse { + data: response.data, + // Initialize other fields of QueryResponse as needed + errors: response.errors, + extensions: None, + }; + (plan_meta, new_response) + }) + .collect::>(); + + anyhowOk(x) + } - Ok(( - json!(response_vec.index(0).index(0).1).to_string(), - query_plan, - )) + pub async fn execute_query_step( + &self, + query_step: &QueryStep, + entity_arguments: Option, + mut request_context: futures::lock::MutexGuard<'_, &mut RequestExecutionContext>, + ) -> Result { + let is_introspection = query_step.service_name == CONDUCTOR_INTERNAL_SERVICE_RESOLVER; + + if is_introspection { + let schema = dynamically_build_schema_from_supergraph(self.supergraph); + + // Execute the introspection query + // TODO: whenever excuting a query step, we need to take the query out of the step's struct instead of copying it + let request = async_graphql::Request::new(query_step.query.to_string()); + let response = schema.execute(request).await; + + let data = serde_json::to_value(response.data)?; + let errors = response + .errors + .iter() + .map(|e| serde_json::to_value(e).unwrap()) + .collect(); + + anyhowOk(QueryResponse { + data: Some(data), + errors: Some(errors), + extensions: None, + }) + } else { + let _span = Span::enter_with_local_parent(format!("subgraph {}", query_step.service_name)) + .with_properties(|| { + [ + ("service_name", query_step.service_name.clone()), + ("graphql.document", query_step.query.clone()), + ] + }); + let url = self + .supergraph + .subgraphs + .get(&query_step.service_name) + .unwrap(); + + let variables_object = if let Some(arguments) = &entity_arguments { + serde_json::json!({ "representations": arguments }) + } else { + SerdeValue::Object(serde_json::Map::new()) + }; + + let mut upstream_request = ConductorHttpRequest { + method: Method::POST, + body: serde_json::json!({ + "query": query_step.query, + "variables": variables_object + }) + .to_string() + .into(), + uri: url.to_string(), + query_string: "".to_string(), + headers: Default::default(), + }; + + upstream_request + .headers + .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + self + .plugin_manager + .on_upstream_http_request(*request_context, &mut upstream_request) + .await; + + if request_context.is_short_circuit() { + return Err(anyhow::anyhow!("short circuit")); + } + + let upstream_req = self + .client + .request(upstream_request.method, upstream_request.uri) + .headers(upstream_request.headers) + .body(upstream_request.body); + + let response = upstream_req.send().await; + + self + .plugin_manager + .on_upstream_http_response(*request_context, &response) + .await; + + let response = match response { + Ok(resp) => resp, + Err(err) => { + eprintln!("Failed to send request: {}", err); + return Err(anyhow::anyhow!("Failed to send request: {}", err)); + } + }; + + if !response.status().is_success() { + eprintln!("Received error response: {:?}", response.status()); + return Err(anyhow::anyhow!( + "Failed request with status: {}", + response.status() + )); + } + + let response_data = match response.json::().await { + Ok(data) => data, + Err(err) => { + eprintln!("Failed to parse response: {}", err); + return Err(anyhow::anyhow!("Failed to parse response: {}", err)); + } + }; + + // Check if there were any GraphQL errors + if let Some(errors) = &response_data.errors { + for error in errors { + eprintln!("Error: {:?}", error); + } + } + + anyhowOk(response_data) + } + } } #[cfg(test)] diff --git a/libs/tracing/src/minitrace_mgr.rs b/libs/tracing/src/minitrace_mgr.rs index 1fa65de6..03ee3e96 100644 --- a/libs/tracing/src/minitrace_mgr.rs +++ b/libs/tracing/src/minitrace_mgr.rs @@ -36,6 +36,7 @@ impl MinitraceManager { routed_reporter } + #[allow(clippy::await_holding_lock)] pub async fn shutdown(self) { tracing::info!("Shutting down tracing reporters..."); minitrace::flush();