Skip to content

Commit

Permalink
Improve HTTP and ZeroMQ adapters (#137)
Browse files Browse the repository at this point in the history
Closes #111 and #15 

Changes:
* Add synchronised start/stop mechanism to HTTP adapter
* Write suite of tests using this mechanism
* Make HTTP adapter support interrupts (it did not appear to already)
* Remove `include_json` parameter from HTTP endpoints as `aiohttp` can
work that out for itself
* Create new ZeroMQ adapter specifically for pushing. The previous one
had issues and also implemented more functionality than was needed.
Multiple ZMQ adapters for the different ZMQ socket modes (PUSH, PUBLISH,
DEALER etc.) seems like a better way to go.
* Synchronize ZeroMQ socket binding with a lock to avoid conflicts
* Write suite of tests for ZeroMQ push adapter
  • Loading branch information
callumforrester authored Jul 14, 2023
1 parent 94297cb commit 565beff
Show file tree
Hide file tree
Showing 16 changed files with 697 additions and 396 deletions.
6 changes: 3 additions & 3 deletions docs/user/reference/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ This is the internal API reference for tickit
-------------------------------


.. automodule:: tickit.adapters.zmqadapter
.. automodule:: tickit.adapters.zeromq.push_adapter
:members:

``tickit.adapters.zmqadapter``
------------------------------
``tickit.adapters.zeromq.push_adapter``
---------------------------------------


.. automodule:: tickit.adapters.epicsadapter
Expand Down
7 changes: 7 additions & 0 deletions examples/configs/http-and-zeromq-devices.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
- examples.devices.http_device.ExampleHttpDevice:
name: http-device
inputs: {}
- examples.devices.zeromq_push_device.ExampleZeroMqPusher:
name: zeromq-pusher
inputs:
updates: http-device:updates
11 changes: 2 additions & 9 deletions examples/configs/http-device.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
- tickit.devices.source.Source:
name: source
inputs: {}
value: False
- examples.devices.http_device.ExampleHTTP:
- examples.devices.http_device.ExampleHttpDevice:
name: http-device
inputs:
foo: source:value
foo: False
bar: 10
inputs: {}
81 changes: 22 additions & 59 deletions examples/devices/http_device.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,21 @@
from dataclasses import dataclass
from typing import Optional

from aiohttp import web

from tickit.adapters.httpadapter import HTTPAdapter
from tickit.adapters.interpreters.endpoints.http_endpoint import HTTPEndpoint
from tickit.adapters.httpadapter import HttpAdapter
from tickit.adapters.interpreters.endpoints.http_endpoint import HttpEndpoint
from tickit.core.components.component import Component, ComponentConfig
from tickit.core.components.device_simulation import DeviceSimulation
from tickit.core.device import Device, DeviceUpdate
from tickit.core.typedefs import SimTime
from tickit.utils.compat.typing_compat import TypedDict
from tickit.devices.iobox import IoBoxDevice


class ExampleHTTPDevice(Device):
"""A device class for an example HTTP device.
class IoBoxHttpAdapter(HttpAdapter):
"""An adapter for an IoBox that allows reads and writes via REST calls"""

...
"""
device: IoBoxDevice

Inputs: TypedDict = TypedDict("Inputs", {"foo": bool})

Outputs: TypedDict = TypedDict("Outputs", {"bar": float})

def __init__(
self,
foo: bool = False,
bar: Optional[int] = 10,
) -> None:
"""An example HTTP device constructor which configures the ... .
Args:
foo (bool): A flag to indicate something. Defauls to False.
bar (int, optional): A number to represent something. Defaults to 3600.
"""
self.foo = foo
self.bar = bar

def update(self, time: SimTime, inputs: Inputs) -> DeviceUpdate[Outputs]:
"""Generic update function to update the values of the ExampleHTTPDevice.
Args:
time (SimTime): The simulation time in nanoseconds.
inputs (Inputs): A TypedDict of the inputs to the ExampleHTTPDevice.
Returns:
DeviceUpdate[Outputs]:
The produced update event which contains the value of the device
variables.
"""
pass


class ExampleHTTPAdapter(HTTPAdapter):
"""An Eiger adapter which parses the commands sent to the HTTP server."""

device: ExampleHTTPDevice

@HTTPEndpoint.put("/command/foo/")
async def foo(self, request: web.Request) -> web.Response:
@HttpEndpoint.put("/memory/{address}", interrupt=True)
async def write_to_address(self, request: web.Request) -> web.Response:
"""A HTTP endpoint for sending a command to the example HTTP device.
Args:
Expand All @@ -66,10 +24,13 @@ async def foo(self, request: web.Request) -> web.Response:
Returns:
web.Response: [description]
"""
return web.Response(text=str("put data"))
address = request.match_info["address"]
new_value = (await request.json())["value"]
self.device.write(address, new_value)
return web.json_response({address: new_value})

@HTTPEndpoint.get("/info/bar/{data}")
async def bar(self, request: web.Request) -> web.Response:
@HttpEndpoint.get("/memory/{address}")
async def read_from_address(self, request: web.Request) -> web.Response:
"""A HTTP endpoint for requesting data from the example HTTP device.
Args:
Expand All @@ -78,19 +39,21 @@ async def bar(self, request: web.Request) -> web.Response:
Returns:
web.Response: [description]
"""
return web.Response(text=f"Your data: {request.match_info['data']}")
address = request.match_info["address"]
value = self.device.read(address)
return web.json_response({address: value})


@dataclass
class ExampleHTTP(ComponentConfig):
class ExampleHttpDevice(ComponentConfig):
"""Example HTTP device."""

foo: bool = False
bar: Optional[int] = 10
host: str = "localhost"
port: int = 8080

def __call__(self) -> Component: # noqa: D102
return DeviceSimulation(
name=self.name,
device=ExampleHTTPDevice(foo=self.foo, bar=self.bar),
adapters=[ExampleHTTPAdapter()],
device=IoBoxDevice(),
adapters=[IoBoxHttpAdapter(self.host, self.port)],
)
49 changes: 49 additions & 0 deletions examples/devices/zeromq_push_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from dataclasses import dataclass, field
from typing import Optional, Set

from tickit.adapters.zeromq.push_adapter import (
SocketFactory,
ZeroMqPushAdapter,
create_zmq_push_socket,
)
from tickit.core.components.component import Component, ComponentConfig
from tickit.core.components.device_simulation import DeviceSimulation
from tickit.devices.iobox import IoBoxDevice


class IoBoxZeroMqAdapter(ZeroMqPushAdapter):
"""An Eiger adapter which parses the commands sent to the HTTP server."""

device: IoBoxDevice[str, int]
_addresses_to_publish: Set[str]

def __init__(
self,
host: str = "127.0.0.1",
port: int = 5555,
socket_factory: Optional[SocketFactory] = create_zmq_push_socket,
addresses_to_publish: Optional[Set[str]] = None,
) -> None:
super().__init__(host, port, socket_factory)
self._addresses_to_publish = addresses_to_publish or set()

def after_update(self):
for address in self._addresses_to_publish:
value = self.device.read(address)
self.send_message([{address: value}])


@dataclass
class ExampleZeroMqPusher(ComponentConfig):
"""Device that can publish writes to its memory over a zeromq socket."""

host: str = "127.0.0.1"
port: int = 5555
addresses_to_publish: Set[str] = field(default_factory=lambda: {"foo", "bar"})

def __call__(self) -> Component: # noqa: D102
return DeviceSimulation(
name=self.name,
device=IoBoxDevice(),
adapters=[IoBoxZeroMqAdapter()],
)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies = [
"aiozmq",
"apischema==0.16.1",
"immutables",
"pydantic",
"pyyaml",
"pyzmq",
"softioc",
Expand Down
66 changes: 54 additions & 12 deletions src/tickit/adapters/httpadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
import logging
from dataclasses import dataclass
from inspect import getmembers
from typing import Iterable
from typing import Awaitable, Callable, Iterable, Optional

from aiohttp import web
from aiohttp.web_routedef import RouteDef

from tickit.adapters.interpreters.endpoints.http_endpoint import HTTPEndpoint
from tickit.adapters.interpreters.endpoints.http_endpoint import HttpEndpoint
from tickit.core.adapter import Adapter, RaiseInterrupt
from tickit.core.device import Device

LOGGER = logging.getLogger(__name__)


@dataclass
class HTTPAdapter(Adapter):
class HttpAdapter(Adapter):
"""An adapter implementation which delegates to a server and sets up endpoints.
An adapter implementation which delegates the hosting of an http requests to a
Expand All @@ -25,29 +25,56 @@ class HTTPAdapter(Adapter):
host: str = "localhost"
port: int = 8080

_stopped: Optional[asyncio.Event] = None
_ready: Optional[asyncio.Event] = None

async def run_forever(
self, device: Device, raise_interrupt: RaiseInterrupt
) -> None:
"""Runs the server continuously."""
await super().run_forever(device, raise_interrupt)

self._ensure_stopped_event().clear()
await self._start_server()

self._ensure_ready_event().set()
try:
await asyncio.Event().wait()
finally:
# TODO: This doesn't work yet due to asyncio's own exception handler
await self._ensure_stopped_event().wait()
except asyncio.CancelledError:
await self.stop()

async def wait_until_ready(self, timeout: float = 1.0) -> None:
while self._ready is None:
await asyncio.sleep(0.1)
await asyncio.wait_for(self._ready.wait(), timeout=timeout)

async def stop(self) -> None:
stopped = self._ensure_stopped_event()
if not stopped.is_set():
await self.site.stop()
await self.app.shutdown()
await self.app.cleanup()
self._ensure_stopped_event().set()
if self._ready is not None:
self._ready.clear()

def _ensure_stopped_event(self) -> asyncio.Event:
if self._stopped is None:
self._stopped = asyncio.Event()
return self._stopped

def _ensure_ready_event(self) -> asyncio.Event:
if self._ready is None:
self._ready = asyncio.Event()
return self._ready

async def _start_server(self):
LOGGER.debug(f"Starting HTTP server... {self}")
self.app = web.Application()
self.app.add_routes(list(self.endpoints()))
runner = web.AppRunner(self.app)
await runner.setup()
site = web.TCPSite(runner, host=self.host, port=self.port)
await site.start()
self.site = web.TCPSite(runner, host=self.host, port=self.port)
await self.site.start()

def endpoints(self) -> Iterable[RouteDef]:
"""Returns list of endpoints.
Expand All @@ -56,12 +83,27 @@ def endpoints(self) -> Iterable[RouteDef]:
then yields them.
Returns:
Iterable[HTTPEndpoint]: The list of defined endpoints
Iterable[HttpEndpoint]: The list of defined endpoints
Yields:
Iterator[Iterable[HTTPEndpoint]]: The iterator of the defined endpoints
Iterator[Iterable[HttpEndpoint]]: The iterator of the defined endpoints
"""
for _, func in getmembers(self):
endpoint = getattr(func, "__endpoint__", None) # type: ignore
if endpoint is not None and isinstance(endpoint, HTTPEndpoint):
if endpoint is not None and isinstance(endpoint, HttpEndpoint):
if endpoint.interrupt:
func = _with_posthoc_task(func, self.raise_interrupt)
yield endpoint.define(func)


def _with_posthoc_task(
func: Callable[[web.Request], Awaitable[web.Response]],
afterwards: Callable[[], Awaitable[None]],
) -> Callable[[web.Request], Awaitable[web.Response]]:
# @functools.wraps
async def wrapped(request: web.Request) -> web.Response:
response = await func(request)
await afterwards()
return response

return wrapped
Loading

0 comments on commit 565beff

Please sign in to comment.