IPC
Greengrass IPC offers a set of APIs for components to interact with Greengrass and with each other. IPC is implemented using event streams over Unix domain sockets or Windows named pipes. All IPC APIs are implemented as bidirectional asynchronous streams. Even the APIs that appear to be request-response are streams with particular semantics enforced (i.e. the stream closes after the response is sent). The IPC transport layer is implemented in C, and the application layer is implemented in Java, C++, Python, and Node.js.
The clients in all languages and the Java server are automatically generated from a Smithy model of the APIs. The server is not currently available in any other language.
IPCEventStreamService implements the IPC server in Nucleus. It creates the socket and server listener and registers each API's default implementation. It then registers the authentication handler, which determines whether a connection is valid. It also configures a dummy authorization handler, because Greengrass implements its own authorization. Each IPC service implementation must then register its real operation handlers, which override the default handlers. For example, MqttProxyIPCService registers PublishToIoTCoreHandler and SubscribeToIoTCoreHandler.
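The default-then-override registration described above can be modeled with a small lookup table. This is a conceptual sketch in Python, not Nucleus code: OperationRegistry, set_handler, and dispatch are hypothetical names, and the assumption that a default handler simply reports "not implemented" is mine, since the text does not say what the defaults do.

```python
class OperationRegistry:
    """Hypothetical model: every operation starts with a default handler."""

    def __init__(self, operations):
        self._handlers = {op: self._default_handler(op) for op in operations}

    @staticmethod
    def _default_handler(operation):
        def handler(request):
            # Assumption: the default only reports the operation as unsupported.
            raise NotImplementedError(f"{operation} has no registered handler")
        return handler

    def set_handler(self, operation, handler):
        """Override the default handler for a known operation."""
        if operation not in self._handlers:
            raise KeyError(f"unknown operation: {operation}")
        self._handlers[operation] = handler

    def dispatch(self, operation, request):
        return self._handlers[operation](request)
```

A service would call set_handler once per operation at startup. Any operation it never registers keeps its default.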
The IPC server is implemented by the AWS CRT, which uses a single-threaded event loop. As such, all IPC operations should be non-blocking and should do any expensive work on a different thread. If an API implementation blocks the thread, all IPC clients will have their requests delayed, because the IPC event loop cannot move on from the blocking operation to pick up new requests. Asynchronous API implementations are discussed below, and these should be used in order to not block the IPC thread.
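To make the cost of blocking concrete, here is a small model of a single-threaded event loop. It uses Python's asyncio as a stand-in for the AWS CRT event loop, and the handler names are hypothetical. One slow request and one fast request arrive together, and the sketch records the order in which they complete.

```python
import asyncio
import time


def expensive_work():
    """Stand-in for slow work; it blocks whichever thread runs it."""
    time.sleep(0.05)
    return "slow-done"


async def blocking_handler(log):
    # Anti-pattern: the slow work runs directly on the event-loop thread.
    log.append(expensive_work())


async def offloading_handler(log):
    # Preferred: hand the slow work to a worker thread and yield the loop.
    loop = asyncio.get_running_loop()
    log.append(await loop.run_in_executor(None, expensive_work))


async def fast_handler(log):
    log.append("fast-done")


def completion_order(slow_handler):
    """Serve one slow and one fast request on a single event loop."""
    async def serve():
        log = []
        await asyncio.gather(slow_handler(log), fast_handler(log))
        return log
    return asyncio.run(serve())
```

With blocking_handler the fast request cannot complete until the slow one finishes. With offloading_handler the fast request completes first, because the loop stays free while the worker thread does the slow work.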
Authentication (often abbreviated AuthN) determines who is trying to do something. When Greengrass starts a component, it provides the component with a unique secret token in the environment variable SVCUID. This secret token uniquely identifies an individual component and is used to authenticate the component to Greengrass when that component connects over IPC. When a GenericExternalService is created, it generates this token, which is then associated with the service in memory. If the token is provided to Greengrass in the IPC connect message, Greengrass can look up the token to identify the connecting service, or reject the connection if the token is unknown.
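The token flow above amounts to an in-memory map from token to service. The sketch below is a simplified Python model under that assumption. TokenRegistry and its method names are hypothetical, and the token format is not taken from Greengrass.

```python
import secrets


class TokenRegistry:
    """Hypothetical in-memory map from secret token to service name."""

    def __init__(self):
        self._service_by_token = {}

    def register_service(self, service_name):
        # Generate an unguessable token when the service is created.
        token = secrets.token_hex(16)
        self._service_by_token[token] = service_name
        return token

    def authenticate(self, token):
        # Identify the connecting service, or reject an unknown token.
        service_name = self._service_by_token.get(token)
        if service_name is None:
            raise PermissionError("unknown token, connection rejected")
        return service_name


def component_environment(token):
    # The token reaches the component through the SVCUID variable.
    return {"SVCUID": token}
```

On connect, the component would read SVCUID from its environment and present it. The server side then calls authenticate to learn which service is on the other end.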
Authorization (often abbreviated AuthZ) determines what you're allowed to do. Combined with AuthN, Greengrass knows who you are and can look up the policy that says what you're allowed to do. Authorization is mostly implemented in AuthorizationHandler. The authorization handler keeps a list of valid operations and IPC services. This list is not strictly necessary and should be removed, because it makes it impossible to add new IPC operations or services without updating the Nucleus. If a component specifies a policy that contains an unknown IPC service or operation, IPC policy parsing logs an error and returns without parsing any remaining policies, so a single invalid policy may prevent valid policies in other components from being loaded.
To actually perform authorization for an API call, the API implementation must call isAuthorized, which throws an exception if the operation is not authorized for the service.
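The sketch below models both behaviors described in this section: policy parsing that stops at the first unknown service or operation, and an authorization check that raises when access is not granted. It is a simplified Python model, not the AuthorizationHandler code. The class names, the exact-match resource check, and the service and operation identifiers are illustrative assumptions.

```python
from dataclasses import dataclass

# Illustrative allow-list of services and their operations.
KNOWN_OPERATIONS = {
    "aws.greengrass.ipc.mqttproxy": {
        "aws.greengrass#PublishToIoTCore",
        "aws.greengrass#SubscribeToIoTCore",
    },
}


class AuthorizationError(Exception):
    pass


@dataclass(frozen=True)
class Policy:
    component: str
    service: str
    operation: str
    resource: str


class Authorizer:
    def __init__(self, known_operations):
        self._known = known_operations
        self._grants = set()
        self.errors = []

    def load_policies(self, policies):
        for policy in policies:
            if policy.operation not in self._known.get(policy.service, set()):
                self.errors.append(f"unknown service or operation: {policy}")
                return  # models the pitfall: later policies are never parsed
            self._grants.add(policy)

    def is_authorized(self, component, service, operation, resource):
        # Exact-match lookup only; real policies may be richer than this.
        if Policy(component, service, operation, resource) not in self._grants:
            raise AuthorizationError(f"{component} may not {operation} on {resource}")
        return True
```

Loading a valid policy, then an invalid one, then another valid one leaves the third policy unloaded, which is the failure mode the text warns about.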
Now let's look at one real IPC service to see how it actually works. MqttProxyIPCService is the IPC service that lets components publish and subscribe to IoT Core. This service is implemented as a Startable which is registered in KernelLifecycle for startup before all other services. When this service starts, all it does is register the handlers for the two APIs that it implements. The APIs are actually implemented by MqttProxyIPCAgent.
MqttProxyIPCAgent has two public methods which provide the "operation handlers", one for each IPC API. PublishToIoTCoreOperationHandler implements publishToIoTCore, which is a request-response style API. All operation handlers have the same constructor, which stores the name of the calling service for authorization and logging purposes. The handler then provides a handleRequest implementation to do the actual work of the API. This is the synchronous variant. There is also an asynchronous variant for APIs that may take some time to execute, so that the IPC thread is not blocked by the long operation. The async version should generally be preferred for new APIs.
The API implementation begins with translateExceptions. This is a safety layer: any exception that is thrown is logged and sent to the IPC caller as an IPC exception type. The API then validates the request, authorizes it, and does the actual requested work. Finally, it returns the response object.
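The handler shape described above (exception translation around validate, authorize, work, respond) can be sketched as follows. This is a Python model of the pattern rather than the Java implementation. The exception class names, the authorize and publish callables, and the request dictionary layout are all hypothetical.

```python
import logging

logger = logging.getLogger("ipc-sketch")


class IpcError(Exception):
    """Base class for errors that are safe to send to the caller."""


class InvalidArgumentsError(IpcError):
    pass


class UnauthorizedError(IpcError):
    pass


class ServiceError(IpcError):
    pass


def translate_exceptions(body):
    """Run body; log unexpected exceptions and convert them to an IPC error."""
    try:
        return body()
    except IpcError:
        raise  # already an IPC error type, pass it through
    except Exception as exc:
        logger.exception("unhandled error in IPC handler")
        raise ServiceError(str(exc)) from exc


class PublishHandler:
    def __init__(self, service_name, authorize, publish):
        # The caller's name is kept for authorization and logging.
        self.service_name = service_name
        self._authorize = authorize
        self._publish = publish

    def handle_request(self, request):
        def body():
            topic = request.get("topic")
            if not topic:  # 1. validate
                raise InvalidArgumentsError("topic is required")
            if not self._authorize(self.service_name, topic):  # 2. authorize
                raise UnauthorizedError(f"{self.service_name} may not publish to {topic}")
            self._publish(topic, request.get("payload", b""))  # 3. do the work
            return {}  # 4. respond
        return translate_exceptions(body)
```

Keeping the wrapper outermost means a bug anywhere in the handler still produces a well-formed error for the caller instead of an unexplained closed stream.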
Now let's have a look at a subscription style API and compare it to the request-response API. SubscribeToIoTCoreOperationHandler begins in the same way, by storing the service name of the caller. The class also has a few new fields which are used in the handler. For this API, the handleRequest method is empty and the async variant is used instead. handleRequestAsync begins the same way, with translateExceptions, input validation, and authorization. Everything so far runs synchronously on the IPC thread. When doing the actual work, the API returns a future which maps the subscription success or failure result into either the success response or an IPC error. The work that this future represents executes on the MQTT thread rather than the IPC thread, so the IPC thread is free to continue servicing other requests while this subscribe request is pending.
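The async pattern above, validating on the calling thread and then returning a future whose result is mapped into a response or an IPC error, can be modeled in Python with concurrent.futures. This is a sketch under assumptions: map_future, handle_request_async, and IpcServiceError are hypothetical names, and a single-worker executor stands in for the MQTT thread.

```python
from concurrent.futures import Future


class IpcServiceError(Exception):
    pass


def map_future(inner, on_success, on_error):
    """Return a future that maps inner's outcome to a response or an error."""
    outer = Future()

    def _done(done):
        try:
            outer.set_result(on_success(done.result()))
        except Exception as exc:
            outer.set_exception(on_error(exc))

    inner.add_done_callback(_done)
    return outer


def handle_request_async(request, mqtt_executor, mqtt_subscribe):
    # Validation runs synchronously on the calling ("IPC") thread.
    topic = request.get("topic")
    if not topic:
        raise ValueError("topic is required")
    # The subscribe itself runs on the executor, so the caller returns at once.
    pending = mqtt_executor.submit(mqtt_subscribe, topic)
    return map_future(
        pending,
        lambda _result: {"subscribed": topic},
        lambda exc: IpcServiceError(str(exc)),
    )
```

The caller gets the future back immediately and can go on to serve other requests. The response is only produced once the subscribe attempt finishes on the other thread.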
So far, this looks the same as request-response, except that we've chosen to use the async version of handleRequest. Now, let's see how subscription messages are actually sent. The MQTT subscribe request registers forwardToSubscriber as a callback. This method calls sendStreamEvent to send a subscription message to the subscriber. One important detail is that the handler refuses to send the subscription event if the initial subscribe response message has not yet been sent. Because all of this is asynchronous, the callback may be invoked and try to send the stream event before the initial response message is sent. That would cause an IPC error in the client, because the client requires the first message received on the stream to be the response message type rather than the "streaming event" type.
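The ordering guard described above can be modeled with a flag protected by a lock. This is a Python sketch, not the handler's real code. SubscriptionStream and its methods are hypothetical, and what happens to a refused message afterwards is not stated in the text, so the sketch simply reports the refusal to the caller.

```python
import threading


class SubscriptionStream:
    """Hypothetical stream that enforces 'response first, then events'."""

    def __init__(self):
        self.sent = []  # messages in the order they went out
        self._response_sent = False
        self._lock = threading.Lock()

    def send_response(self, response):
        with self._lock:
            self.sent.append(("response", response))
            self._response_sent = True

    def forward_to_subscriber(self, message):
        """Send a stream event; return False if it had to be refused."""
        with self._lock:
            if not self._response_sent:
                # Sending now would put an event ahead of the response.
                return False
            self.sent.append(("event", message))
            return True
```

The lock matters because the callback and the response may run on different threads. Without it, an event could slip in between the flag check and the response being written.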
To unsubscribe, clients must close the stream. Streams are also closed automatically when a client disconnects for any reason. The API handler can implement onStreamClosed to execute logic whenever the stream closes. In this example, IPC unsubscribes from the MQTT topic so that no more messages are sent.
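Cleanup on stream close can be sketched like this. It is a Python model with a stand-in broker. FakeBroker, SubscribeHandler, and on_stream_closed are hypothetical names that mirror the roles described above rather than the real classes.

```python
class FakeBroker:
    """Stand-in for the MQTT client: topic -> list of callbacks."""

    def __init__(self):
        self._callbacks = {}

    def subscribe(self, topic, callback):
        self._callbacks.setdefault(topic, []).append(callback)

    def unsubscribe(self, topic, callback):
        callbacks = self._callbacks.get(topic, [])
        if callback in callbacks:  # tolerate repeated unsubscribes
            callbacks.remove(callback)

    def publish(self, topic, message):
        for callback in list(self._callbacks.get(topic, [])):
            callback(message)


class SubscribeHandler:
    def __init__(self, broker, topic):
        self.received = []
        self._broker = broker
        self._topic = topic
        self._callback = self.received.append
        broker.subscribe(topic, self._callback)

    def on_stream_closed(self):
        # Runs when the client closes the stream or disconnects.
        self._broker.unsubscribe(self._topic, self._callback)
```

Making the unsubscribe tolerant of repeated calls keeps the close path safe if it is triggered more than once.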