Skip to content

Commit

Permalink
feat(proxy): use event handlers for handle_listener_events
Browse files Browse the repository at this point in the history
Signed-off-by: iverly <github@iverly.net>
  • Loading branch information
iverly committed Jan 8, 2024
1 parent f9a0733 commit 587dbc6
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 55 deletions.
1 change: 1 addition & 0 deletions event/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ storage = { path = "../storage" }
shared = { path = "../shared" }
tokio = { version = "1.26.0", features = [ "sync" ] }
tonic = "0.7.2"
anyhow = "1.0.63"
7 changes: 4 additions & 3 deletions event/src/handlers/delete_backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use proto::proxy::Backend;
use anyhow::{anyhow, Result};
use shared::models::backend::Backend;
use storage::Storage;
use tokio::sync::{oneshot, Mutex};

Expand All @@ -17,13 +18,13 @@ impl DeleteBackendHandler {
pub async fn handle(
storage: Arc<Mutex<Storage>>,
backend: Backend,
tx: oneshot::Sender<Result<(), tonic::Status>>,
tx: oneshot::Sender<Result<()>>,
) {
let mut storage = storage.lock().await;

let result = storage
.remove_backend(&backend.hostname)
.map_err(|e| tonic::Status::internal(format!("Failed to delete backend: {}", e)));
.map_err(|e| anyhow!("Failed to delete backend: {}", e));

let _ = tx.send(result);
}
Expand Down
17 changes: 4 additions & 13 deletions event/src/handlers/list_backend.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::sync::Arc;

use proto::proxy::Backend;
use anyhow::Result;
use shared::models::backend::Backend;
use storage::Storage;
use tokio::sync::{oneshot, Mutex};

use crate::tonic_backend_from_proxy;

pub struct ListBackendHandler {}

impl ListBackendHandler {
Expand All @@ -16,18 +15,10 @@ impl ListBackendHandler {
/// * `storage`: Arc<Mutex<Storage>> - the storage object that holds all the backends
/// * `backend`: The backend to add to the storage.
/// * `tx`: This is the channel that the client is listening on.
pub async fn handle(
storage: Arc<Mutex<Storage>>,
tx: oneshot::Sender<Result<Vec<Backend>, tonic::Status>>,
) {
pub async fn handle(storage: Arc<Mutex<Storage>>, tx: oneshot::Sender<Result<Vec<Backend>>>) {
let storage = storage.lock().await;

let backends = storage
.get_backends()
.clone()
.into_values()
.map(tonic_backend_from_proxy)
.collect();
let backends = storage.get_backends().clone().into_values().collect();

let _ = tx.send(Ok(backends));
}
Expand Down
11 changes: 5 additions & 6 deletions event/src/handlers/put_backend.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::sync::Arc;

use proto::proxy::Backend;
use anyhow::{anyhow, Result};
use shared::models::backend::Backend;
use storage::Storage;
use tokio::sync::{oneshot, Mutex};

use crate::proxy_backend_from_tonic;

pub struct PutBackendHandler {}

impl PutBackendHandler {
Expand All @@ -19,13 +18,13 @@ impl PutBackendHandler {
pub async fn handle(
storage: Arc<Mutex<Storage>>,
backend: Backend,
tx: oneshot::Sender<Result<(), tonic::Status>>,
tx: oneshot::Sender<Result<()>>,
) {
let mut storage = storage.lock().await;

let result = storage
.add_backend(proxy_backend_from_tonic(backend))
.map_err(|e| tonic::Status::internal(format!("Failed to add backend: {}", e)));
.add_backend(backend)
.map_err(|e| anyhow!(format!("Failed to add backend: {}", e)));

let _ = tx.send(result);
}
Expand Down
1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ protocol = { path = "../protocol" }
shared = { path = "../shared" }
listener = { path = "../listener" }
storage = { path = "../storage" }
event = { path = "../event" }
log = "0.4.17"
tokio = { version = "1.21.0", features = ["rt", "net", "io-util", "sync"] }
anyhow = "1.0.63"
55 changes: 22 additions & 33 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::{env, sync::Arc};

use anyhow::{anyhow, Ok, Result};
use event::handlers::{
delete_backend::DeleteBackendHandler, list_backend::ListBackendHandler,
put_backend::PutBackendHandler,
};
use listener::{event::Event, Listener};
use log::debug;
use storage::Storage;
Expand Down Expand Up @@ -216,6 +220,19 @@ impl Proxy {
Ok(())
}

/// The function `handle_listener_events` handles events received from a channel by spawning async
/// tasks to handle different types of events.
///
/// Arguments:
///
/// * `rx`: A `Receiver<Event>` which is used to receive events from some event source.
/// * `storage`: `storage` is an `Arc<Mutex<Storage>>` which is a shared mutable state that is
/// protected by a mutex. It allows multiple threads to access and modify the `Storage` struct
/// concurrently.
///
/// Returns:
///
/// a `Result<()>`.
async fn handle_listener_events(
mut rx: Receiver<Event>,
storage: Arc<Mutex<Storage>>,
Expand All @@ -228,42 +245,14 @@ impl Proxy {

tokio::spawn(async move {
match event {
Event::DeleteBackend(backend, tx) => {
tx.send(storage.lock().await.remove_backend(&backend.hostname))
.map_err(|_| {
log::error!("failed to send delete backend response");
anyhow!("failed to send delete backend response")
})?;
Event::ListBackends(tx) => {
ListBackendHandler::handle(storage, tx).await;
}
Event::PutBackend(backend, tx) => {
tx.send(storage.lock().await.add_backend(
shared::models::backend::Backend::new(
backend.hostname,
backend.redirect_ip,
backend.redirect_port,
),
))
.map_err(|_| {
log::error!("failed to send put backend response");
anyhow!("failed to send put backend response")
})?;
PutBackendHandler::handle(storage, backend, tx).await;
}
Event::ListBackends(tx) => {
tx.send(Ok(storage
.lock()
.await
.get_backends()
.iter()
.map(|backend| shared::models::backend::Backend {
hostname: backend.1.hostname().to_string(),
redirect_ip: backend.1.redirect_ip().to_string(),
redirect_port: backend.1.redirect_port(),
})
.collect::<Vec<_>>()))
.map_err(|_| {
log::error!("failed to send list backends response");
anyhow!("failed to send list backends response")
})?;
Event::DeleteBackend(backend, tx) => {
DeleteBackendHandler::handle(storage, backend, tx).await;
}
}
Ok(())
Expand Down

0 comments on commit 587dbc6

Please sign in to comment.