Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Tokio utils (#1342)
Browse files Browse the repository at this point in the history
* Add tokio_utils (#1336)

Add a new module, tokio_utils to the iml-util crate.

Within it, add three new fns to deal with incoming socket streams of
different types.

We should use these fns when we don't know which type of stream we will
be using at runtime.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>

* Allow action runner to run on both unix socket and http

Signed-off-by: Will Johnson <wjohnson@whamcloud.com>

* - backport tokio utils

Signed-off-by: Will Johnson <wjohnson@whamcloud.com>

* - Setup action runner to use both tcp and socket ports

Signed-off-by: Will Johnson <wjohnson@whamcloud.com>

* - Update python release to 2
- Update wasm-components

Signed-off-by: Will Johnson <wjohnson@whamcloud.com>

* - Some CR updates

Signed-off-by: Will Johnson <wjohnson@whamcloud.com>
  • Loading branch information
johnsonw authored and jgrund committed Nov 21, 2019
1 parent a2cae10 commit 2db5fd6
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 155 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iml-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ iml-wire-types = { path = "../iml-wire-types", version = "0.1" }
iml-rabbit = { path = "../iml-rabbit", version = "0.1.0" }
iml-manager-env = { path = "../iml-manager-env", version = "0.1.0" }
tokio-runtime-shutdown = { path = "../tokio-runtime-shutdown", version = "0.1.0" }
iml-util = { path = "../iml-util", version = "0.1.0" }
parking_lot = "0.8"

[dev-dependencies]
Expand Down
16 changes: 4 additions & 12 deletions iml-services/src/services/action_runner/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@ use iml_services::{
sender::{create_client_filter, sender},
},
};
use iml_util::tokio_utils::*;
use iml_wire_types::PluginMessage;
use parking_lot::Mutex;
use std::{
collections::HashMap,
os::unix::{io::FromRawFd, net::UnixListener as NetUnixListener},
sync::Arc,
};
use tokio::{net::UnixListener, reactor::Handle};
use std::{collections::HashMap, sync::Arc};
use warp::{self, Filter as _};

pub static AGENT_TX_RUST: &'static str = "agent_tx_rust";
Expand Down Expand Up @@ -47,12 +43,8 @@ fn main() {
.map(|x| warp::reply::json(&x))
.with(log);

let addr = unsafe { NetUnixListener::from_raw_fd(3) };

let listener = UnixListener::from_std(addr, &Handle::default())
.expect("Unable to bind Unix Domain Socket fd")
.incoming()
.inspect(|_| log::debug!("Client connected"));
let listener =
get_tcp_or_unix_listener("ACTION_RUNNER_PORT").expect("Could not bind to socket.");

tokio::spawn(warp::serve(routes).serve_incoming(valve.wrap(listener)));

Expand Down
5 changes: 5 additions & 0 deletions iml-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,10 @@ version = "0.1.0"
authors = ["IML Team <iml@whamcloud.com>"]
edition = "2018"

[dependencies]
futures = "0.1"
tokio = "0.1"
tracing = "0.1"

[lib]
path = "util.rs"
71 changes: 71 additions & 0 deletions iml-util/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,77 @@ impl<T> Flatten<T> for Option<Option<T>> {
}
}

pub mod tokio_utils {
use futures::{future::Either, Stream};
use std::{
env, io,
os::unix::{io::FromRawFd, net::UnixListener as NetUnixListener},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{unix::UnixListener, TcpListener},
reactor::Handle,
};

pub trait Socket: AsyncRead + AsyncWrite + Send + 'static {}
impl<T: AsyncRead + AsyncWrite + Send + 'static> Socket for T {}

/// Given an environment variable that resolves to a port,
/// Return a stream containing `TcpStream`s that have been erased to
/// `AsyncRead` + `AsyncWrite` traits. This is useful when you won't know which stream
/// to choose at runtime
pub fn get_tcp_stream(
port: String,
) -> Result<impl Stream<Item = Box<dyn Socket>, Error = io::Error>, io::Error> {
let addr = format!("127.0.0.1:{}", port)
.parse()
.expect("Couldn't parse socket address.");

tracing::debug!("Listening over tcp port {}", port);

let listener = TcpListener::bind(&addr)?;
let s = listener
.incoming()
.map(|x| -> Box<dyn Socket> { Box::new(x) });

Ok(s)
}

/// Returns a stream containing `UnixStream`s that have been erased to
/// `AsyncRead` + `AsyncWrite` traits. This is useful when you won't know which stream
/// to choose at runtime
pub fn get_unix_stream(
) -> Result<impl Stream<Item = Box<dyn Socket>, Error = io::Error>, io::Error> {
let addr = unsafe { NetUnixListener::from_raw_fd(3) };

tracing::debug!("Listening over unix domain socket");

let s = UnixListener::from_std(addr, &Handle::default())?
.incoming()
.map(|socket| -> Box<dyn Socket> { Box::new(socket) });

Ok(s)
}

/// Given an environment variable that resolves to a port,
/// Return a stream containing items that have been erased to
/// `AsyncRead` + `AsyncWrite` traits. This is useful when you won't know which stream
/// to choose at runtime.
///
/// If the `port_var` resolves to a port, `TcpStream` will be used internally.
/// Otherwise a `UnixStream` will be used.
pub fn get_tcp_or_unix_listener(
port_var: &str,
) -> Result<impl Stream<Item = Box<dyn Socket>, Error = io::Error>, io::Error> {
let s = match env::var(port_var) {
Ok(port) => Either::A(get_tcp_stream(port)?),
Err(_) => Either::B(get_unix_stream()?),
};

Ok(s)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 2db5fd6

Please sign in to comment.