
Overview

Greengrass IPC offers a set of APIs for components to interact with Greengrass and each other. IPC is implemented using event streams over Unix domain sockets or Windows named pipes. All IPC APIs are implemented as bidirectional asynchronous streams; even APIs that appear to be request-response are still streams, just with particular semantics enforced (i.e. the stream closes after the response is sent). The IPC transport layer is implemented in C and the application layer is implemented in Java, C++, Python, and NodeJS.

Clients in all languages, as well as the server in Java (the server is not currently available in any other language), are automatically generated from a Smithy model of the APIs.

IPCEventStreamService implements the IPC server in the Nucleus: it creates the socket and server listener and registers each API's default implementation. It then registers the authentication handler, which determines whether a connection is valid, and configures a dummy authorization handler because Greengrass implements its own authorization. Each IPC service implementation must then register the real operation handler, which overrides the default handler. For example, MqttProxyIPCService registers PublishToIoTCoreHandler and SubscribeToIoTCoreHandler.

Threading model

The IPC server is implemented by the AWS CRT, which uses a single-threaded event loop. As such, all IPC operations should be non-blocking and should perform any expensive work on a different thread. If an API implementation blocks the thread, then all IPC clients have their requests delayed, because the IPC event loop cannot move past the blocking operation to pick up new requests. Asynchronous API implementations are discussed below; they should be used in order to avoid blocking the IPC thread.
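
As a concrete illustration, here is a minimal sketch of offloading expensive work from the event loop. It assumes a generated handler base class with the handleRequestAsync variant discussed below; MyRequest, MyResponse, doExpensiveWork, and workerExecutor are hypothetical names used only for illustration.

```java
@Override
public CompletableFuture<MyResponse> handleRequestAsync(MyRequest request) {
    // Cheap validation and authorization can stay on the IPC event-loop thread;
    // the expensive work is offloaded to a worker pool so the event loop can keep
    // servicing other clients while this request is in flight.
    return CompletableFuture.supplyAsync(() -> doExpensiveWork(request), workerExecutor);
}
```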

Authentication

Authentication (often abbreviated AuthN) determines who is trying to do something. Greengrass provides a unique secret token to each component, in the environment variable SVCUID, when the component is started. This secret token uniquely identifies an individual component and is used to authenticate the component to Greengrass when it connects over IPC. When a GenericExternalService is created, it generates this token, which is then associated with the service in memory. If the token is provided to Greengrass in the IPC connect message, then Greengrass can look up the token to identify the connecting service, or reject the connection if the token is unknown.
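
From the component's side, the token handling is transparent. For example, a minimal sketch using the AWS IoT Device SDK for Java v2 (assuming it is on the classpath) looks like the following; the SDK reads SVCUID and the Nucleus socket path from the environment variables Greengrass sets for the component process and presents the token in the connect message.

```java
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;

public class IpcConnectExample {
    public static void main(String[] args) throws Exception {
        // SVCUID and AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT are set by
        // Greengrass in the component's environment; the builder picks them up.
        try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
            // The connection is now authenticated as this component; IPC calls can follow.
        }
    }
}
```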

Authorization

Authorization (often abbreviated AuthZ) determines what you're allowed to do. Combined with authN, Greengrass knows who you are and can look up the policy that says what you're allowed to do. Authorization is mostly implemented in AuthorizationHandler. The authorization handler keeps a list of valid operations and IPC services; this is not strictly necessary and should be removed, because it makes it impossible to add new IPC operations or services without updating the Nucleus. If a component specifies a policy that contains an unknown IPC service or operation, then IPC policy parsing logs an error and returns without parsing any remaining policies, which means any one invalid policy may prevent valid policies in other components from being loaded.

To actually perform authorization for an API call, the API implementation must call isAuthorized, which throws an exception if the operation is not authorized for the service.
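
The check generally looks like the following sketch, which follows the pattern used by handlers such as MqttProxyIPCAgent; the exact Permission fields and exception types are assumptions based on that pattern and may differ between APIs.

```java
try {
    authorizationHandler.isAuthorized(
            MQTT_PROXY_SERVICE_NAME,             // the IPC service being called
            Permission.builder()
                    .principal(serviceName)      // the calling component, known from authN
                    .operation("aws.greengrass#PublishToIoTCore")
                    .resource(topic)             // the resource the call acts on
                    .build());
} catch (AuthorizationException e) {
    // Surfaced to the IPC caller as an IPC error type.
    throw new UnauthorizedError(e.getMessage());
}
```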

An example IPC service

Now let's look at one real IPC service to see how it actually works. MqttProxyIPCService is the IPC service that lets components publish and subscribe to IoT Core. This service is implemented as a Startable which is registered in KernelLifecycle to start before all other services. When this service starts, all it does is register the handlers for the two APIs that it implements. The APIs are actually implemented by MqttProxyIPCAgent.
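
Conceptually, the startup registration amounts to the following sketch; the setter names follow the generated GreengrassCoreIPCService API, but treat the exact wiring as illustrative rather than a copy of the Nucleus code.

```java
// Replace the default handlers registered by IPCEventStreamService with the real ones.
greengrassCoreIPCService.setPublishToIoTCoreHandler(
        context -> mqttProxyIPCAgent.getPublishToIoTCoreOperationHandler(context));
greengrassCoreIPCService.setSubscribeToIoTCoreHandler(
        context -> mqttProxyIPCAgent.getSubscribeToIoTCoreOperationHandler(context));
```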

MqttProxyIPCAgent has two public methods which provide "operation handlers" for each IPC API. PublishToIoTCoreOperationHandler implements publishToIoTCore, which is a request-response style API. All operation handlers have the same constructor shape: they store the name of the service calling the API, for authorization and logging purposes. The handler then provides a handleRequest implementation to actually do the work of the API. This is the synchronous variant; there is also an asynchronous version, which can be used to implement an API that takes some time to execute so that the IPC thread isn't blocked by the long operation. The async version should generally be preferred for new APIs.

The API implementation begins with translateExceptions; this is a safety layer so that any and all exceptions that may be thrown are logged and sent to the IPC caller as an IPC exception type. The API then validates the request, authorizes it, and does the actual requested work. Finally, it returns the response object.
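
Put together, the request-response shape looks roughly like this sketch, loosely based on PublishToIoTCoreOperationHandler; the helper names validateRequest, doAuthorization, and publishToIotMqttClient are illustrative stand-ins for the real private methods.

```java
@Override
public PublishToIoTCoreResponse handleRequest(PublishToIoTCoreRequest request) {
    return translateExceptions(() -> {
        validateRequest(request);                  // reject malformed input early
        doAuthorization(request.getTopicName());   // throws if the caller isn't allowed
        publishToIotMqttClient(request);           // the actual work of the API
        return new PublishToIoTCoreResponse();     // sending the response ends the exchange
    });
}
```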

Now let's look at a subscription-style API and compare it to the request-response API. SubscribeToIoTCoreOperationHandler begins in the same way by storing the service name of the caller. The class also has a few new fields which will be used in the handler. For this API, the handleRequest method is empty and the async variant is used instead. handleRequestAsync begins the same way, with translateExceptions, input validation, and authorization. All of this runs synchronously on the IPC thread. For the actual work, the API returns a future which maps the subscription success or failure result into either the success response or an IPC error. The work that this future represents executes on the MQTT thread rather than the IPC thread, so the IPC thread is free to continue servicing other requests while this subscribe request is pending.

So far, this looks the same as request-response except that we've chosen to use the async version of handleRequest. Now, let's see how we actually send subscription messages. The MQTT subscribe request registers a callback, forwardToSubscriber; this method calls sendStreamEvent to send a subscription message to the subscriber. One important bit is here, where we refuse to send the subscription event if the initial subscribe response message has not yet been sent. Due to the asynchronous nature of all of this, the callback may fire and try to send the stream event before the initial response message goes out. Sending it early causes an IPC error in the client, because the client requires that the first message received on the stream is the response message type rather than the "streaming event" type.

In order to unsubscribe, clients must close the stream. Streams are also automatically closed when a client disconnects for any reason. The API handler can implement onStreamClosed to execute logic when the stream closes for any reason. In this example, IPC will unsubscribe from the MQTT topic so that no more messages will be sent.
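
In condensed form, the streaming side looks something like the following sketch. The field and helper names (responseSent, subscribedTopic, unsubscribeFromIotCore) and the callback parameter type are illustrative rather than the exact Nucleus code, while sendStreamEvent and onStreamClosed come from the generated handler base class.

```java
private void forwardToSubscriber(Publish message) {
    if (!responseSent.get()) {
        // The client requires that the first message on the stream is the subscribe
        // response, so drop events that arrive before the response has been sent.
        return;
    }
    MQTTMessage mqttMessage = new MQTTMessage();
    mqttMessage.setTopicName(message.getTopic());
    mqttMessage.setPayload(message.getPayload());
    IoTCoreMessage streamEvent = new IoTCoreMessage();
    streamEvent.setMessage(mqttMessage);
    sendStreamEvent(streamEvent);
}

@Override
protected void onStreamClosed() {
    // Closing the stream is how a client unsubscribes; remove the MQTT subscription
    // so no further messages are forwarded.
    unsubscribeFromIotCore(subscribedTopic, this::forwardToSubscriber);
}
```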

PubSubIPCEventStreamAgent provides a local publish and subscribe implementation, similar to MQTT (but it definitely isn't MQTT).

When subscribing to a topic filter (a topic which includes the MQTT-style wildcards # and +), the subscriber will not, by default, receive messages that it published itself. This can be changed by setting the ReceiveMode option in the subscribe request to RECEIVE_ALL_MESSAGES.
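
For example, a component opting in to receiving its own messages might build the request like this sketch, which assumes the SubscribeToTopicRequest and ReceiveMode classes from the IoT Device SDK model; the topic name is hypothetical and the surrounding client and stream-handler code is omitted.

```java
SubscribeToTopicRequest request = new SubscribeToTopicRequest();
request.setTopic("my/local/topic");                        // hypothetical topic filter
request.setReceiveMode(ReceiveMode.RECEIVE_ALL_MESSAGES);  // default is to receive only others' messages
```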

Publishing a message to a topic has no delivery guarantee. If there are no subscribers to a topic, the message is still considered to be successfully published. The publish request is considered finished when the task to send the message to all subscribers is successfully submitted to a thread pool for asynchronous execution. Now that IPC handlers can be asynchronous, this behavior should be changed so that the publish response is only sent after the message has been sent to all consumers. Better yet, the publish response could be extended to include a "delivery receipt" which would list all the components which were sent the message. Note that we only know whether we successfully sent the message; we still cannot know that the subscriber didn't die immediately after getting the message.

Adding a new API

  1. Update the Smithy model with the new API
  2. Build the Smithy model to generate the new server and client code
  3. Publish the new client as a SNAPSHOT version to the internal Maven repository
  4. Copy the new server code into the Nucleus code base, specifically copy software.amazon.awssdk
  5. Add authZ IPC service to operation mapping for the new operation
  6. Implement the new API and register the implementation using greengrassCoreIPCService.setOperationHandler; also register for authZ using authorizationHandler.registerComponent, for example in Shadow manager (see the sketch after this list)
  7. Submit PRs to the IoT Device SDKs in all languages to include the new API
  8. Release new Nucleus version and new device SDK versions
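
For step 6, the registration typically looks like the following sketch; "MyNewOperation", MyNewOperationHandler, and MY_SERVICE_NAME are hypothetical names, and the call shapes follow the pattern used by existing services such as Shadow manager rather than exact signatures.

```java
// Tell the authorization handler which operations this service supports.
authorizationHandler.registerComponent(
        MY_SERVICE_NAME,
        new HashSet<>(Collections.singletonList("aws.greengrass#MyNewOperation")));

// Replace the default handler with the real implementation.
greengrassCoreIPCService.setOperationHandler(
        "aws.greengrass#MyNewOperation",
        context -> new MyNewOperationHandler(context, authorizationHandler));
```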