Skip to content

Commit

Permalink
Merge pull request #126 from quartiq/rs/issue-125/tcp-disconnection
Browse files Browse the repository at this point in the history
Fixing MQTT permanent disconnection issues
  • Loading branch information
ryan-summers authored Jun 22, 2023
2 parents d620537 + 1a9720a commit 072c8f3
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 40 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ This document describes the changes to Minimq between releases.

# [Unreleased]

## Fixed
* [breaking] Embedded-nal version updated to 0.7
* Fixed an issue where the MQTT client would become permanently inoperable when the broker
disconnected under certain conditions.

# [0.6.2] - 2023-04-05

## Fixed
Expand Down
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ embedded-time = "0.12"
varint-rs = {version = "2.2", default-features = false }
serde = { version = "1", features = ["derive"], default-features = false }
smlang = "0.6.0"

[dependencies.embedded-nal]
version = "0.6"
embedded-nal = "0.7"

[features]
default = []
Expand All @@ -36,3 +34,7 @@ log = "0.4"
env_logger = "0.7"
std-embedded-nal = "0.1"
std-embedded-time = "0.1"

[patch.crates-io.std-embedded-nal]
git = "https://gitlab.com/ryan-summers/std-embedded-nal"
branch = "feature/0.7"
30 changes: 14 additions & 16 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::{
session_state::SessionState,
types::{Properties, TopicFilter, Utf8String},
will::Will,
Error, Property, ProtocolError, QoS, Retain, MQTT_INSECURE_DEFAULT_PORT, {debug, error, info},
Error, Property, ProtocolError, QoS, Retain, MQTT_INSECURE_DEFAULT_PORT,
{debug, error, info, warn},
};

use embedded_nal::{IpAddr, SocketAddr, TcpClientStack};
Expand All @@ -22,7 +23,7 @@ mod sm {

statemachine! {
transitions: {
*Disconnected + Reallocated = Restart,
*Disconnected + TcpConnected = Restart,
Restart + SentConnect = Establishing,
Establishing + Connected(ConnAck<'a>) [ handle_connack ] = Active,

Expand All @@ -32,8 +33,7 @@ mod sm {
Active + ControlPacket(ReceivedPacket<'a>) [handle_packet] = Active,
Active + SentSubscribe(u16) [handle_subscription] = Active,

Establishing + TcpDisconnect = Disconnected,
Active + TcpDisconnect = Disconnected,
_ + TcpDisconnect = Disconnected
},
custom_guard_error: true,
}
Expand Down Expand Up @@ -378,10 +378,6 @@ impl<
}

fn handle_restart(&mut self) -> Result<(), Error<TcpStack::Error>> {
if !self.network.tcp_connected()? {
return Ok(());
}

let properties = [
// Tell the broker our maximum packet size.
Property::MaximumPacketSize(MSG_SIZE as u32),
Expand All @@ -405,6 +401,7 @@ impl<

fn handle_active(&mut self) -> Result<(), Error<TcpStack::Error>> {
if self.sm.context_mut().session_state.ping_is_overdue()? {
warn!("Ping overdue. Trigging send timeout reset");
self.sm.process_event(Events::SendTimeout).unwrap();
}

Expand Down Expand Up @@ -438,21 +435,21 @@ impl<
}

fn update(&mut self) -> Result<(), Error<TcpStack::Error>> {
// Potentially update the state machine depending on the current socket connection status.
let tcp_connected = self.network.tcp_connected()?;

if !tcp_connected {
self.sm.process_event(Events::TcpDisconnect).ok();
if self.network.socket_was_closed() {
info!("Handling closed socket");
self.sm.process_event(Events::TcpDisconnect).unwrap();
self.network.allocate_socket()?;
}

// Attempt to finish any pending packets.
self.network.finish_write()?;

match *self.sm.state() {
States::Disconnected => {
self.network.allocate_socket()?;
self.network.connect(self.broker)?;
self.sm.process_event(Events::Reallocated)?;
if self.network.connect(self.broker)? {
info!("TCP socket connected");
self.sm.process_event(Events::TcpConnected)?;
}
}
States::Restart => self.handle_restart()?,
States::Active => self.handle_active()?,
Expand Down Expand Up @@ -663,6 +660,7 @@ impl<
let buffer = match self.packet_reader.receive_buffer() {
Ok(buffer) => buffer,
Err(e) => {
warn!("Protocol Error reset: {e:?}");
self.client.sm.process_event(Events::ProtocolError).unwrap();
self.packet_reader.reset();
return Err(Error::Protocol(e));
Expand Down
52 changes: 31 additions & 21 deletions src/network_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! stack to be used to transmit buffers that may be stored internally in other structs without
//! violating Rust's borrow rules.
use crate::message_types::ControlPacket;
use embedded_nal::{nb, SocketAddr, TcpClientStack};
use embedded_nal::{nb, SocketAddr, TcpClientStack, TcpError};
use heapless::Vec;
use serde::Serialize;

Expand All @@ -17,6 +17,7 @@ pub(crate) struct InterfaceHolder<TcpStack: TcpClientStack, const MSG_SIZE: usiz
socket: Option<TcpStack::TcpSocket>,
network_stack: TcpStack,
pending_write: Option<Vec<u8, MSG_SIZE>>,
connection_died: bool,
}

impl<TcpStack, const MSG_SIZE: usize> InterfaceHolder<TcpStack, MSG_SIZE>
Expand All @@ -29,26 +30,24 @@ where
socket: None,
network_stack: stack,
pending_write: None,
connection_died: false,
}
}

pub fn socket_was_closed(&mut self) -> bool {
let was_closed = self.connection_died;
if was_closed {
self.pending_write.take();
}
self.connection_died = false;
was_closed
}

/// Determine if there is a pending packet write that needs to be completed.
pub fn has_pending_write(&self) -> bool {
self.pending_write.is_some()
}

/// Determine if an TCP connection exists and is connected.
pub fn tcp_connected(&mut self) -> Result<bool, Error<TcpStack::Error>> {
if self.socket.is_none() {
return Ok(false);
}

let socket = self.socket.as_ref().unwrap();
self.network_stack
.is_connected(socket)
.map_err(Error::Network)
}

/// Allocate a new TCP socket.
///
/// # Note
Expand All @@ -69,18 +68,21 @@ where
///
/// # Args
/// * `remote` - The address of the remote to connect to.
pub fn connect(&mut self, remote: SocketAddr) -> Result<(), Error<TcpStack::Error>> {
let socket = self.socket.as_mut().ok_or(Error::NotReady)?;
pub fn connect(&mut self, remote: SocketAddr) -> Result<bool, Error<TcpStack::Error>> {
if self.socket.is_none() {
self.allocate_socket()?;
}

let socket = self.socket.as_mut().unwrap();

// Drop any pending unfinished packets, as we're establishing a new connection.
self.pending_write.take();

self.network_stack
.connect(socket, remote)
.map_err(|err| match err {
nb::Error::WouldBlock => Error::WriteFail,
nb::Error::Other(err) => Error::Network(err),
})
match self.network_stack.connect(socket, remote) {
Ok(_) => Ok(true),
Err(nb::Error::WouldBlock) => Ok(false),
Err(nb::Error::Other(err)) => Err(Error::Network(err)),
}
}

/// Write data to the interface.
Expand All @@ -97,6 +99,10 @@ where
.send(socket, data)
.or_else(|err| match err {
nb::Error::WouldBlock => Ok(0),
nb::Error::Other(err) if err.kind() == embedded_nal::TcpErrorKind::PipeClosed => {
self.connection_died = true;
Ok(0)
}
nb::Error::Other(err) => Err(Error::Network(err)),
})
.map(|written| {
Expand Down Expand Up @@ -156,6 +162,10 @@ where

result.or_else(|err| match err {
nb::Error::WouldBlock => Ok(0),
nb::Error::Other(err) if err.kind() == embedded_nal::TcpErrorKind::PipeClosed => {
self.connection_died = true;
Ok(0)
}
nb::Error::Other(err) => Err(Error::Network(err)),
})
}
Expand Down

0 comments on commit 072c8f3

Please sign in to comment.