Skip to content

Commit

Permalink
Merge pull request #164 from quartiq/issue/159/publication-simplifica…
Browse files Browse the repository at this point in the history
…tion

Removing results from Publication API
  • Loading branch information
jordens authored Jul 24, 2024
2 parents ce94255 + ad18900 commit cff0f57
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 133 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

This document describes the changes to Minimq between releases.

# Unreleased

## Changed
* The `Publication::finish()` API was removed in favor of a new `Publication::respond()` API for
constructing replies to previously received messages.

# [0.9.0] - 2024-04-29

## Fixed
Expand Down Expand Up @@ -146,7 +152,7 @@ keep-alive interval

* Initial library release and publish to crates.io

[0.8.0]: https://github.com/quartiq/minimq/releases/tag/0.9.0
[0.9.0]: https://github.com/quartiq/minimq/releases/tag/0.9.0
[0.8.0]: https://github.com/quartiq/minimq/releases/tag/0.8.0
[0.7.0]: https://github.com/quartiq/minimq/releases/tag/0.7.0
[0.6.2]: https://github.com/quartiq/minimq/releases/tag/0.6.2
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
//! match topic {
//! "topic" => {
//! println!("{:?}", message);
//! let response = Publication::new(message).topic("echo").finish().unwrap();
//! client.publish(response).unwrap();
//! client.publish(Publication::new("echo", message)).unwrap();
//! },
//! topic => println!("Unknown topic: {}", topic),
//! };
Expand Down
7 changes: 5 additions & 2 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
de::{received_packet::ReceivedPacket, PacketReader},
network_manager::InterfaceHolder,
packets::{ConnAck, Connect, PingReq, Pub, PubAck, PubComp, PubRec, PubRel, SubAck, Subscribe},
publication::Publication,
reason_codes::ReasonCode,
session_state::SessionState,
types::{Auth, Properties, TopicFilter, Utf8String},
Expand Down Expand Up @@ -374,17 +375,19 @@ impl<'buf, TcpStack: TcpClientStack, Clock: embedded_time::Clock, Broker: crate:
/// If the client is not yet connected to the broker, the message will be silently ignored.
///
/// # Args
/// * `publish` - The publication to generate. See [crate::Publication] for a builder pattern
/// * `publish` - The publication to generate.
/// to generate a message.
pub fn publish<P: crate::publication::ToPayload>(
&mut self,
mut publish: Pub<'_, P>,
publish: Publication<'_, P>,
) -> Result<(), crate::PubError<TcpStack::Error, P::Error>> {
// If we are not yet connected to the broker, we can't transmit a message.
if !self.is_connected() {
return Ok(());
}

let mut publish: Pub<'_, P> = publish.into();

if let Some(max) = &self.sm.context().max_qos {
if self.downgrade_qos && publish.qos > *max {
publish.qos = *max
Expand Down
15 changes: 15 additions & 0 deletions src/packets.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
publication::Publication,
reason_codes::ReasonCode,
types::{Auth, Properties, TopicFilter, Utf8String},
will::SerializedWill,
Expand Down Expand Up @@ -129,6 +130,20 @@ impl<'a, P> core::fmt::Debug for Pub<'a, P> {
}
}

impl<'a, P> From<Publication<'a, P>> for Pub<'a, P> {
fn from(publication: Publication<'a, P>) -> Self {
Self {
topic: Utf8String(publication.topic),
properties: publication.properties,
packet_id: None,
payload: publication.payload,
retain: publication.retain,
qos: publication.qos,
dup: false,
}
}
}

/// An MQTT SUBSCRIBE control packet
#[derive(Debug, Serialize)]
pub struct Subscribe<'a> {
Expand Down
119 changes: 45 additions & 74 deletions src/publication.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
packets::Pub,
properties::Property,
types::{BinaryData, Properties, Utf8String},
types::{BinaryData, Properties},
ProtocolError, QoS, Retain,
};

Expand Down Expand Up @@ -49,8 +48,15 @@ pub struct DeferredPublication<F> {
}

impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> DeferredPublication<F> {
pub fn new<'a>(func: F) -> Publication<'a, Self> {
Publication::new(Self { func })
pub fn new<'a>(topic: &'a str, func: F) -> Publication<'a, Self> {
Publication::new(topic, Self { func })
}

pub fn respond<'a>(
received_properties: &'a Properties<'a>,
func: F,
) -> Result<Publication<'a, Self>, ProtocolError> {
Publication::respond(received_properties, Self { func })
}
}

Expand All @@ -74,20 +80,48 @@ impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> ToPayload for DeferredPublicat
/// in [Publication::topic], or by parsing a topic from the [Property::ResponseTopic] property
/// contained within received properties by using the [Publication::reply] API.
pub struct Publication<'a, P> {
topic: Option<&'a str>,
properties: Properties<'a>,
qos: QoS,
payload: P,
retain: Retain,
pub(crate) topic: &'a str,
pub(crate) properties: Properties<'a>,
pub(crate) qos: QoS,
pub(crate) payload: P,
pub(crate) retain: Retain,
}

impl<'a, P: ToPayload> Publication<'a, P> {
/// Generate the publication as a reply to some other received message.
///
/// # Note
/// The received message properties are parsed for both [Property::CorrelationData] and
/// [Property::ResponseTopic].
///
/// * If correlation data is found, it is automatically appended to the
/// publication properties.
///
/// * If a response topic is identified, the message topic will be
/// configured for it, which will override any previously-specified topic.
pub fn respond(
received_properties: &'a Properties<'a>,
payload: P,
) -> Result<Self, ProtocolError> {
let Some(response_topic) = received_properties.into_iter().find_map(|p| {
if let Ok(Property::ResponseTopic(topic)) = p {
Some(topic.0)
} else {
None
}
}) else {
return Err(ProtocolError::NoTopic);
};

Ok(Self::new(response_topic, payload))
}

/// Construct a new publication with a payload.
pub fn new(payload: P) -> Self {
pub fn new(topic: &'a str, payload: P) -> Self {
Self {
payload,
qos: QoS::AtMostOnce,
topic: None,
topic,
properties: Properties::Slice(&[]),
retain: Retain::NotRetained,
}
Expand All @@ -105,16 +139,6 @@ impl<'a, P: ToPayload> Publication<'a, P> {
self
}

/// Specify the publication topic for this message.
///
/// # Note
/// If this is called after [Publication::reply] determines a response topic, the response
/// topic will be overridden.
pub fn topic(mut self, topic: &'a str) -> Self {
self.topic.replace(topic);
self
}

/// Specify properties associated with this publication.
pub fn properties(mut self, properties: &'a [Property<'a>]) -> Self {
self.properties = match self.properties {
Expand All @@ -128,42 +152,6 @@ impl<'a, P: ToPayload> Publication<'a, P> {
self
}

/// Generate the publication as a reply to some other received message.
///
/// # Note
/// The received message properties are parsed for both [Property::CorrelationData] and
/// [Property::ResponseTopic].
///
/// * If correlation data is found, it is automatically appended to the
/// publication properties.
///
/// * If a response topic is identified, the message topic will be
/// configured for it, which will override any previously-specified topic.
pub fn reply(mut self, received_properties: &'a Properties<'a>) -> Self {
if let Some(response_topic) = received_properties.into_iter().find_map(|p| {
if let Ok(Property::ResponseTopic(topic)) = p {
Some(topic.0)
} else {
None
}
}) {
self.topic.replace(response_topic);
}

// Next, copy over any correlation data to the outbound properties.
if let Some(correlation_data) = received_properties.into_iter().find_map(|p| {
if let Ok(Property::CorrelationData(data)) = p {
Some(data.0)
} else {
None
}
}) {
self.correlate(correlation_data)
} else {
self
}
}

/// Include correlation data to the message
///
/// # Note
Expand All @@ -184,21 +172,4 @@ impl<'a, P: ToPayload> Publication<'a, P> {

self
}

/// Generate the final publication.
///
/// # Returns
/// The message to be published if a publication topic was specified. If no publication topic
/// was identified, an error is returned.
pub fn finish(self) -> Result<Pub<'a, P>, ProtocolError> {
Ok(Pub {
topic: Utf8String(self.topic.ok_or(ProtocolError::NoTopic)?),
properties: self.properties,
packet_id: None,
payload: self.payload,
retain: self.retain,
qos: self.qos,
dup: false,
})
}
}
8 changes: 1 addition & 7 deletions tests/at_least_once_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,7 @@ fn main() -> std::io::Result<()> {

if !published && mqtt.client().can_publish(QoS::AtLeastOnce) {
mqtt.client()
.publish(
Publication::new("Ping")
.topic("data")
.qos(QoS::AtLeastOnce)
.finish()
.unwrap(),
)
.publish(Publication::new("data", "Ping").qos(QoS::AtLeastOnce))
.unwrap();
log::info!("Publishing message");
published = true;
Expand Down
6 changes: 1 addition & 5 deletions tests/deferred_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ fn main() -> std::io::Result<()> {

assert!(matches!(
mqtt.client().publish(
DeferredPublication::new(|_buf| { Err("Oops!") })
.topic("data")
.qos(QoS::ExactlyOnce)
.finish()
.unwrap(),
DeferredPublication::new("data", |_buf| { Err("Oops!") }).qos(QoS::ExactlyOnce)
),
Err(minimq::PubError::Serialization("Oops!"))
));
Expand Down
8 changes: 1 addition & 7 deletions tests/exactly_once_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,7 @@ fn main() -> std::io::Result<()> {

if !published && mqtt.client().can_publish(QoS::ExactlyOnce) {
mqtt.client()
.publish(
Publication::new("Ping")
.topic("data")
.qos(QoS::ExactlyOnce)
.finish()
.unwrap(),
)
.publish(Publication::new("data", b"Ping").qos(QoS::ExactlyOnce))
.unwrap();
log::info!("Publishing message");
published = true;
Expand Down
8 changes: 1 addition & 7 deletions tests/exactly_once_transmission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ fn main() -> std::io::Result<()> {
if mqtt.client().is_connected() && !published && mqtt.client().can_publish(QoS::ExactlyOnce)
{
mqtt.client()
.publish(
Publication::new("Ping".as_bytes())
.topic("data")
.qos(QoS::ExactlyOnce)
.finish()
.unwrap(),
)
.publish(Publication::new("data", b"Ping").qos(QoS::ExactlyOnce))
.unwrap();
log::info!("Publishing message");
published = true;
Expand Down
19 changes: 5 additions & 14 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ fn main() -> std::io::Result<()> {
.poll(|client, topic, payload, properties| {
log::info!("{} < {}", topic, core::str::from_utf8(payload).unwrap());

if let Ok(response) = Publication::new("Pong".as_bytes())
.reply(properties)
.finish()
{
if let Ok(response) = Publication::respond(properties, b"Pong") {
client.publish(response).unwrap();
}

Expand Down Expand Up @@ -63,20 +60,14 @@ fn main() -> std::io::Result<()> {
} else if !client.subscriptions_pending() && !published {
println!("PUBLISH request");
let properties = [Property::ResponseTopic(Utf8String("response"))];
let publication = Publication::new(b"Ping")
.topic("request")
.properties(&properties)
.finish()
.unwrap();
let publication = Publication::new("request", b"Ping").properties(&properties);

client.publish(publication).unwrap();

let publication = Publication::new(b"Ping")
.topic("request")
let publication = Publication::new("request", b"Ping")
.properties(&properties)
.qos(QoS::AtLeastOnce)
.finish()
.unwrap();
.qos(QoS::AtLeastOnce);

client.publish(publication).unwrap();

// The message cannot be ack'd until the next poll call
Expand Down
8 changes: 1 addition & 7 deletions tests/poll_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,7 @@ fn main() -> std::io::Result<()> {
&& mqtt.client().can_publish(QoS::ExactlyOnce)
{
mqtt.client()
.publish(
Publication::new("Ping".as_bytes())
.topic("data")
.qos(QoS::ExactlyOnce)
.finish()
.unwrap(),
)
.publish(Publication::new("data", b"Ping").qos(QoS::ExactlyOnce))
.unwrap();
log::info!("Publishing message");
published = true;
Expand Down
8 changes: 1 addition & 7 deletions tests/reconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,7 @@ fn main() -> std::io::Result<()> {

// 2. Send a QoS::AtLeastOnce message
mqtt.client()
.publish(
Publication::new("Ping".as_bytes())
.topic("test")
.qos(QoS::ExactlyOnce)
.finish()
.unwrap(),
)
.publish(Publication::new("test", b"Ping").qos(QoS::ExactlyOnce))
.unwrap();

// Force a disconnect from the broker.
Expand Down

0 comments on commit cff0f57

Please sign in to comment.