Skip to content

Commit

Permalink
Combine Handler and TaskHandler; eradicate all explicit timeouts, now…
Browse files Browse the repository at this point in the history
… that both TcpAccept and Handler are implemented for WithTimeout
  • Loading branch information
ivmarkov committed Oct 11, 2024
1 parent f230d2c commit 767217b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 222 deletions.
10 changes: 8 additions & 2 deletions edge-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ async fn request<'b, const N: usize, T: TcpConnect>(
### HTTP server

```rust
use core::fmt::Display;

use edge_http::io::server::{Connection, DefaultServer, Handler};
use edge_http::io::Error;
use edge_http::Method;
Expand Down Expand Up @@ -131,7 +133,7 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> {
.bind(addr.parse().unwrap())
.await?;

server.run(acceptor, HttpHandler, None, None).await?;
server.run(acceptor, HttpHandler).await?;

Ok(())
}
Expand All @@ -144,7 +146,11 @@ where
{
type Error = Error<T::Error>;

async fn handle(&self, conn: &mut Connection<'b, T, N>) -> Result<(), Self::Error> {
async fn handle(
&self,
_task_id: impl Display + Copy,
conn: &mut Connection<'b, T, N>,
) -> Result<(), Self::Error> {
let headers = conn.headers()?;

if headers.method != Method::Get {
Expand Down
253 changes: 43 additions & 210 deletions edge-http/src/io/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use core::fmt::{self, Debug};
use core::fmt::{self, Debug, Display};
use core::mem::{self, MaybeUninit};

use edge_nal::{with_timeout, Close, TcpShutdown, WithTimeout, WithTimeoutError};

use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::mutex::Mutex;

Expand All @@ -20,8 +21,6 @@ pub use embedded_svc_compat::*;

pub const DEFAULT_HANDLER_TASKS_COUNT: usize = 4;
pub const DEFAULT_BUF_SIZE: usize = 2048;
pub const DEFAULT_REQUEST_TIMEOUT_MS: u32 = 30 * 60 * 1000; // 30 minutes
pub const DEFAULT_IO_TIMEOUT_MS: u32 = 50 * 1000; // 50 seconds

const COMPLETION_BUF_SIZE: usize = 64;

Expand Down Expand Up @@ -338,7 +337,16 @@ where
{
type Error: Debug;

async fn handle(&self, connection: &mut Connection<'b, T, N>) -> Result<(), Self::Error>;
/// Handle an incoming HTTP request
///
/// Parameters:
/// - `task_id`: An identifier for the task, thast can be used by the handler for logging purposes
/// - `connection`: A connection state machine for the request-response cycle
async fn handle(
&self,
task_id: impl Display + Copy,
connection: &mut Connection<'b, T, N>,
) -> Result<(), Self::Error>;
}

impl<'b, const N: usize, T, H> Handler<'b, T, N> for &H
Expand All @@ -348,73 +356,46 @@ where
{
type Error = H::Error;

async fn handle(&self, connection: &mut Connection<'b, T, N>) -> Result<(), Self::Error> {
(**self).handle(connection).await
}
}

/// A trait (async callback) for handling a single HTTP request
///
/// The only difference between this and `Handler` is that this trait has an additional `task_id` parameter,
/// which is used for logging purposes
pub trait TaskHandler<'b, T, const N: usize>
where
T: Read + Write,
{
type Error: Debug;

async fn handle(
&self,
task_id: usize,
task_id: impl Display + Copy,
connection: &mut Connection<'b, T, N>,
) -> Result<(), Self::Error>;
) -> Result<(), Self::Error> {
(**self).handle(task_id, connection).await
}
}

impl<'b, const N: usize, T, H> TaskHandler<'b, T, N> for &H
impl<'b, const N: usize, T, H> Handler<'b, T, N> for &mut H
where
T: Read + Write,
H: TaskHandler<'b, T, N>,
H: Handler<'b, T, N>,
{
type Error = H::Error;

async fn handle(
&self,
task_id: usize,
task_id: impl Display + Copy,
connection: &mut Connection<'b, T, N>,
) -> Result<(), Self::Error> {
(**self).handle(task_id, connection).await
}
}

/// A type that adapts a `Handler` into a `TaskHandler`
pub struct TaskHandlerAdaptor<H>(H);

impl<H> TaskHandlerAdaptor<H> {
/// Create a new `TaskHandlerAdaptor` from a `Handler`
pub const fn new(handler: H) -> Self {
Self(handler)
}
}

impl<H> From<H> for TaskHandlerAdaptor<H> {
fn from(value: H) -> Self {
TaskHandlerAdaptor(value)
}
}

impl<'b, const N: usize, T, H> TaskHandler<'b, T, N> for TaskHandlerAdaptor<H>
impl<'b, const N: usize, T, H> Handler<'b, T, N> for WithTimeout<H>
where
T: Read + Write,
H: Handler<'b, T, N>,
{
type Error = H::Error;
type Error = WithTimeoutError<H::Error>;

async fn handle(
&self,
_task_id: usize,
task_id: impl Display + Copy,
connection: &mut Connection<'b, T, N>,
) -> Result<(), Self::Error> {
self.0.handle(connection).await
with_timeout(self.timeout_ms(), self.io().handle(task_id, connection)).await?;

Ok(())
}
}

Expand All @@ -427,66 +408,24 @@ where
/// Parameters:
/// - `io`: A socket stream
/// - `buf`: A work-area buffer used by the implementation
/// - `request_timeout_ms`: An optional timeout for a complete request-response processing, in milliseconds.
/// If not provided, a default timeout of 30 minutes is used.
/// - `task_id`: An identifier for the task, used for logging purposes
/// - `handler`: An implementation of `Handler` to handle incoming requests
pub async fn handle_connection<const N: usize, T, H>(
io: T,
buf: &mut [u8],
request_timeout_ms: Option<u32>,
handler: H,
) where
H: for<'b> Handler<'b, &'b mut T, N>,
T: Read + Write + TcpShutdown,
{
handle_task_connection(
io,
buf,
request_timeout_ms,
0,
TaskHandlerAdaptor::new(handler),
)
.await
}

/// A convenience function to handle multiple HTTP requests over a single socket stream,
/// using the specified task handler.
///
/// The socket stream will be closed only in case of error, or until the client explicitly requests that
/// either with a hard socket close, or with a `Connection: Close` header.
///
/// Parameters:
/// - `io`: A socket stream
/// - `buf`: A work-area buffer used by the implementation
/// - `request_timeout_ms`: An optional timeout for a complete request-response processing, in milliseconds.
/// If not provided, a default timeout of 30 minutes is used.
/// - `task_id`: An identifier for the task, used for logging purposes
/// - `handler`: An implementation of `TaskHandler` to handle incoming requests
pub async fn handle_task_connection<const N: usize, T, H>(
mut io: T,
buf: &mut [u8],
request_timeout_ms: Option<u32>,
task_id: usize,
task_id: impl Display + Copy,
handler: H,
) where
H: for<'b> TaskHandler<'b, &'b mut T, N>,
H: for<'b> Handler<'b, &'b mut T, N>,
T: Read + Write + TcpShutdown,
{
let close = loop {
debug!("Handler task {task_id}: Waiting for new request");

let result = with_timeout(
request_timeout_ms.unwrap_or(DEFAULT_REQUEST_TIMEOUT_MS),
handle_task_request::<N, _, _>(buf, &mut io, task_id, &handler),
)
.await;
let result = handle_request::<N, _, _>(buf, &mut io, task_id, &handler).await;

match result {
Err(WithTimeoutError::Timeout) => {
info!("Handler task {task_id}: Connection closed due to request timeout");
break false;
}
Err(WithTimeoutError::IO(HandleRequestError::Connection(Error::ConnectionClosed))) => {
Err(HandleRequestError::Connection(Error::ConnectionClosed)) => {
debug!("Handler task {task_id}: Connection closed");
break false;
}
Expand Down Expand Up @@ -574,41 +513,17 @@ where
/// Parameters:
/// - `buf`: A work-area buffer used by the implementation
/// - `io`: A socket stream
/// - `task_id`: An identifier for the task, used for logging purposes
/// - `handler`: An implementation of `Handler` to handle incoming requests
pub async fn handle_request<'b, const N: usize, H, T>(
buf: &'b mut [u8],
io: T,
task_id: impl Display + Copy,
handler: H,
) -> Result<bool, HandleRequestError<T::Error, H::Error>>
where
H: Handler<'b, T, N>,
T: Read + Write,
{
handle_task_request(buf, io, 0, TaskHandlerAdaptor::new(handler)).await
}

/// A convenience function to handle a single HTTP request over a socket stream,
/// using the specified task handler.
///
/// Note that this function does not set any timeouts on the request-response processing
/// or on the IO operations. It is up that the caller to use the `with_timeout` function
/// and the `WithTimeout` struct from the `edge-nal` crate to wrap the future returned
/// by this function, or the socket stream, or both.
///
/// Parameters:
/// - `buf`: A work-area buffer used by the implementation
/// - `io`: A socket stream
/// - `task_id`: An identifier for the task, used for logging purposes
/// - `handler`: An implementation of `TaskHandler` to handle incoming requests
pub async fn handle_task_request<'b, const N: usize, H, T>(
buf: &'b mut [u8],
io: T,
task_id: usize,
handler: H,
) -> Result<bool, HandleRequestError<T::Error, H::Error>>
where
H: TaskHandler<'b, T, N>,
T: Read + Write,
{
let mut connection = Connection::<_, N>::new(buf, io).await?;

Expand Down Expand Up @@ -654,100 +569,22 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
/// Parameters:
/// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections
/// - `handler`: An implementation of `Handler` to handle incoming requests
/// - `request_timeout_ms`: An optional timeout for a complete request-response processing, in milliseconds.
/// If not provided, a default timeout of 30 minutes is used.
/// - `io_timeout_ms`: An optional timeout for each IO operation, in milliseconds.
/// If not provided, a default timeout of 50 seconds is used.
#[inline(never)]
#[cold]
pub async fn run<A, H>(
&mut self,
acceptor: A,
handler: H,
request_timeout_ms: Option<u32>,
io_timeout_ms: Option<u32>,
) -> Result<(), Error<A::Error>>
where
A: edge_nal::TcpAccept,
H: for<'b, 't> Handler<'b, &'b mut WithTimeout<A::Socket<'t>>, N>,
{
let handler = TaskHandlerAdaptor::new(handler);

// TODO: Figure out what is going on with the lifetimes so as to avoid this horrible code duplication

let mutex = Mutex::<NoopRawMutex, _>::new(());
let mut tasks = heapless::Vec::<_, P>::new();

info!(
"Creating {P} handler tasks, memory: {}B",
core::mem::size_of_val(&tasks)
);

for index in 0..P {
let mutex = &mutex;
let acceptor = &acceptor;
let task_id = index;
let handler = &handler;
let buf: *mut [u8; B] = &mut unsafe { self.0.assume_init_mut() }[index];

tasks
.push(async move {
loop {
debug!("Handler task {task_id}: Waiting for connection");

let io = {
let _guard = mutex.lock().await;

acceptor.accept().await.map_err(Error::Io)?.1
};

let io =
WithTimeout::new(io_timeout_ms.unwrap_or(DEFAULT_IO_TIMEOUT_MS), io);

debug!("Handler task {task_id}: Got connection request");

handle_task_connection::<N, _, _>(
io,
unsafe { buf.as_mut() }.unwrap(),
request_timeout_ms,
task_id,
handler,
)
.await;
}
})
.map_err(|_| ())
.unwrap();
}

let (result, _) = embassy_futures::select::select_slice(&mut tasks).await;

warn!("Server processing loop quit abruptly: {result:?}");

result
}

/// Run the server with the specified acceptor and task handler
///
/// Parameters:
/// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections
/// - `handler`: An implementation of `TaskHandler` to handle incoming requests
/// - `request_timeout_ms`: An optional timeout for a complete request-response processing, in milliseconds.
/// If not provided, a default timeout of 30 minutes is used.
/// - `io_timeout_ms`: An optional timeout for each IO operation, in milliseconds.
/// If not provided, a default timeout of 50 seconds is used.
/// Note that the server does NOT - by default - establish any timeouts on the IO operations.
/// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
/// timeouts on the socket produced by the acceptor.
///
/// Similarly, the server does NOT establish any timeouts on the complete request-response cycle.
/// It is up to the caller to wrap their complete or partial handling logic with
/// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
/// a global or semi-global request-response timeout.
#[inline(never)]
#[cold]
pub async fn run_with_task_id<A, H>(
&mut self,
acceptor: A,
handler: H,
request_timeout_ms: Option<u32>,
io_timeout_ms: Option<u32>,
) -> Result<(), Error<A::Error>>
pub async fn run<A, H>(&mut self, acceptor: A, handler: H) -> Result<(), Error<A::Error>>
where
A: edge_nal::TcpAccept,
H: for<'b, 't> TaskHandler<'b, &'b mut WithTimeout<A::Socket<'t>>, N>,
H: for<'b, 't> Handler<'b, &'b mut A::Socket<'t>, N>,
{
let mutex = Mutex::<NoopRawMutex, _>::new(());
let mut tasks = heapless::Vec::<_, P>::new();
Expand Down Expand Up @@ -775,15 +612,11 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
acceptor.accept().await.map_err(Error::Io)?.1
};

let io =
WithTimeout::new(io_timeout_ms.unwrap_or(DEFAULT_IO_TIMEOUT_MS), io);

debug!("Handler task {task_id}: Got connection request");

handle_task_connection::<N, _, _>(
handle_connection::<N, _, _>(
io,
unsafe { buf.as_mut() }.unwrap(),
request_timeout_ms,
task_id,
handler,
)
Expand Down
Loading

0 comments on commit 767217b

Please sign in to comment.