Skip to content

Commit

Permalink
Merge pull request #66 from quartiq/feature/will-support
Browse files Browse the repository at this point in the history
Adding support for will messages
  • Loading branch information
jordens authored Nov 9, 2021
2 parents 6139234 + 6d2fccf commit d52102b
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 35 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ This document describes the changes to Minimq between releases.

# Unreleased

## Added
* Support for the `Will` message specification.
* [breaking] Adding `retained` flag to `publish()` to allow messages to be published in a retained
manner.

# Version 0.4.0
Version 0.4.0 was published on 2021-10-08

Expand Down
30 changes: 27 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! temperature sensor installed.
//!
//! ```no_run
//! use minimq::{Minimq, QoS};
//! use minimq::{Minimq, QoS, Retain};
//!
//! // Construct an MQTT client with a maximum packet size of 256 bytes.
//! // Connect to a broker at localhost - Use a client ID of "test".
Expand All @@ -51,7 +51,7 @@
//! match topic {
//! "topic" => {
//! println!("{:?}", message);
//! client.publish("echo", message, QoS::AtMostOnce, &[]).unwrap();
//! client.publish("echo", message, QoS::AtMostOnce, Retain::NotRetained, &[]).unwrap();
//! },
//! topic => println!("Unknown topic: {}", topic),
//! };
Expand All @@ -67,17 +67,41 @@ mod mqtt_client;
mod network_manager;
mod properties;
mod session_state;
mod will;

use message_types::MessageType;
pub use properties::Property;

pub use embedded_nal;
pub use embedded_time;
pub use mqtt_client::{Minimq, QoS};
pub use mqtt_client::Minimq;

#[cfg(feature = "logging")]
pub(crate) use log::{debug, error, info, warn};

/// The quality-of-service for an MQTT message.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum QoS {
/// A packet will be delivered at most once, but may not be delivered at all.
AtMostOnce = 0,

/// A packet will be delivered at least one time, but possibly more than once.
AtLeastOnce = 1,

/// A packet will be delivered exactly one time.
ExactlyOnce = 2,
}

/// The retained status for an MQTT message.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum Retain {
/// The message shall not be retained by the broker.
NotRetained = 0,

/// The message shall be marked for retention by the broker.
Retained = 1,
}

/// Errors that are specific to the MQTT protocol implementation.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ProtocolError {
Expand Down
47 changes: 32 additions & 15 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::{
network_manager::InterfaceHolder,
ser::serialize,
session_state::SessionState,
Error, Property, ProtocolError, {debug, error, info},
will::Will,
Error, Property, ProtocolError, QoS, Retain, {debug, error, info},
};

use embedded_nal::{IpAddr, SocketAddr, TcpClientStack};
Expand All @@ -15,19 +16,6 @@ use heapless::String;

use core::str::FromStr;

/// The quality-of-service for an MQTT message.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum QoS {
/// A packet will be delivered at most once, but may not be delivered at all.
AtMostOnce = 0,

/// A packet will be delivered at least one time, but possibly more than once.
AtLeastOnce = 1,

/// A packet will be delivered exactly one time.
ExactlyOnce = 2,
}

mod sm {

use smlang::statemachine;
Expand Down Expand Up @@ -72,6 +60,7 @@ pub struct MqttClient<
clock: Clock,
session_state: SessionState<Clock, MSG_SIZE, MSG_COUNT>,
connection_state: StateMachine<Context>,
will: Option<Will<MSG_SIZE>>,
}

impl<TcpStack, Clock, const MSG_SIZE: usize, const MSG_COUNT: usize>
Expand Down Expand Up @@ -121,6 +110,7 @@ where
&properties,
// Only perform a clean start if we do not have any session state.
!self.session_state.is_present(),
self.will.as_ref(),
)?;

info!("Sending CONNECT");
Expand All @@ -141,6 +131,30 @@ where
Ok(())
}

/// Specify the Will message to be sent if the client disconnects.
///
/// # Args
/// * `topic` - The topic to send the message on
/// * `data` - The message to transmit
/// * `qos` - The quality of service at which to send the message.
/// * `retained` - Specifies whether the will message should be retained by the broker.
/// * `properties` - Any properties to send with the will message.
pub fn set_will(
&mut self,
topic: &str,
data: &[u8],
qos: QoS,
retained: Retain,
properties: &[Property],
) -> Result<(), Error<TcpStack::Error>> {
let mut will = Will::new(topic, data, properties)?;
will.retained(retained);
will.qos(qos);

self.will.replace(will);
Ok(())
}

fn reset(&mut self) {
self.connection_state.process_event(Events::Disconnect).ok();
}
Expand Down Expand Up @@ -254,6 +268,7 @@ where
topic: &str,
data: &[u8],
qos: QoS,
retain: Retain,
properties: &[Property],
) -> Result<(), Error<TcpStack::Error>> {
// TODO: QoS 2 support.
Expand All @@ -273,7 +288,8 @@ where
let id = self.session_state.get_packet_identifier();

let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE];
let packet = serialize::publish_message(&mut buffer, topic, data, qos, id, properties)?;
let packet =
serialize::publish_message(&mut buffer, topic, data, qos, retain, id, properties)?;

self.network.write(packet)?;
self.session_state.increment_packet_identifier();
Expand Down Expand Up @@ -484,6 +500,7 @@ impl<
clock,
session_state,
connection_state: StateMachine::new(Context),
will: None,
},
packet_reader: PacketReader::new(),
};
Expand Down
1 change: 0 additions & 1 deletion src/ser/packet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ impl<'a> ReversedPacketWriter<'a> {
}
}

#[cfg(test)]
pub fn finish(self) -> &'a [u8] {
&self.buffer[self.index..]
}
Expand Down
95 changes: 83 additions & 12 deletions src/ser/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::QoS;
use crate::{
message_types::MessageType, properties::PropertyIdentifier, ser::ReversedPacketWriter,
Property, ProtocolError as Error,
will::Will, Property, ProtocolError as Error, QoS, Retain,
};

use bit_field::BitField;
Expand All @@ -20,13 +19,14 @@ pub fn integer_size(value: usize) -> usize {
}
}

pub fn connect_message<'a, 'b>(
dest: &'b mut [u8],
pub fn connect_message<'a, const S: usize>(
dest: &'a mut [u8],
client_id: &[u8],
keep_alive: u16,
properties: &[Property<'a>],
properties: &[Property],
clean_start: bool,
) -> Result<&'b [u8], Error> {
will: Option<&Will<S>>,
) -> Result<&'a [u8], Error> {
// Validate the properties for this packet.
for property in properties {
match property.id() {
Expand All @@ -49,7 +49,18 @@ pub fn connect_message<'a, 'b>(

let mut packet = ReversedPacketWriter::new(dest);

// Write the payload, which is the client ID.
if let Some(will) = will {
// Serialize the will data into the packet
packet.write(&will.payload)?;

// Update the flags for the will parameters. Indicate that the will is present, the QoS of
// the will message, and whether or not the will message should be retained.
flags.set_bit(2, true);
flags.set_bits(3..=4, will.qos as u8);
flags.set_bit(5, will.retain == Retain::Retained);
}

// Write the the client ID payload.
packet.write_binary_data(client_id)?;

// Write the variable header.
Expand All @@ -72,6 +83,7 @@ pub fn publish_message<'a, 'b, 'c>(
topic: &'a str,
payload: &[u8],
qos: QoS,
retain: Retain,
id: u16,
properties: &[Property<'c>],
) -> Result<&'b [u8], Error> {
Expand Down Expand Up @@ -106,7 +118,9 @@ pub fn publish_message<'a, 'b, 'c>(
packet.write_utf8_string(topic)?;

// Set qos to fixed header bits 1 and 2 in binary
let flags = *0u8.set_bits(1..=2, qos as u8);
let flags = *0u8
.set_bits(1..=2, qos as u8)
.set_bit(0, retain == Retain::Retained);

// Write the fixed length header.
packet.finalize(MessageType::Publish, flags)
Expand Down Expand Up @@ -154,7 +168,16 @@ pub fn serialize_publish() {

let mut buffer: [u8; 900] = [0; 900];
let payload: [u8; 2] = [0xAB, 0xCD];
let message = publish_message(&mut buffer, "ABC", &payload, QoS::AtMostOnce, 0, &[]).unwrap();
let message = publish_message(
&mut buffer,
"ABC",
&payload,
QoS::AtMostOnce,
Retain::NotRetained,
0,
&[],
)
.unwrap();

assert_eq!(message, good_publish);
}
Expand All @@ -172,8 +195,16 @@ pub fn serialize_publish_qos1() {

let mut buffer: [u8; 900] = [0; 900];
let payload: [u8; 2] = [0xAB, 0xCD];
let message =
publish_message(&mut buffer, "ABC", &payload, QoS::AtLeastOnce, 0xbeef, &[]).unwrap();
let message = publish_message(
&mut buffer,
"ABC",
&payload,
QoS::AtLeastOnce,
Retain::NotRetained,
0xbeef,
&[],
)
.unwrap();

assert_eq!(message, good_publish);
}
Expand Down Expand Up @@ -213,6 +244,7 @@ pub fn serialize_publish_with_properties() {
"ABC",
&payload,
QoS::AtMostOnce,
Retain::NotRetained,
0,
&[Property::ResponseTopic("A")],
)
Expand All @@ -235,7 +267,46 @@ fn serialize_connect() {

let mut buffer: [u8; 900] = [0; 900];
let client_id = "ABC".as_bytes();
let message = connect_message(&mut buffer, client_id, 10, &[], true).unwrap();
let message = connect_message::<100>(&mut buffer, client_id, 10, &[], true, None).unwrap();

assert_eq!(message, good_serialized_connect)
}

#[test]
fn serialize_connect_with_will() {
#[rustfmt::skip]
let good_serialized_connect: [u8; 28] = [
0x10, // Connect
26, // Remaining length

// Header: "MQTT5"
0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05,

// Flags: Clean start, will present, will not retained, will QoS = 0,
0b0000_0110,

// Keep-alive: 10 seconds
0x00, 0x0a,

// Connected Properties: None
0x00,
// Client ID: "ABC"
0x00, 0x03, 0x41, 0x42, 0x43,
// Will properties: None
0x00,
// Will topic: "EFG"
0x00, 0x03, 0x45, 0x46, 0x47,
// Will payload: [0xAB, 0xCD]
0x00, 0x02, 0xAB, 0xCD,
];

let mut buffer: [u8; 900] = [0; 900];
let client_id = "ABC".as_bytes();
let mut will = Will::<100>::new("EFG", &[0xAB, 0xCD], &[]).unwrap();
will.qos(QoS::AtMostOnce);
will.retained(Retain::NotRetained);

let message = connect_message(&mut buffer, client_id, 10, &[], true, Some(&will)).unwrap();

assert_eq!(message, good_serialized_connect)
}
Expand Down
Loading

0 comments on commit d52102b

Please sign in to comment.