MQTT
MQTT is a critical subsystem for Greengrass. MQTT is how the Nucleus communicates with the cloud service and is notified about deployments. MQTT is, of course, also used by customer components, but that usage is less critical in terms of reliability than the internal usage.
AWS IoT Core has various limits which apply to all users of MQTT, including the Nucleus. Most notably, IoT Core limits each connection to 50 subscriptions. This adds a good deal of complication to the Nucleus MQTT implementation because we want to handle this limitation invisibly by spawning new MQTT connections as needed, and scaling them down when they are not needed.
All requests to publish MQTT messages go through a Spooler, which has two backend implementations: either in memory or on disk. The Spooler exists to handle network disconnections by queueing messages while offline and then sending them when back online, as well as to retry sending messages on failures.
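To make the queueing behavior concrete, here is a minimal sketch of an in-memory spool that queues publish requests while offline, drains them when the connection returns, and re-queues on failure. The class and method names are illustrative and do not reflect the actual Spooler implementation.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-memory spool: queue publish requests while offline,
// drain them once the connection is back up, and re-queue on failure.
class SimpleSpool {
    private final BlockingDeque<String> queue = new LinkedBlockingDeque<>(1000);
    private final AtomicBoolean online = new AtomicBoolean(false);

    // Called for every publish request; returns false if the spool is full.
    boolean add(String message) {
        return queue.offerLast(message);
    }

    void setOnline(boolean isOnline) {
        online.set(isOnline);
    }

    // Drain loop: attempt to send the head of the queue; put it back on failure.
    void drain() throws InterruptedException {
        while (online.get() && !queue.isEmpty()) {
            String message = queue.takeFirst();
            if (!trySend(message)) {
                queue.putFirst(message); // retry later
                break;
            }
        }
    }

    private boolean trySend(String message) {
        // Placeholder for an actual MQTT publish attempt.
        return true;
    }
}
```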
MQTTClient is the interface for all MQTT operations in the Nucleus: publish, subscribe, and unsubscribe. MQTTClient has two client implementations, one for MQTT 3 and one for MQTT 5. MQTTClient also has multiple implementations of publish, subscribe, and unsubscribe due to changes needed for MQTT 5 support while maintaining backward compatibility. The versions of these APIs which return futures should be preferred.
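The sketch below illustrates what calling a future-returning publish looks like from a caller's perspective. The interface and method signatures here are hypothetical and simplified; the real MQTTClient request and response types differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical shape of a future-returning MQTT client API; the actual
// MQTTClient method names and request/response types may differ.
interface FutureMqttClient {
    CompletableFuture<Void> publish(String topic, byte[] payload, int qos);
    CompletableFuture<Void> subscribe(String topicFilter, Consumer<byte[]> handler);
    CompletableFuture<Void> unsubscribe(String topicFilter);
}

class PublishExample {
    static void publishStatus(FutureMqttClient client) {
        client.publish("device/status", "online".getBytes(), 1)
                .whenComplete((unused, error) -> {
                    if (error != null) {
                        System.err.println("Publish failed: " + error.getMessage());
                    } else {
                        System.out.println("Publish acknowledged");
                    }
                });
    }
}
```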
When an MQTT client is needed to perform some action, getNewMqttClient is called, which creates the appropriate MQTT client based on the customer's selected MQTT version. MQTT 5 is the default as of 2.10.0. It is important to note that all MQTT connections must have a unique client ID. When creating a new MQTT client, the Nucleus finds the lowest unused client ID number, which is then appended to the client ID, such as <thingName>#2.
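The following is a hypothetical illustration of that client ID scheme. It assumes the first connection uses the bare thing name (consistent with the IoT policy variable note later in this page) and additional connections append the lowest unused number; the exact numbering in the Nucleus may differ.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper illustrating the client ID scheme described above:
// find the lowest unused numeric suffix and append it to the thing name.
class ClientIdPicker {
    static String nextClientId(String thingName, Set<Integer> suffixesInUse) {
        // The first connection uses the bare thing name (suffix 0 conceptually);
        // additional connections get #1, #2, ... using the lowest free number.
        int candidate = 0;
        while (suffixesInUse.contains(candidate)) {
            candidate++;
        }
        return candidate == 0 ? thingName : thingName + "#" + candidate;
    }

    public static void main(String[] args) {
        Set<Integer> inUse = new TreeSet<>(Set.of(0, 1, 3));
        System.out.println(nextClientId("MyThing", inUse)); // MyThing#2
    }
}
```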
To publish, the message is first validated, then added into the spooler, and then the spooler is triggered manually. triggerSpooler will create or re-create (as needed) the runSpooler task.
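A minimal sketch of that trigger pattern is shown below: submit the spooler task if it is not already running, otherwise leave the existing task alone. The names here are illustrative, not the actual Nucleus code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of "create or re-create the task": only submit a new
// spooler drain task if the previous one has finished (or never existed).
class SpoolerTrigger {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private Future<?> spoolerTask;

    synchronized void triggerSpooler(Runnable runSpooler) {
        if (spoolerTask == null || spoolerTask.isDone()) {
            spoolerTask = executor.submit(runSpooler);
        }
    }
}
```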
runSpooler will get an MQTT connection and request that it connect, then wait for it to be connected. This is done because the MQTT 3 client may fail to connect and will not retry automatically. The MQTT 5 client does retry automatically, so this code is not harmful for it but isn't strictly necessary either. Once connected, runSpooler then looks across all active MQTT connections to find the client with the minimal wait time for publishing. IoT Core limits each connection to 512 KB/s or 100 TPS, whichever comes first, so our MQTT client respects these limits itself to avoid being throttled. If multiple connections are available, then the MQTT client can publish using all of those clients and thus improve throughput (e.g. with 2 clients we can do 200 TPS of publishing). A message is then published, and it is either retried or removed from the spooler after the publish attempt completes.
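The sketch below illustrates the connection-selection idea: each connection tracks when its per-connection limits (100 TPS / 512 KB/s) next allow a publish, and the spooler picks the one with the smallest wait. This is a simplified, hypothetical model of the real bookkeeping.

```java
import java.util.List;

// Hypothetical sketch of choosing the connection with the smallest wait
// before its rate limits allow another publish.
class ConnectionPicker {
    static class Connection {
        final String clientId;
        final long nextFreeSlotMillis; // earliest time this connection may publish again

        Connection(String clientId, long nextFreeSlotMillis) {
            this.clientId = clientId;
            this.nextFreeSlotMillis = nextFreeSlotMillis;
        }

        long waitMillis(long now) {
            return Math.max(0, nextFreeSlotMillis - now);
        }
    }

    static Connection pickLeastBusy(List<Connection> connections, long now) {
        Connection best = null;
        for (Connection c : connections) {
            if (best == null || c.waitMillis(now) < best.waitMillis(now)) {
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Connection> conns = List.of(
                new Connection("MyThing", now + 40),   // must wait 40 ms
                new Connection("MyThing#2", now));     // can publish immediately
        System.out.println(pickLeastBusy(conns, now).clientId); // MyThing#2
    }
}
```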
Because of the 50 subscription limit, the MQTT client needs to carefully track which subscriptions exist and on which connection they exist.
First, we validate that the topic name conforms to AWS IoT Core requirements. Then, we check whether any existing subscription includes the new one (for example, /abc/# would include /abc/def). If that's the case, then the new subscription is stored and associated with that connection. If not, then we get a new MQTT connection which has room for an additional subscription and add this subscription to that connection. Then, using that connection, we call subscribe. If there was an error when subscribing, we remove the subscription from our map and re-throw the error to the original caller.
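As an illustration of that coverage check, the sketch below tests whether an existing topic filter already matches a new topic using standard MQTT wildcard semantics. It is not the Nucleus implementation.

```java
// Hypothetical check for whether an existing topic filter already covers a new
// topic, using standard MQTT wildcards ('+' = single level, '#' = multi level).
class TopicFilterMatch {
    static boolean covers(String existingFilter, String newTopic) {
        String[] filter = existingFilter.split("/");
        String[] topic = newTopic.split("/");
        for (int i = 0; i < filter.length; i++) {
            if (filter[i].equals("#")) {
                return true; // '#' matches this level and everything below it
            }
            if (i >= topic.length) {
                return false; // topic is shorter than the filter
            }
            if (!filter[i].equals("+") && !filter[i].equals(topic[i])) {
                return false; // literal level mismatch
            }
        }
        return filter.length == topic.length;
    }

    public static void main(String[] args) {
        System.out.println(covers("/abc/#", "/abc/def"));   // true
        System.out.println(covers("/abc/+", "/abc/def/g")); // false
    }
}
```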
Within the Nucleus, services can register callbacks for when the MQTT client initially connects, disconnects, and resumes a connection. The MQTTClient itself also registers callbacks which track the current state of the MQTT connection and control the spooler publishing execution. The CallbackEventManager will deduplicate events in the case that there are multiple MQTT clients (where we'd otherwise expect 2 or more connection-interrupted events when the network goes down). It deduplicates by using an AtomicBoolean which tracks whether it needs to trigger the callbacks because the MQTT connection state has changed since the last time it called them.
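The sketch below shows that deduplication idea in isolation: an AtomicBoolean flips only on a real state change, so only the first event reported by any of the underlying clients reaches the registered callbacks. Class and method names are illustrative, not the CallbackEventManager API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: with several underlying MQTT connections, only the first
// "interrupted" (or first "resumed") event per state change fans out to callbacks.
class DedupCallbacks {
    private final AtomicBoolean connected = new AtomicBoolean(false);

    void onConnectionResumed(Runnable callbacks) {
        // compareAndSet only succeeds for the first client reporting the change.
        if (connected.compareAndSet(false, true)) {
            callbacks.run();
        }
    }

    void onConnectionInterrupted(Runnable callbacks) {
        if (connected.compareAndSet(true, false)) {
            callbacks.run();
        }
    }

    public static void main(String[] args) {
        DedupCallbacks manager = new DedupCallbacks();
        Runnable notifyServices = () -> System.out.println("state change delivered once");
        manager.onConnectionResumed(notifyServices);  // delivered
        manager.onConnectionResumed(notifyServices);  // duplicate from a second client, ignored
    }
}
```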
Importantly, if MQTTClient requests that the MQTT client disconnect (because we no longer need it, or we need to reconfigure it), then we skip calling the disconnect callback. If that isn't skipped, then this happens.
Services should utilize these callbacks to be more efficient with retries. For example, don't keep trying to subscribe to a topic if we know that we're offline. And conversely, start trying to subscribe again when the connection comes back up.
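As a rough example of that pattern, the sketch below gates a service's subscribe retries on connection state callbacks. The callback registration and subscribe calls here are placeholders rather than the real Nucleus APIs.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical service that only retries subscribing while the connection is up,
// instead of retrying blindly while offline.
class SubscribingService {
    private final AtomicBoolean online = new AtomicBoolean(false);

    void onConnectionResumed() {
        online.set(true);
        attemptSubscribe(); // retry as soon as the connection comes back
    }

    void onConnectionInterrupted() {
        online.set(false); // stop retrying until we reconnect
    }

    void attemptSubscribe() {
        if (!online.get()) {
            return; // offline: don't waste retries
        }
        try {
            subscribeToTopic("my/topic");
        } catch (RuntimeException e) {
            System.err.println("Subscribe failed, will retry on next resume: " + e.getMessage());
        }
    }

    private void subscribeToTopic(String topic) {
        // Placeholder for a real subscribe call through MQTTClient.
        System.out.println("Subscribed to " + topic);
    }
}
```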
MQTTClient connects to AWS IoT Core, which uses TLS mutual authentication to authenticate the connection and then applies various IoT policies based on the certificate used. Greengrass provides a minimal set of policies here. An important note is that AWS IoT policy variables may not work as expected when Greengrass creates multiple connections. Specifically, policy variables relating to the IoT Thing only work when the MQTT client ID exactly matches the thing name, which is only true for the first connection Greengrass makes. Additional connections created to handle more than 50 subscriptions will not work with these policy variables. As an alternative, use certificate-based policy variables such as the common name. If Greengrass does not have permission to connect, then it will fail to connect and will retry forever to get a connection. If Greengrass does not have permission to subscribe or publish to a specific topic, then the behavior depends on which MQTT version is being used. For MQTT 3, IoT Core will disconnect the client for any unauthorized action. For MQTT 5, IoT Core should provide a proper acknowledgement packet stating that authorization was denied. When publishing with QoS 0, though, IoT Core may still disconnect the client since QoS 0 publishes have no PUBACK response.