diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ffe46d..03d6885 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Ensure that openwecd shutdowns gracefully even if hyper server is not responding - Improve the logging of failed Kerberos authentications: missing authorization header warning is now in DEBUG level +### Fixed + +- Fixed an issue that could result in an inconsistent state when a client unexpectedly closes an HTTP connection. + ## [0.1.0] - 2023-05-30 Initial commit containing most of the desired features. The project is still under heavy development and production use without a backup solution should be avoided. diff --git a/openwec.conf.sample.toml b/openwec.conf.sample.toml index f9adf66..be61aa1 100644 --- a/openwec.conf.sample.toml +++ b/openwec.conf.sample.toml @@ -110,6 +110,8 @@ # - ip # - port # - principal +# - conn_status: 'X' (connection aborted before the response completed) +# '+' (connection may be kept alive after the response is sent) # Default value is None, meaning "{X(ip)}:{X(port)} - {X(principal)} [{d}] \"{X(http_uri)}\" {X(http_status)} {X(response_time)}{n}" # access_logs_pattern = None @@ -239,4 +241,4 @@ # Server private key, corresponding to the certificate # server_private_key = "/etc/server-key.pem" -## End of TLS configuration \ No newline at end of file +## End of TLS configuration diff --git a/server/src/lib.rs b/server/src/lib.rs index 12ff41b..785487f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -53,7 +53,7 @@ use std::{env, mem}; use subscription::{reload_subscriptions_task, Subscriptions}; use tls_listener::TlsListener; use tokio::signal::unix::SignalKind; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; @@ -273,7 +273,7 @@ async fn handle_payload( db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, - request_data: RequestData, + request_data: &RequestData, request_payload: Option, auth_ctx: &AuthenticationContext, ) -> Result<(StatusCode, Option)> { @@ -313,6 +313,24 @@ async fn handle_payload( } } +enum ConnectionStatus { + // Connection aborted before the response completed. + Aborted, + // Connection may be kept alive after the response is sent. + Alive, +} + +impl ConnectionStatus { + pub fn as_str(&self) -> &str { + // This is inspired by %X of Apache httpd: + // https://httpd.apache.org/docs/current/mod/mod_log_config.html + match self { + Self::Aborted => "X", + Self::Alive => "+", + } + } +} + fn log_response( addr: &SocketAddr, method: &str, @@ -320,8 +338,10 @@ fn log_response( start: &Instant, status: StatusCode, principal: &str, + conn_status: ConnectionStatus, ) { let duration: f32 = start.elapsed().as_micros() as f32; + // MDC is thread related, so it should be safe to use it in a non-async // function. log_mdc::insert("http_status", status.as_str()); @@ -331,12 +351,20 @@ fn log_response( log_mdc::insert("ip", addr.ip().to_string()); log_mdc::insert("port", addr.port().to_string()); log_mdc::insert("principal", principal); + log_mdc::insert("conn_status", conn_status.as_str()); // Empty message, logging pattern should use MDC info!(target: ACCESS_LOGGER, ""); log_mdc::clear(); } +fn build_error_response(status: StatusCode) -> Response { + Response::builder() + .status(status) + .body(Body::empty()) + .expect("Failed to build HTTP response") +} + async fn handle( server: ServerSettings, collector: Collector, @@ -365,11 +393,16 @@ async fn handle( Ok((principal, builder)) => (principal, builder), Err(_) => { let status = StatusCode::UNAUTHORIZED; - log_response(&addr, &method, &uri, &start, status, "-"); - return Ok(Response::builder() - .status(status) - .body(Body::empty()) - .expect("Failed to build HTTP response")); + log_response( + &addr, + &method, + &uri, + &start, + status, + "-", + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); } }; @@ -380,11 +413,16 @@ async fn handle( Err(e) => { error!("Failed to compute request data: {:?}", e); let status = StatusCode::NOT_FOUND; - log_response(&addr, &method, &uri, &start, status, &principal); - return Ok(Response::builder() - .status(status) - .body(Body::empty()) - .expect("Failed to build HTTP response")); + log_response( + &addr, + &method, + &uri, + &start, + status, + &principal, + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); } }; @@ -394,11 +432,16 @@ async fn handle( Err(e) => { error!("Failed to retrieve request payload: {:?}", e); let status = StatusCode::BAD_REQUEST; - log_response(&addr, &method, &uri, &start, status, &principal); - return Ok(Response::builder() - .status(status) - .body(Body::empty()) - .expect("Failed to build HTTP response")); + log_response( + &addr, + &method, + &uri, + &start, + status, + &principal, + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); } }; @@ -408,27 +451,100 @@ async fn handle( ); // Handle request payload, and retrieves response payload - let (status, response_payload) = match handle_payload( - &server, - &collector, - db, - subscriptions, - heartbeat_tx, - request_data, - request_payload, - &auth_ctx, - ) - .await - { - Ok((status, response_payload)) => (status, response_payload), - Err(e) => { - error!("Failed to compute a response payload to request: {:?}", e); + // + // It seems that Hyper can abort the Service future at any time (for example if the client + // closes the connection), meaning that any ".await" can be a dead end. + // We want to ensure that the payload handling cannot be aborted unexpectedly resulting + // in an inconsistent state. + // To achieve that, the handle_payload function is executed in an independent Tokio task. + // + // In practice, Windows clients appear to close connections to their configured WEC server + // when they (re-)apply group policies. + + // handle_payload task result will be returned using a oneshot channel + let (tx, rx) = oneshot::channel(); + + // The following variables need to be cloned because they are moved in the spawned closure + let auth_ctx_cloned = auth_ctx.clone(); + let method_cloned = method.clone(); + let uri_cloned = uri.clone(); + let principal_cloned = principal.clone(); + + tokio::spawn(async move { + let res = handle_payload( + &server, + &collector, + db, + subscriptions, + heartbeat_tx, + &request_data, + request_payload, + &auth_ctx_cloned, + ) + .await; + if let Err(e) = &res { + error!( + "Failed to compute a response payload to request (from {}:{}): {:?}", + request_data.remote_addr().ip(), + request_data.remote_addr().port(), + e + ); + } + if let Err(value) = tx.send(res) { + debug!( + "Could not send handle_payload result to handling Service for {}:{} (receiver dropped). Result was: {:?}", + request_data.remote_addr().ip(), + request_data.remote_addr().port(), + value + ); + // Log this response with conn_status = Aborted + let status = match value { + Ok((status, _)) => status, + Err(_) => StatusCode::INTERNAL_SERVER_ERROR, + }; + log_response( + request_data.remote_addr(), + &method_cloned, + &uri_cloned, + &start, + status, + &principal_cloned, + ConnectionStatus::Aborted, + ); + } + }); + + // Wait for the handle_payload task to answer using the oneshot channel + let (status, response_payload) = match rx.await { + Ok(Ok((status, response_payload))) => (status, response_payload), + Ok(Err(_)) => { + // Ok(Err(_)): the handle_payload task returned an Err let status = StatusCode::INTERNAL_SERVER_ERROR; - log_response(&addr, &method, &uri, &start, status, &principal); - return Ok(Response::builder() - .status(status) - .body(Body::empty()) - .expect("Failed to build HTTP response")); + log_response( + &addr, + &method, + &uri, + &start, + status, + &principal, + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); + } + Err(_) => { + // Err(_): the handle_payload task "sender" has been dropped (should not happen) + error!("handle_payload task sender has been dropped. Maybe the task panicked?"); + let status = StatusCode::INTERNAL_SERVER_ERROR; + log_response( + &addr, + &method, + &uri, + &start, + status, + &principal, + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); } }; @@ -445,15 +561,28 @@ async fn handle( Err(e) => { error!("Failed to build HTTP response: {:?}", e); let status = StatusCode::INTERNAL_SERVER_ERROR; - log_response(&addr, &method, &uri, &start, status, &principal); - return Ok(Response::builder() - .status(status) - .body(Body::empty()) - .expect("Failed to build HTTP response")); + log_response( + &addr, + &method, + &uri, + &start, + status, + &principal, + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); } }; - log_response(&addr, &method, &uri, &start, response.status(), &principal); + log_response( + &addr, + &method, + &uri, + &start, + response.status(), + &principal, + ConnectionStatus::Alive, + ); Ok(response) } diff --git a/server/src/logic.rs b/server/src/logic.rs index a72eb2c..0a88855 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -441,7 +441,7 @@ pub async fn handle_message( db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, - request_data: RequestData, + request_data: &RequestData, message: &Message, auth_ctx: &AuthenticationContext, ) -> Result { @@ -449,13 +449,13 @@ pub async fn handle_message( debug!("Received {} request", action); if action == ACTION_ENUMERATE { - handle_enumerate(collector, &db, subscriptions, &request_data, auth_ctx) + handle_enumerate(collector, &db, subscriptions, request_data, auth_ctx) .await .context("Failed to handle Enumerate action") } else if action == ACTION_END || action == ACTION_SUBSCRIPTION_END { Ok(Response::err(StatusCode::OK)) } else if action == ACTION_HEARTBEAT { - handle_heartbeat(subscriptions, heartbeat_tx, &request_data, message) + handle_heartbeat(subscriptions, heartbeat_tx, request_data, message) .await .context("Failed to handle Heartbeat action") } else if action == ACTION_EVENTS { @@ -464,7 +464,7 @@ pub async fn handle_message( &db, subscriptions, heartbeat_tx, - &request_data, + request_data, message, ) .await diff --git a/server/src/outputs/file.rs b/server/src/outputs/file.rs index 4aa0a28..73d6a18 100644 --- a/server/src/outputs/file.rs +++ b/server/src/outputs/file.rs @@ -25,7 +25,7 @@ pub struct WriteFileMessage { async fn handle_message( file_handles: &mut HashMap, - message: &mut WriteFileMessage, + message: &WriteFileMessage, ) -> Result<()> { let parent = message .path @@ -67,8 +67,8 @@ pub async fn run(mut task_rx: mpsc::Receiver, task_ct: Cancell let mut file_handles: HashMap = HashMap::new(); loop { tokio::select! { - Some(mut message) = task_rx.recv() => { - let result = handle_message(&mut file_handles, &mut message).await; + Some(message) = task_rx.recv() => { + let result = handle_message(&mut file_handles, &message).await; if let Err(e) = message .resp .send(result) {