Skip to content

Commit

Permalink
better response on server errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Shane Osbourne committed Jun 6, 2024
1 parent 9dda0bb commit 1efd969
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 62 deletions.
5 changes: 4 additions & 1 deletion crates/bsnext_client/generated/dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ export type ServerChange =
bind_address: string;
}}
| { kind: "Started", payload?: undefined }
| { kind: "Patched", payload?: undefined };
| { kind: "Patched", payload?: undefined }
| { kind: "Errored", payload: {
error: string;
}};

export interface ServerChangeSetItem {
identity: IdentityDTO;
Expand Down
6 changes: 6 additions & 0 deletions crates/bsnext_client/generated/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ export const serverChangeSchema = z.union([
kind: z.literal("Patched"),
payload: z.undefined().optional(),
}),
z.object({
kind: z.literal("Errored"),
payload: z.object({
error: z.string(),
}),
}),
]);

export const serverChangeSetItemSchema = z.object({
Expand Down
6 changes: 6 additions & 0 deletions crates/bsnext_client/inject/dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6348,6 +6348,12 @@ var serverChangeSchema = z.union([
z.object({
kind: z.literal("Patched"),
payload: z.undefined().optional()
}),
z.object({
kind: z.literal("Errored"),
payload: z.object({
error: z.string()
})
})
]);
var serverChangeSetItemSchema = z.object({
Expand Down
6 changes: 4 additions & 2 deletions crates/bsnext_core/src/server/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ pub enum ServerError {
AddrInUse { socket_addr: SocketAddr },
#[error("invalid bind address: {addr_parse_error}")]
InvalidAddress { addr_parse_error: String },
#[error("could not determine the reason")]
Unknown,
#[error("could not determine the reason: `{0}`")]
Unknown(String),
#[error("io error {0}")]
Io(String),
#[error("server was closed")]
Closed,
}
Expand Down
6 changes: 3 additions & 3 deletions crates/bsnext_core/src/server/handler_listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl actix::Handler<Listen> for ServerActor {
}
_ => {
tracing::error!("{:?} [not-started] UNKNOWN {}", identity, e);
Err(ServerError::Unknown)
Err(ServerError::Unknown(format!("{}", e)))
}
},
};
Expand All @@ -113,7 +113,7 @@ impl actix::Handler<Listen> for ServerActor {
Ok((socket_addr, self_addr.clone()))
}
None => {
Err(ServerError::Unknown)
Err(ServerError::Unknown("unknown".to_string()))
}
}
}
Expand All @@ -132,7 +132,7 @@ impl actix::Handler<Listen> for ServerActor {
}
Err(e) => {
tracing::error!("-->{e}");
Err(ServerError::Unknown)
Err(ServerError::Unknown(format!("{:?}", e)))
}
}
}
Expand Down
35 changes: 23 additions & 12 deletions crates/bsnext_core/src/servers_supervisor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::server::handler_stop::Stop;
use actix::{Actor, Addr, Running};

use crate::server::actor::ServerActor;
use crate::servers_supervisor::start_handler::StartMessage;
use crate::servers_supervisor::start_handler::{ChildResult, StartMessage};

use crate::server::handler_patch::Patch;
use bsnext_input::server_config::Identity;
Expand Down Expand Up @@ -102,23 +102,34 @@ impl ServersSupervisor {

if !start_jobs.is_empty() {
tracing::debug!("starting {:?} servers", start_jobs.len());
let idens = start_jobs
.iter()
.map(|x| &x.identity)
.map(|x| x.to_owned())
.collect::<Vec<_>>();
match self_addr
.send(StartMessage {
server_configs: start_jobs,
})
.await
{
Ok(_) => {
for x in idens {
changeset.items.push(ServerChangeSetItem {
identity: (&x).into(),
change: ServerChange::Started,
})
Ok(output) => {
for x in output {
match x {
ChildResult::Ok(child_created) => {
tracing::info!("child_created");
let iden = child_created.server_handler.identity.clone();
self_addr.do_send(child_created);
changeset.items.push(ServerChangeSetItem {
identity: (&iden).into(),
change: ServerChange::Started,
})
}
ChildResult::Err(e) => {
tracing::info!(?e, "child not created");
changeset.items.push(ServerChangeSetItem {
identity: (&e.identity).into(),
change: ServerChange::Errored {
error: format!("{:?}", e.server_error),
},
})
}
}
}
}
Err(_) => tracing::error!("could not send StartMessage to self"),
Expand Down
121 changes: 77 additions & 44 deletions crates/bsnext_core/src/servers_supervisor/start_handler.rs
Original file line number Diff line number Diff line change
@@ -1,76 +1,90 @@
use crate::server::actor::ServerActor;
use crate::server::error::ServerError;
use crate::server::handler_listen::Listen;
use crate::servers_supervisor::actor::{ChildHandler, ServersSupervisor};
use actix::{Actor, AsyncContext};
use bsnext_input::server_config::ServerConfig;
use bsnext_input::server_config::{Identity, ServerConfig};
use futures_util::future::join_all;
use futures_util::FutureExt;
use std::future::Future;

use std::pin::Pin;
use tracing::{span, Instrument, Level};

#[derive(actix::Message)]
#[rtype(result = "()")]
#[rtype(result = "Vec<ChildResult>")]
pub(crate) struct StartMessage {
pub server_configs: Vec<ServerConfig>,
}

impl actix::Handler<StartMessage> for ServersSupervisor {
type Result = Pin<Box<dyn Future<Output = ()>>>;
type Result = Pin<Box<dyn Future<Output = Vec<ChildResult>>>>;

fn handle(&mut self, msg: StartMessage, ctx: &mut Self::Context) -> Self::Result {
let span = span!(Level::TRACE, "actix::Handler<StartMessage> for Servers");
let self_addr = ctx.address();

let workload = async move {
tracing::debug!("creating {} actor(s)", msg.server_configs.len());
let fts = msg
.server_configs
.into_iter()
.map(|server_config| {
let server = ServerActor::new_from_config(server_config.clone());
let actor_addr = server.start();
let c = server_config.clone();
actor_addr
.send(Listen {
parent: self_addr.clone().recipient(),
})
.map(|r| (r, c))
})
.collect::<Vec<_>>();

let results = join_all(fts).await;
for (fut_result, server_config) in results {
match fut_result {
Ok(msg_response) => match msg_response {
Ok((addr, actor_addr)) => {
tracing::debug!("✚ got listening child: {}", addr.to_string());
self_addr.do_send(ChildCreated {
server_handler: ChildHandler {
actor_address: actor_addr,
identity: server_config.identity,
socket_addr: addr,
},
});
}
Err(e) => {
tracing::error!("{:?} <- {}", server_config.identity, e)
Box::pin(
async move {
tracing::debug!("creating {} actor(s)", msg.server_configs.len());
let fts = msg
.server_configs
.into_iter()
.map(|server_config| {
let server = ServerActor::new_from_config(server_config.clone());
let actor_addr = server.start();
let c = server_config.clone();
actor_addr
.send(Listen {
parent: self_addr.clone().recipient(),
})
.map(|r| (r, c))
})
.collect::<Vec<_>>();
tracing::info!("got {} servers to listen to", fts.len());
let results = join_all(fts).await;
results
.into_iter()
.map(|(fut_result, server_config)| match fut_result {
Ok(msg_response) => match msg_response {
Ok((addr, actor_addr)) => {
tracing::debug!("✚ got listening child: {}", addr.to_string());
ChildResult::Ok(ChildCreated {
server_handler: ChildHandler {
actor_address: actor_addr,
identity: server_config.identity,
socket_addr: addr,
},
})
}
Err(e) => {
tracing::error!("{:?} <- {}", server_config.identity, e);
ChildResult::Err(ChildNotCreated {
server_error: e,
identity: server_config.identity.clone(),
})
}
},
Err(_e) => {
unreachable!("mailbox ?")
}
},
Err(e) => tracing::error!(" <- [m] {}", e),
}
})
.collect::<Vec<ChildResult>>()
}
}
.instrument(span);

Box::pin(workload)
.instrument(span),
)
}
}

#[derive(Debug, actix::Message)]
#[rtype(result = "()")]
pub struct ChildCreated {
server_handler: ChildHandler,
pub(crate) server_handler: ChildHandler,
}
#[derive(Debug)]
pub enum ChildResult {
Ok(ChildCreated),
Err(ChildNotCreated),
}

impl actix::Handler<ChildCreated> for ServersSupervisor {
Expand All @@ -84,3 +98,22 @@ impl actix::Handler<ChildCreated> for ServersSupervisor {
tracing::trace!("ChildCreated child count: {}", self.handlers.len());
}
}

#[derive(Debug, actix::Message)]
#[rtype(result = "()")]
pub struct ChildNotCreated {
pub server_error: ServerError,
pub identity: Identity,
}

impl actix::Handler<ChildNotCreated> for ServersSupervisor {
type Result = ();

fn handle(&mut self, _msg: ChildNotCreated, _ctx: &mut Self::Context) -> Self::Result {
// self.handlers.insert(
// msg.server_handler.identity.clone(),
// msg.server_handler.clone(),
// );
// tracing::trace!("ChildCreated child count: {}", self.handlers.len());
}
}
1 change: 1 addition & 0 deletions crates/bsnext_dto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ pub enum ServerChange {
Stopped { bind_address: String },
Started,
Patched,
Errored { error: String },
}

#[typeshare]
Expand Down
11 changes: 11 additions & 0 deletions crates/bsnext_output/src/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,18 @@ where
}
},
ServerChange::Patched => {}
ServerChange::Errored { error } => {
writeln!(w, "[❌ server failed] {} {}", iden(&identity), error)?;
}
}
}
Ok(())
}

fn iden(identity_dto: &IdentityDTO) -> String {
match identity_dto {
IdentityDTO::Both { name, bind_address } => format!("[{name}] {bind_address}"),
IdentityDTO::Address { bind_address } => format!("{}", bind_address),
IdentityDTO::Named { name } => format!("[{name}]"),
}
}
1 change: 1 addition & 0 deletions crates/bsnext_tracing/src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub fn init_tracing_subscriber(
tracing_subscriber::fmt::layer()
.json()
.with_ansi(false)
// todo(alpha): use this example as a way to move this output into the terminal window
.with_writer(file)
.boxed()
}
Expand Down

0 comments on commit 1efd969

Please sign in to comment.