From 255843acb5da214f73a6ec6dfb04c07e80d7d856 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 16 Jul 2024 00:39:23 +0300 Subject: [PATCH] src/mqtt: restructure a bit, add timeouts This does replace a bunch of unconditional printlns with debug!, and uses match, while logging in the other cases. In case there's a connection error, add a sleep, so we don't spam the log and try reconnecting too quickly. --- src/mqtt.rs | 43 +++++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/src/mqtt.rs b/src/mqtt.rs index b551430..1ad400a 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,7 +1,8 @@ -use std::{sync::mpsc::Sender, thread}; +use std::{sync::mpsc::Sender, thread, time::Duration}; use fossbeamer::Command; use rumqttc::{Client, ClientError, MqttOptions, Packet, Publish}; +use tracing::{debug, warn}; pub(crate) struct Listener { pub id: String, @@ -20,20 +21,34 @@ impl Listener { thread::spawn(move || { for event in connection.iter() { - println!("{:?}", event); - - if let Ok(rumqttc::Event::Incoming(Packet::Publish(Publish { - topic, - payload, - .. - }))) = event - { - if topic == "commands" { - if let Ok(command) = serde_json::from_slice::(&payload) { - println!("{:?}", command); - - self.sender.send(command).unwrap(); + match event { + Ok(event) => match event { + rumqttc::Event::Incoming(Packet::Publish(Publish { + topic, + payload, + .. + })) => { + if topic == "commands" { + if let Ok(command) = serde_json::from_slice::(&payload) { + debug!(?command, "received command"); + + self.sender.send(command).unwrap(); + } + } else { + debug!(?topic, "received other topic"); + } + } + rumqttc::Event::Incoming(incoming) => { + debug!(?incoming, "other incoming event"); + } + rumqttc::Event::Outgoing(out) => { + debug!(?out, "outgoing event"); } + }, + Err(e) => { + warn!(err=%e, "connection error"); + // sleep a bit + std::thread::sleep(Duration::from_secs(5)); } } }