Skip to content

Commit

Permalink
Upgrade hyper examples to adapt to dev dependency hyper v1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
glendc committed Dec 1, 2023
1 parent 39fd963 commit 6906232
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 51 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

# 0.1.6 (01. December, 2023)

- Upgrade hyper examples to adapt to dev dependency hyper v1.0 (was hyper v0.14);

# 0.1.5 (20. September, 2023)

- Support and use Loom for testing;
Expand Down
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
categories = ["asynchronous", "network-programming"]
edition = "2021"
name = "tokio-graceful"
version = "0.1.5"
version = "0.1.6"
description = "util for graceful shutdown of tokio applications"
homepage = "https://github.com/plabayo/tokio-graceful"
readme = "README.md"
Expand All @@ -25,4 +25,7 @@ tokio = { version = "1", features = ["net", "rt-multi-thread", "io-util", "test-
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[target.'cfg(not(loom))'.dev-dependencies]
hyper = { version = "0.14", features = [ "server", "tcp", "http1", "http2" ] }
hyper = { version = "1.0.1", features = [ "server", "http1", "http2" ] }
hyper-util = { version = "0.1.1", features = [ "server", "server-auto", "http1", "http2", "tokio" ] }
http-body-util = "0.1"
bytes = "1"
73 changes: 50 additions & 23 deletions examples/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@
//! [`axum`]: https://docs.rs/axum
use std::convert::Infallible;
use std::net::SocketAddr;
use std::time::Duration;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1::Builder;
use hyper::service::service_fn;
use hyper::{body::Incoming, Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main]
Expand Down Expand Up @@ -50,32 +56,53 @@ async fn main() {
}

async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) {
// For every connection, we must make a `Service` to handle all
// incoming HTTP requests on said connection.
let make_svc = make_service_fn(|_conn| {
// This is the `Service` that will handle the connection.
// `service_fn` is a helper to convert a function that
// returns a Response into a `Service`.
//
// NOTE, make sure to pass a clone of the shutdown guard to the service fn
// in case you wish to be able to cancel a long running process should the
// shutdown signal be received and you know that your task might not finish on time.
// This allows you to at least leave it behind in a consistent state such that another
// process can pick up where you left that task.
async { Ok::<_, Infallible>(service_fn(hello)) }
});
let addr: SocketAddr = ([127, 0, 0, 1], 8080).into();

let addr = ([127, 0, 0, 1], 8080).into();
let listener = TcpListener::bind(&addr).await.unwrap();

let server = Server::bind(&addr).serve(make_svc);
let server = server.with_graceful_shutdown(shutdown_guard.clone_weak().into_cancelled());
loop {
let stream = tokio::select! {
_ = shutdown_guard.cancelled() => {
tracing::info!("signal received: initiate graceful shutdown");
break;
}
result = listener.accept() => {
match result {
Ok((stream, _)) => {
stream
}
Err(e) => {
tracing::warn!("accept error: {:?}", e);
continue;
}
}
}
};
let stream = TokioIo::new(stream);

if let Err(err) = server.await {
tracing::error!("server quit with error: {}", err);
shutdown_guard.spawn_task_fn(move |guard: tokio_graceful::ShutdownGuard| async move {
let conn = Builder::new()
.serve_connection(stream, service_fn(hello));
let mut conn = std::pin::pin!(conn);

loop {
tokio::select! {
_ = guard.cancelled() => {
conn.as_mut().graceful_shutdown();
}
result = conn.as_mut() => {
if let Err(err) = result {
tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error");
}
break;
}
}
}
});
}
}

async fn hello(_: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok(Response::new(Body::from("Hello World!")))
Ok(Response::new(Full::from("Hello World!")))
}
76 changes: 50 additions & 26 deletions examples/hyper_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@
//! [`axum`]: https://docs.rs/axum
use std::convert::Infallible;
use std::net::SocketAddr;
use std::time::Duration;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1::Builder;
use hyper::service::service_fn;
use hyper::{body::Incoming, Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main]
Expand Down Expand Up @@ -90,35 +96,53 @@ async fn main() {
}

async fn serve_tcp(shutdown_guard: tokio_graceful::ShutdownGuard) {
// For every connection, we must make a `Service` to handle all
// incoming HTTP requests on said connection.
let make_svc = make_service_fn(|_conn| {
// This is the `Service` that will handle the connection.
// `service_fn` is a helper to convert a function that
// returns a Response into a `Service`.
//
// NOTE, make sure to pass a clone of the shutdown guard to the service fn
// in case you wish to be able to cancel a long running process should the
// shutdown signal be received and you know that your task might not finish on time.
// This allows you to at least leave it behind in a consistent state such that another
// process can pick up where you left that task.
async { Ok::<_, Infallible>(service_fn(hello)) }
});
let addr: SocketAddr = ([127, 0, 0, 1], 8080).into();

let listener = TcpListener::bind(&addr).await.unwrap();

let addr = ([127, 0, 0, 1], 8080).into();
loop {
let stream = tokio::select! {
_ = shutdown_guard.cancelled() => {
tracing::info!("signal received: initiate graceful shutdown");
break;
}
result = listener.accept() => {
match result {
Ok((stream, _)) => {
stream
}
Err(e) => {
tracing::warn!("accept error: {:?}", e);
continue;
}
}
}
};
let stream = TokioIo::new(stream);

let server = Server::bind(&addr).serve(make_svc);
let server = server.with_graceful_shutdown(shutdown_guard.clone_weak().into_cancelled());
shutdown_guard.spawn_task_fn(move |guard: tokio_graceful::ShutdownGuard| async move {
let conn = Builder::new()
.serve_connection(stream, service_fn(hello));
let mut conn = std::pin::pin!(conn);

if let Err(err) = server.await {
tracing::error!(
error = &err as &dyn std::error::Error,
"server quit with error"
);
loop {
tokio::select! {
_ = guard.cancelled() => {
conn.as_mut().graceful_shutdown();
}
result = conn.as_mut() => {
if let Err(err) = result {
tracing::error!(error = &err as &dyn std::error::Error, "conn exited with error");
}
break;
}
}
}
});
}
}

async fn hello(_: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok(Response::new(Body::from("Hello World!")))
Ok(Response::new(Full::from("Hello World!")))
}

0 comments on commit 6906232

Please sign in to comment.