Skip to content

Commit

Permalink
implement exec_script from server
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkunde committed Aug 2, 2023
1 parent c232567 commit ddc5cbf
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 41 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = "0.20"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.4", features = [ "v4", "fast-rng", "macro-diagnostics"]}
duration-str = "0.5.1"
uuid = { version = "1.4", features = [ "v4", "fast-rng", "macro-diagnostics"] }
duration-str = "0.5"

[[bin]]
name = "unpatched-agent"
Expand Down
93 changes: 56 additions & 37 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ struct AgentDataMemory {
av_mem: u64,
total_mem: u64,
}
// TODO: Use struct from unpatched server as a dependency
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone, Default)]
struct Script {
id: String,
name: String,
version: String,
output_regex: String,
labels: String,
timeout: String,
script_content: String,
}

const RETRY: Duration = Duration::new(5, 0);

Expand Down Expand Up @@ -88,37 +99,15 @@ async fn main() {
// trigger first data send via mpsc channel
let _tx_send = tx.send(true).await;

// all things outgoing
// ##################
// ALL THE SEND STUFF
// ##################
let _sender_handle = tokio::spawn(async move {
// start off easy
let _ping = sink_message(&arc_sink, Message::Ping(alias.clone().into())).await;
info!("Connection established and validated via ping message");
loop {
if let Some(_data_trigger) = rx.recv().await {
let dur = match parse("1s") {
Ok(d) => {
debug!("Executing Script with duration: {d:?}");
d
}
Err(e) => {
error!("Could not parse timeout: {e}");
Duration::new(0, 1)
} // TODO: Return error to server
};
// TODO: Remove hardcoded script
let script_id = "1234";
let exec_result =
exec_command("echo \"test message und so\"".into(), dur).await;
let exec_result_str = match exec_result {
Ok(s) => {
debug!("Script response:\n{s}");
s
}
Err(e) => {
error!("{e}");
format!("{e}")
}
};
let mut sys = System::new_all();
sys.refresh_all();
let os_version = sys.long_os_version().unwrap_or("".into());
Expand All @@ -133,10 +122,6 @@ async fn main() {

let messages = vec![
sink_message(&arc_sink, Message::Text(format!("id:{id}"))),
sink_message(
&arc_sink,
Message::Text(format!("script_{script_id}:{exec_result_str}")),
),
sink_message(&arc_sink, Message::Text(format!("alias:{alias}"))),
sink_message(&arc_sink, Message::Text("attributes:hello,world".into())),
sink_message(&arc_sink, Message::Text(format!("os:{os_version}"))),
Expand All @@ -159,11 +144,13 @@ async fn main() {
}
});

// all things incoming
// #####################
// ALL THE RECEIVE STUFF
// #####################
let recv_handle = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
// print message and break if instructed to do so
if process_message(msg, who, tx.clone()).is_break() {
if process_message(msg, who, tx.clone()).await.is_break() {
debug!("we are breaking!");
break;
}
Expand Down Expand Up @@ -191,10 +178,41 @@ async fn sink_message(arc: &SenderSinkArc, m: Message) -> Result<(), Error> {

/// Function to handle messages we get (with a slight twist that Frame variant is visible
/// since we are working with the underlying tungstenite library directly without axum here).
fn process_message(msg: Message, who: usize, tx: Sender<bool>) -> ControlFlow<(), ()> {
async fn process_message(msg: Message, who: usize, tx: Sender<bool>) -> ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
debug!(">>> {} got str: {:?}", who, t);
Message::Text(raw_msg) => {
debug!(">>> got str: {:?}", raw_msg);
let (message_type, msg) = raw_msg.split_once(':').unwrap();
match message_type {
"script" => {
let script: Script = serde_json::from_str(msg).unwrap();
debug!("{:?}", script);
let timeout_duration = match parse(&script.timeout) {
Ok(d) => {
debug!("Executing Script with duration: {d:?}");
d
}
Err(e) => {
// TODO: Return error to server
warn!("Could not parse timeout: {e}, defaulting to 30s");
Duration::new(30, 0)
}
};
let exec_result =
exec_command(script.script_content, timeout_duration).await;
let _exec_result_str = match exec_result {
Ok(s) => {
debug!("Script response:\n{s}");
s
}
Err(e) => {
error!("{e}");
format!("{e}")
}
};
}
_ => warn!("Server sent unsupported message type {message_type}:\n{msg}"),
}
}
Message::Binary(d) => {
debug!(">>> {} got {} bytes: {:?}", who, d.len(), d);
Expand Down Expand Up @@ -248,9 +266,10 @@ async fn exec_command(cmd: String, timeout: Duration) -> std::io::Result<String>
))
}
};
// FIXME: clean this up
match code {
0 => match std::str::from_utf8(&output.stdout) {
Ok(s) => Ok(s.to_string()),
Ok(s) => Ok(s.strip_suffix('\n').unwrap_or(s).to_string()),
Err(e) => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("stderr was not valid utf-8: {e}"),
Expand All @@ -259,7 +278,7 @@ async fn exec_command(cmd: String, timeout: Duration) -> std::io::Result<String>
124 => {
error!("Executing Process was killed by timeout ({:?})!", timeout);
let res = match std::str::from_utf8(&output.stdout) {
Ok(s) => s.to_string(),
Ok(s) => s.strip_suffix('\n').unwrap_or(s).to_string(),
Err(e) => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
Expand All @@ -275,7 +294,7 @@ async fn exec_command(cmd: String, timeout: Duration) -> std::io::Result<String>
_ => {
error!("Executing Process resulted in error code {}!", code);
match std::str::from_utf8(&output.stderr) {
Ok(s) => Ok(s.to_string()),
Ok(s) => Ok(s.strip_suffix('\n').unwrap_or(s).to_string()),
Err(e) => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("stderr was not valid utf-8: {e}"),
Expand Down

0 comments on commit ddc5cbf

Please sign in to comment.