From ad189004b0d4f4b7df184a5ce5e03c4655ab00f7 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 24 Jul 2024 14:15:12 +0200 Subject: [PATCH] Removing results from Publication API --- CHANGELOG.md | 8 +- src/lib.rs | 3 +- src/mqtt_client.rs | 7 +- src/packets.rs | 15 ++++ src/publication.rs | 119 +++++++++++----------------- tests/at_least_once_subscription.rs | 8 +- tests/deferred_payload.rs | 6 +- tests/exactly_once_subscription.rs | 8 +- tests/exactly_once_transmission.rs | 8 +- tests/integration_test.rs | 19 ++--- tests/poll_result.rs | 8 +- tests/reconnection.rs | 8 +- 12 files changed, 84 insertions(+), 133 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99b6130..ea4e94a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ This document describes the changes to Minimq between releases. +# Unreleased + +## Changed +* The `Publication::finish()` API was removed in favor of a new `Publication::respond()` API for +constructing replies to previously received messages. + # [0.9.0] - 2024-04-29 ## Fixed @@ -146,7 +152,7 @@ keep-alive interval * Initial library release and publish to crates.io -[0.8.0]: https://github.com/quartiq/minimq/releases/tag/0.9.0 +[0.9.0]: https://github.com/quartiq/minimq/releases/tag/0.9.0 [0.8.0]: https://github.com/quartiq/minimq/releases/tag/0.8.0 [0.7.0]: https://github.com/quartiq/minimq/releases/tag/0.7.0 [0.6.2]: https://github.com/quartiq/minimq/releases/tag/0.6.2 diff --git a/src/lib.rs b/src/lib.rs index d2b53dd..f3daa8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,8 +52,7 @@ //! match topic { //! "topic" => { //! println!("{:?}", message); -//! let response = Publication::new(message).topic("echo").finish().unwrap(); -//! client.publish(response).unwrap(); +//! client.publish(Publication::new("echo", message)).unwrap(); //! }, //! topic => println!("Unknown topic: {}", topic), //! }; diff --git a/src/mqtt_client.rs b/src/mqtt_client.rs index e768eaa..6d1a1b7 100644 --- a/src/mqtt_client.rs +++ b/src/mqtt_client.rs @@ -2,6 +2,7 @@ use crate::{ de::{received_packet::ReceivedPacket, PacketReader}, network_manager::InterfaceHolder, packets::{ConnAck, Connect, PingReq, Pub, PubAck, PubComp, PubRec, PubRel, SubAck, Subscribe}, + publication::Publication, reason_codes::ReasonCode, session_state::SessionState, types::{Auth, Properties, TopicFilter, Utf8String}, @@ -374,17 +375,19 @@ impl<'buf, TcpStack: TcpClientStack, Clock: embedded_time::Clock, Broker: crate: /// If the client is not yet connected to the broker, the message will be silently ignored. /// /// # Args - /// * `publish` - The publication to generate. See [crate::Publication] for a builder pattern + /// * `publish` - The publication to generate. /// to generate a message. pub fn publish( &mut self, - mut publish: Pub<'_, P>, + publish: Publication<'_, P>, ) -> Result<(), crate::PubError> { // If we are not yet connected to the broker, we can't transmit a message. if !self.is_connected() { return Ok(()); } + let mut publish: Pub<'_, P> = publish.into(); + if let Some(max) = &self.sm.context().max_qos { if self.downgrade_qos && publish.qos > *max { publish.qos = *max diff --git a/src/packets.rs b/src/packets.rs index 4e52dc5..abc9f55 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -1,4 +1,5 @@ use crate::{ + publication::Publication, reason_codes::ReasonCode, types::{Auth, Properties, TopicFilter, Utf8String}, will::SerializedWill, @@ -129,6 +130,20 @@ impl<'a, P> core::fmt::Debug for Pub<'a, P> { } } +impl<'a, P> From> for Pub<'a, P> { + fn from(publication: Publication<'a, P>) -> Self { + Self { + topic: Utf8String(publication.topic), + properties: publication.properties, + packet_id: None, + payload: publication.payload, + retain: publication.retain, + qos: publication.qos, + dup: false, + } + } +} + /// An MQTT SUBSCRIBE control packet #[derive(Debug, Serialize)] pub struct Subscribe<'a> { diff --git a/src/publication.rs b/src/publication.rs index 3e88ef4..20b2f62 100644 --- a/src/publication.rs +++ b/src/publication.rs @@ -1,7 +1,6 @@ use crate::{ - packets::Pub, properties::Property, - types::{BinaryData, Properties, Utf8String}, + types::{BinaryData, Properties}, ProtocolError, QoS, Retain, }; @@ -49,8 +48,15 @@ pub struct DeferredPublication { } impl Result> DeferredPublication { - pub fn new<'a>(func: F) -> Publication<'a, Self> { - Publication::new(Self { func }) + pub fn new<'a>(topic: &'a str, func: F) -> Publication<'a, Self> { + Publication::new(topic, Self { func }) + } + + pub fn respond<'a>( + received_properties: &'a Properties<'a>, + func: F, + ) -> Result, ProtocolError> { + Publication::respond(received_properties, Self { func }) } } @@ -74,20 +80,48 @@ impl Result> ToPayload for DeferredPublicat /// in [Publication::topic], or by parsing a topic from the [Property::ResponseTopic] property /// contained within received properties by using the [Publication::reply] API. pub struct Publication<'a, P> { - topic: Option<&'a str>, - properties: Properties<'a>, - qos: QoS, - payload: P, - retain: Retain, + pub(crate) topic: &'a str, + pub(crate) properties: Properties<'a>, + pub(crate) qos: QoS, + pub(crate) payload: P, + pub(crate) retain: Retain, } impl<'a, P: ToPayload> Publication<'a, P> { + /// Generate the publication as a reply to some other received message. + /// + /// # Note + /// The received message properties are parsed for both [Property::CorrelationData] and + /// [Property::ResponseTopic]. + /// + /// * If correlation data is found, it is automatically appended to the + /// publication properties. + /// + /// * If a response topic is identified, the message topic will be + /// configured for it, which will override any previously-specified topic. + pub fn respond( + received_properties: &'a Properties<'a>, + payload: P, + ) -> Result { + let Some(response_topic) = received_properties.into_iter().find_map(|p| { + if let Ok(Property::ResponseTopic(topic)) = p { + Some(topic.0) + } else { + None + } + }) else { + return Err(ProtocolError::NoTopic); + }; + + Ok(Self::new(response_topic, payload)) + } + /// Construct a new publication with a payload. - pub fn new(payload: P) -> Self { + pub fn new(topic: &'a str, payload: P) -> Self { Self { payload, qos: QoS::AtMostOnce, - topic: None, + topic, properties: Properties::Slice(&[]), retain: Retain::NotRetained, } @@ -105,16 +139,6 @@ impl<'a, P: ToPayload> Publication<'a, P> { self } - /// Specify the publication topic for this message. - /// - /// # Note - /// If this is called after [Publication::reply] determines a response topic, the response - /// topic will be overridden. - pub fn topic(mut self, topic: &'a str) -> Self { - self.topic.replace(topic); - self - } - /// Specify properties associated with this publication. pub fn properties(mut self, properties: &'a [Property<'a>]) -> Self { self.properties = match self.properties { @@ -128,42 +152,6 @@ impl<'a, P: ToPayload> Publication<'a, P> { self } - /// Generate the publication as a reply to some other received message. - /// - /// # Note - /// The received message properties are parsed for both [Property::CorrelationData] and - /// [Property::ResponseTopic]. - /// - /// * If correlation data is found, it is automatically appended to the - /// publication properties. - /// - /// * If a response topic is identified, the message topic will be - /// configured for it, which will override any previously-specified topic. - pub fn reply(mut self, received_properties: &'a Properties<'a>) -> Self { - if let Some(response_topic) = received_properties.into_iter().find_map(|p| { - if let Ok(Property::ResponseTopic(topic)) = p { - Some(topic.0) - } else { - None - } - }) { - self.topic.replace(response_topic); - } - - // Next, copy over any correlation data to the outbound properties. - if let Some(correlation_data) = received_properties.into_iter().find_map(|p| { - if let Ok(Property::CorrelationData(data)) = p { - Some(data.0) - } else { - None - } - }) { - self.correlate(correlation_data) - } else { - self - } - } - /// Include correlation data to the message /// /// # Note @@ -184,21 +172,4 @@ impl<'a, P: ToPayload> Publication<'a, P> { self } - - /// Generate the final publication. - /// - /// # Returns - /// The message to be published if a publication topic was specified. If no publication topic - /// was identified, an error is returned. - pub fn finish(self) -> Result, ProtocolError> { - Ok(Pub { - topic: Utf8String(self.topic.ok_or(ProtocolError::NoTopic)?), - properties: self.properties, - packet_id: None, - payload: self.payload, - retain: self.retain, - qos: self.qos, - dup: false, - }) - } } diff --git a/tests/at_least_once_subscription.rs b/tests/at_least_once_subscription.rs index 382f2b7..dbbba2b 100644 --- a/tests/at_least_once_subscription.rs +++ b/tests/at_least_once_subscription.rs @@ -49,13 +49,7 @@ fn main() -> std::io::Result<()> { if !published && mqtt.client().can_publish(QoS::AtLeastOnce) { mqtt.client() - .publish( - Publication::new("Ping") - .topic("data") - .qos(QoS::AtLeastOnce) - .finish() - .unwrap(), - ) + .publish(Publication::new("data", "Ping").qos(QoS::AtLeastOnce)) .unwrap(); log::info!("Publishing message"); published = true; diff --git a/tests/deferred_payload.rs b/tests/deferred_payload.rs index c9394d9..fd4e90d 100644 --- a/tests/deferred_payload.rs +++ b/tests/deferred_payload.rs @@ -23,11 +23,7 @@ fn main() -> std::io::Result<()> { assert!(matches!( mqtt.client().publish( - DeferredPublication::new(|_buf| { Err("Oops!") }) - .topic("data") - .qos(QoS::ExactlyOnce) - .finish() - .unwrap(), + DeferredPublication::new("data", |_buf| { Err("Oops!") }).qos(QoS::ExactlyOnce) ), Err(minimq::PubError::Serialization("Oops!")) )); diff --git a/tests/exactly_once_subscription.rs b/tests/exactly_once_subscription.rs index a046dad..fbd7c98 100644 --- a/tests/exactly_once_subscription.rs +++ b/tests/exactly_once_subscription.rs @@ -50,13 +50,7 @@ fn main() -> std::io::Result<()> { if !published && mqtt.client().can_publish(QoS::ExactlyOnce) { mqtt.client() - .publish( - Publication::new("Ping") - .topic("data") - .qos(QoS::ExactlyOnce) - .finish() - .unwrap(), - ) + .publish(Publication::new("data", b"Ping").qos(QoS::ExactlyOnce)) .unwrap(); log::info!("Publishing message"); published = true; diff --git a/tests/exactly_once_transmission.rs b/tests/exactly_once_transmission.rs index f0c57c7..bf10f09 100644 --- a/tests/exactly_once_transmission.rs +++ b/tests/exactly_once_transmission.rs @@ -29,13 +29,7 @@ fn main() -> std::io::Result<()> { if mqtt.client().is_connected() && !published && mqtt.client().can_publish(QoS::ExactlyOnce) { mqtt.client() - .publish( - Publication::new("Ping".as_bytes()) - .topic("data") - .qos(QoS::ExactlyOnce) - .finish() - .unwrap(), - ) + .publish(Publication::new("data", b"Ping").qos(QoS::ExactlyOnce)) .unwrap(); log::info!("Publishing message"); published = true; diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 1de1e77..98a6c1f 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -31,10 +31,7 @@ fn main() -> std::io::Result<()> { .poll(|client, topic, payload, properties| { log::info!("{} < {}", topic, core::str::from_utf8(payload).unwrap()); - if let Ok(response) = Publication::new("Pong".as_bytes()) - .reply(properties) - .finish() - { + if let Ok(response) = Publication::respond(properties, b"Pong") { client.publish(response).unwrap(); } @@ -63,20 +60,14 @@ fn main() -> std::io::Result<()> { } else if !client.subscriptions_pending() && !published { println!("PUBLISH request"); let properties = [Property::ResponseTopic(Utf8String("response"))]; - let publication = Publication::new(b"Ping") - .topic("request") - .properties(&properties) - .finish() - .unwrap(); + let publication = Publication::new("request", b"Ping").properties(&properties); client.publish(publication).unwrap(); - let publication = Publication::new(b"Ping") - .topic("request") + let publication = Publication::new("request", b"Ping") .properties(&properties) - .qos(QoS::AtLeastOnce) - .finish() - .unwrap(); + .qos(QoS::AtLeastOnce); + client.publish(publication).unwrap(); // The message cannot be ack'd until the next poll call diff --git a/tests/poll_result.rs b/tests/poll_result.rs index 772284a..c90f00a 100644 --- a/tests/poll_result.rs +++ b/tests/poll_result.rs @@ -45,13 +45,7 @@ fn main() -> std::io::Result<()> { && mqtt.client().can_publish(QoS::ExactlyOnce) { mqtt.client() - .publish( - Publication::new("Ping".as_bytes()) - .topic("data") - .qos(QoS::ExactlyOnce) - .finish() - .unwrap(), - ) + .publish(Publication::new("data", b"Ping").qos(QoS::ExactlyOnce)) .unwrap(); log::info!("Publishing message"); published = true; diff --git a/tests/reconnection.rs b/tests/reconnection.rs index e281388..844338a 100644 --- a/tests/reconnection.rs +++ b/tests/reconnection.rs @@ -40,13 +40,7 @@ fn main() -> std::io::Result<()> { // 2. Send a QoS::AtLeastOnce message mqtt.client() - .publish( - Publication::new("Ping".as_bytes()) - .topic("test") - .qos(QoS::ExactlyOnce) - .finish() - .unwrap(), - ) + .publish(Publication::new("test", b"Ping").qos(QoS::ExactlyOnce)) .unwrap(); // Force a disconnect from the broker.