diff --git a/.pylintrc b/.pylintrc index f84bb969..bf65d1ef 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,7 +1,7 @@ [DESIGN] max-args=8 max-attributes=12 -max-parents=8 +max-parents=9 [MESSAGES CONTROL] disable=too-few-public-methods diff --git a/local/variables/package.yaml b/local/variables/package.yaml index 7c03dde2..98c5444b 100644 --- a/local/variables/package.yaml +++ b/local/variables/package.yaml @@ -1,5 +1,5 @@ --- major: 2 -minor: 2 +minor: 3 patch: 0 entry: runtimepy diff --git a/runtimepy/data/factories.yaml b/runtimepy/data/factories.yaml new file mode 100644 index 00000000..6cd94f54 --- /dev/null +++ b/runtimepy/data/factories.yaml @@ -0,0 +1,15 @@ +--- +factories: + # Connection factories. + - {name: runtimepy.net.factories.TcpJson} + - {name: runtimepy.net.factories.UdpJson} + - {name: runtimepy.net.factories.WebsocketJson} + + # Task factories. + - {name: runtimepy.net.arbiter.housekeeping.ConnectionMetricsLoggerFactory} + +ports: + # Reserve ports for JSON listeners. + - {name: udp_json, type: udp} + - {name: tcp_json, type: tcp} + - {name: websocket_json, type: tcp} diff --git a/runtimepy/net/arbiter/tcp.py b/runtimepy/net/arbiter/tcp/__init__.py similarity index 100% rename from runtimepy/net/arbiter/tcp.py rename to runtimepy/net/arbiter/tcp/__init__.py diff --git a/runtimepy/net/arbiter/tcp/json.py b/runtimepy/net/arbiter/tcp/json.py new file mode 100644 index 00000000..e2b6eebc --- /dev/null +++ b/runtimepy/net/arbiter/tcp/json.py @@ -0,0 +1,25 @@ +""" +A module implementing JSON-message connection factories. +""" + +# internal +from runtimepy.net.stream import UdpPrefixedMessageConnection +from runtimepy.net.stream.json import JsonMessageConnection +from runtimepy.net.tcp.connection import TcpConnection +from runtimepy.net.websocket import WebsocketConnection + + +class TcpJsonMessageConnection(JsonMessageConnection, TcpConnection): + """A TCP connection interface for JSON messaging.""" + + +class UdpJsonMessageConnection( + JsonMessageConnection, UdpPrefixedMessageConnection +): + """A UDP connection interface for JSON messaging.""" + + +class WebsocketJsonMessageConnection( + JsonMessageConnection, WebsocketConnection +): + """A websocket connection interface for JSON messaging.""" diff --git a/runtimepy/net/factories/__init__.py b/runtimepy/net/factories/__init__.py index 0bfc41a7..9f0d74e5 100644 --- a/runtimepy/net/factories/__init__.py +++ b/runtimepy/net/factories/__init__.py @@ -4,6 +4,11 @@ # internal from runtimepy.net.arbiter.tcp import TcpConnectionFactory +from runtimepy.net.arbiter.tcp.json import ( + TcpJsonMessageConnection, + UdpJsonMessageConnection, + WebsocketJsonMessageConnection, +) from runtimepy.net.arbiter.udp import UdpConnectionFactory from runtimepy.net.arbiter.websocket import WebsocketConnectionFactory from runtimepy.net.stream import ( @@ -73,6 +78,12 @@ class UdpNull(UdpConnectionFactory[NullUdpConnection]): kind = NullUdpConnection +class UdpJson(UdpConnectionFactory[UdpJsonMessageConnection]): + """UDP JSON-connection factory.""" + + kind = UdpJsonMessageConnection + + class TcpEcho(TcpConnectionFactory[EchoTcpConnection]): """TCP echo-connection factory.""" @@ -97,6 +108,12 @@ class TcpNull(TcpConnectionFactory[NullTcpConnection]): kind = NullTcpConnection +class TcpJson(TcpConnectionFactory[TcpJsonMessageConnection]): + """TCP JSON-connection factory.""" + + kind = TcpJsonMessageConnection + + class WebsocketEcho(WebsocketConnectionFactory[EchoWebsocketConnection]): """WebSocket echo-connection factory.""" @@ -107,3 +124,11 @@ class WebsocketNull(WebsocketConnectionFactory[NullWebsocketConnection]): """WebSocket null-connection factory.""" kind = NullWebsocketConnection + + +class WebsocketJson( + WebsocketConnectionFactory[WebsocketJsonMessageConnection] +): + """WebSocket JSON-connection factory.""" + + kind = WebsocketJsonMessageConnection diff --git a/runtimepy/net/stream/__init__.py b/runtimepy/net/stream/__init__.py index 58a19469..1c4bd215 100644 --- a/runtimepy/net/stream/__init__.py +++ b/runtimepy/net/stream/__init__.py @@ -1,114 +1,29 @@ """ -A module implementing a base, stream-oriented connection interface. +A module aggregating stream-oriented connection interfaces. """ # built-in -from io import BytesIO as _BytesIO from typing import BinaryIO as _BinaryIO -from typing import Tuple, Type - -# third-party -from vcorelib.io import ByteFifo +from typing import Tuple # internal from runtimepy.net.connection import BinaryMessage -from runtimepy.net.connection import Connection as _Connection +from runtimepy.net.stream.base import PrefixedMessageConnection +from runtimepy.net.stream.string import StringMessageConnection from runtimepy.net.tcp.connection import TcpConnection from runtimepy.net.udp.connection import UdpConnection -from runtimepy.primitives import Uint32, UnsignedInt - - -class PrefixedMessageConnection(_Connection): - """ - A connection for handling inter-frame message size prefixes for some - stream-oriented protocols. - """ - - message_length_kind: Type[UnsignedInt] = Uint32 - reading_header: bool - - def init(self) -> None: - """Initialize this instance.""" - - # Header parsing. - self.buffer = ByteFifo() - self.reading_header = True - self.message_length_in = self.message_length_kind() - self.prefix_size = self.message_length_in.size - - self.message_length_out = self.message_length_kind() - - def _send_message( - self, data: BinaryMessage, addr: Tuple[str, int] = None - ) -> None: - """Underlying data send.""" - - del addr - self.send_binary(data) - - def send_message( - self, data: BinaryMessage, addr: Tuple[str, int] = None - ) -> None: - """Handle inter-message prefixes for outgoing messages.""" - - self.message_length_out.value = len(data) - with _BytesIO() as stream: - self.message_length_out.to_stream( - stream, byte_order=self.byte_order - ) - stream.write(data) - self._send_message(stream.getvalue(), addr=addr) - - def send_message_str( - self, data: str, addr: Tuple[str, int] = None - ) -> None: - """Convert a message to bytes before sending.""" - self.send_message(data.encode(), addr=addr) - - async def process_single( - self, stream: _BinaryIO, addr: Tuple[str, int] = None - ) -> bool: - """Process a single message.""" - del stream - del addr - return True - - async def process_binary( - self, data: bytes, addr: Tuple[str, int] = None - ) -> bool: - """Process an incoming message.""" - - result = True - - self.buffer.ingest(data) - - can_continue = True - while can_continue: - # Read the message size. - if self.reading_header: - size = self.buffer.pop(self.prefix_size) - if size is not None: - assert len(size) == self.prefix_size - self.message_length_in.update( - size, byte_order=self.byte_order - ) - self.reading_header = False - else: - can_continue = False - - # Read the message payload. - else: - message = self.buffer.pop(self.message_length_in.value) - if message is not None: - # process message - with _BytesIO(message) as stream: - result &= await self.process_single(stream, addr=addr) - self.reading_header = True - else: - can_continue = False - - return result +__all__ = [ + "PrefixedMessageConnection", + "StringMessageConnection", + "TcpPrefixedMessageConnection", + "UdpPrefixedMessageConnection", + "EchoMessageConnection", + "EchoTcpMessageConnection", + "EchoUdpMessageConnection", + "TcpStringMessageConnection", + "UdpStringMessageConnection", +] class TcpPrefixedMessageConnection(PrefixedMessageConnection, TcpConnection): @@ -157,26 +72,6 @@ class EchoUdpMessageConnection( """A connection that just echoes what it was sent.""" -class StringMessageConnection(PrefixedMessageConnection): - """A simple string-message sending and processing connection.""" - - async def process_message( - self, data: str, addr: Tuple[str, int] = None - ) -> bool: - """Process a string message.""" - - del addr - self.logger.info(data) - return True - - async def process_single( - self, stream: _BinaryIO, addr: Tuple[str, int] = None - ) -> bool: - """Process a single message.""" - - return await self.process_message(stream.read().decode(), addr=addr) - - class TcpStringMessageConnection(StringMessageConnection, TcpConnection): """A simple string-message sending and processing connection using TCP.""" diff --git a/runtimepy/net/stream/base.py b/runtimepy/net/stream/base.py new file mode 100644 index 00000000..c3c00bc9 --- /dev/null +++ b/runtimepy/net/stream/base.py @@ -0,0 +1,109 @@ +""" +A module implementing a base, stream-oriented connection interface. +""" + +# built-in +from io import BytesIO as _BytesIO +from typing import BinaryIO as _BinaryIO +from typing import Tuple, Type + +# third-party +from vcorelib.io import ByteFifo + +# internal +from runtimepy.net.connection import BinaryMessage +from runtimepy.net.connection import Connection as _Connection +from runtimepy.primitives import Uint32, UnsignedInt + + +class PrefixedMessageConnection(_Connection): + """ + A connection for handling inter-frame message size prefixes for some + stream-oriented protocols. + """ + + message_length_kind: Type[UnsignedInt] = Uint32 + reading_header: bool + + def init(self) -> None: + """Initialize this instance.""" + + # Header parsing. + self.buffer = ByteFifo() + self.reading_header = True + self.message_length_in = self.message_length_kind() + self.prefix_size = self.message_length_in.size + + self.message_length_out = self.message_length_kind() + + def _send_message( + self, data: BinaryMessage, addr: Tuple[str, int] = None + ) -> None: + """Underlying data send.""" + + del addr + self.send_binary(data) + + def send_message( + self, data: BinaryMessage, addr: Tuple[str, int] = None + ) -> None: + """Handle inter-message prefixes for outgoing messages.""" + + self.message_length_out.value = len(data) + + with _BytesIO() as stream: + self.message_length_out.to_stream( + stream, byte_order=self.byte_order + ) + stream.write(data) + self._send_message(stream.getvalue(), addr=addr) + + def send_message_str( + self, data: str, addr: Tuple[str, int] = None + ) -> None: + """Convert a message to bytes before sending.""" + self.send_message(data.encode(), addr=addr) + + async def process_single( + self, stream: _BinaryIO, addr: Tuple[str, int] = None + ) -> bool: + """Process a single message.""" + del stream + del addr + return True + + async def process_binary( + self, data: bytes, addr: Tuple[str, int] = None + ) -> bool: + """Process an incoming message.""" + + result = True + + self.buffer.ingest(data) + + can_continue = True + while can_continue: + # Read the message size. + if self.reading_header: + size = self.buffer.pop(self.prefix_size) + if size is not None: + assert len(size) == self.prefix_size + self.message_length_in.update( + size, byte_order=self.byte_order + ) + self.reading_header = False + else: + can_continue = False + + # Read the message payload. + else: + message = self.buffer.pop(self.message_length_in.value) + if message is not None: + # process message + with _BytesIO(message) as stream: + result &= await self.process_single(stream, addr=addr) + self.reading_header = True + else: + can_continue = False + + return result diff --git a/runtimepy/net/stream/json.py b/runtimepy/net/stream/json.py new file mode 100644 index 00000000..4b06e68f --- /dev/null +++ b/runtimepy/net/stream/json.py @@ -0,0 +1,274 @@ +""" +A module implementing a JSON message connection interface. +""" + +import asyncio + +# built-in +from copy import copy +from json import JSONDecodeError, dumps, loads +from typing import Any, Awaitable, Callable, Dict, Tuple, Type, TypeVar, Union + +# third-party +from vcorelib.dict.codec import JsonCodec + +# internal +from runtimepy.net.stream.string import StringMessageConnection +from runtimepy.net.udp import UdpConnection + +JsonMessage = Dict[str, Any] + +# +# def message_handler(response: JsonMessage, data: JsonMessage) -> None: +# """A sample message handler.""" +# +MessageHandler = Callable[[JsonMessage, JsonMessage], Awaitable[None]] +MessageHandlers = Dict[str, MessageHandler] +RESERVED_KEYS = {"keys_ignored", "__id__"} + +# +# def message_handler(response: JsonMessage, data: JsonCodec) -> None: +# """A sample message handler.""" +# +T = TypeVar("T", bound=JsonCodec) +TypedHandler = Callable[[JsonMessage, T], Awaitable[None]] + +DEFAULT_LOOPBACK = {"a": 1, "b": 2, "c": 3} +DEFAULT_TIMEOUT = 3 + + +async def loopback_handler(outbox: JsonMessage, inbox: JsonMessage) -> None: + """A simple loopback handler.""" + + outbox.update(inbox) + + +async def event_wait(event: asyncio.Event, timeout: float) -> bool: + """Wait for an event to be set within a timeout.""" + + result = True + + try: + await asyncio.wait_for(event.wait(), timeout) + except asyncio.TimeoutError: + result = False + + return result + + +class JsonMessageConnection(StringMessageConnection): + """A connection interface for JSON messaging.""" + + def _register_handlers(self) -> None: + """Register connection-specific command handlers.""" + + def init(self) -> None: + """Initialize this instance.""" + + super().init() + + self.handlers: MessageHandlers = {} + self.typed_handlers: Dict[ + str, Tuple[Type[JsonCodec], TypedHandler[Any]] + ] = {} + + self.curr_id: int = 1 + + self.ids_waiting: Dict[int, asyncio.Event] = {} + self.id_responses: Dict[int, JsonMessage] = {} + + # Standard handlers. + self.basic_handler("loopback") + + self._register_handlers() + + def _validate_key(self, key: str) -> str: + """Validate a handler key.""" + + assert self._valid_new_key(key), key + return key + + def _valid_new_key(self, key: str) -> bool: + """Determine if a key is valid.""" + + return ( + key not in self.handlers + and key not in self.typed_handlers + and key not in RESERVED_KEYS + ) + + def basic_handler( + self, key: str, handler: MessageHandler = loopback_handler + ) -> None: + """Register a basic handler.""" + + self.handlers[self._validate_key(key)] = handler + + def typed_handler( + self, key: str, kind: Type[T], handler: TypedHandler[T] + ) -> None: + """Register a typed handler.""" + + self.typed_handlers[self._validate_key(key)] = (kind, handler) + + def send_json( + self, data: Union[JsonMessage, JsonCodec], addr: Tuple[str, int] = None + ) -> None: + """Send a JSON message.""" + + if isinstance(data, JsonCodec): + data = data.asdict() + + self.send_message_str(dumps(data, separators=(",", ":")), addr=addr) + + async def wait_json( + self, + data: Union[JsonMessage, JsonCodec], + addr: Tuple[str, int] = None, + timeout: float = DEFAULT_TIMEOUT, + ) -> JsonMessage: + """Send a JSON message and wait for a response.""" + + if isinstance(data, JsonCodec): + data = data.asdict() + + data = copy(data) + assert "__id__" not in data, data + data["__id__"] = self.curr_id + + got_response = asyncio.Event() + + ident = self.curr_id + self.curr_id += 1 + + assert ident not in self.ids_waiting + self.ids_waiting[ident] = got_response + + # Send message and await response. + self.send_json(data, addr=addr) + + assert await event_wait( + got_response, timeout + ), f"No response received in {timeout} seconds!" + + # Return the result. + result = self.id_responses[ident] + del self.id_responses[ident] + + return result + + async def loopback( + self, + data: JsonMessage = None, + addr: Tuple[str, int] = None, + timeout: float = DEFAULT_TIMEOUT, + ) -> bool: + """Perform a simple loopback test on this connection.""" + + if data is None: + data = DEFAULT_LOOPBACK + + message = {"loopback": data} + response = await self.wait_json(message, addr=addr, timeout=timeout) + status = response == message + + self.logger.info( + "Loopback result: '%s' (%s).", + response, + "success" if status else "fail", + ) + + return status + + async def async_init(self) -> bool: + """A runtime initialization routine (executes during 'process').""" + + # Check loopback if it makes sense to. + result = await super().async_init() + + # Only not-connected UDP connections can't do this. + if ( + result + and hasattr("self", "remote_address") + or not isinstance(self, UdpConnection) + ): + result = await self.loopback() + + return result + + async def process_json( + self, data: JsonMessage, addr: Tuple[str, int] = None + ) -> bool: + """Process a JSON message.""" + + response: JsonMessage = {} + + keys_ignored = [] + + tasks = [] + + sub_responses: JsonMessage = {} + + for key, item in data.items(): + if self._valid_new_key(key): + keys_ignored.append(key) + continue + + sub_response: JsonMessage = {} + + # Prepare handler. Each sets its own response data. + if key in self.handlers: + tasks.append(self.handlers[key](sub_response, item)) + elif key in self.typed_handlers: + kind, handler = self.typed_handlers[key] + tasks.append(handler(sub_response, kind.create(item))) + + sub_responses[key] = sub_response + + # Run handlers in parallel. + if tasks: + await asyncio.gather(*tasks) + + # Promote sub-responses to message output. + for key, sub_response in sub_responses.items(): + if sub_response: + response[key] = sub_response + del sub_responses + + if keys_ignored: + response["keys_ignored"] = sorted(keys_ignored) + + # If a message identifier is present, send one in the response. + if "__id__" in data: + ident = data["__id__"] + if ident in self.ids_waiting: + del data["__id__"] + self.id_responses[ident] = data + event = self.ids_waiting[ident] + del self.ids_waiting[ident] + event.set() + response["__id__"] = ident + + if response: + self.send_json(response, addr=addr) + + return True + + async def process_message( + self, data: str, addr: Tuple[str, int] = None + ) -> bool: + """Process a string message.""" + + result = True + + try: + decoded = loads(data) + + if decoded and isinstance(decoded, dict): + result = await self.process_json(decoded, addr=addr) + else: + self.logger.error("Ignoring message '%s'.", data) + except JSONDecodeError as exc: + self.logger.exception("Couldn't decode '%s': %s", data, exc) + + return result diff --git a/runtimepy/net/stream/string.py b/runtimepy/net/stream/string.py new file mode 100644 index 00000000..c8820f22 --- /dev/null +++ b/runtimepy/net/stream/string.py @@ -0,0 +1,30 @@ +""" +A module implementing a string-message stream interface. +""" + +# built-in +from typing import BinaryIO as _BinaryIO +from typing import Tuple + +# internal +from runtimepy.net.stream.base import PrefixedMessageConnection + + +class StringMessageConnection(PrefixedMessageConnection): + """A simple string-message sending and processing connection.""" + + async def process_message( + self, data: str, addr: Tuple[str, int] = None + ) -> bool: + """Process a string message.""" + + del addr + self.logger.info(data) + return True + + async def process_single( + self, stream: _BinaryIO, addr: Tuple[str, int] = None + ) -> bool: + """Process a single message.""" + + return await self.process_message(stream.read().decode(), addr=addr) diff --git a/tests/data/valid/connection_arbiter/basic.yaml b/tests/data/valid/connection_arbiter/basic.yaml index 0cbf1bc3..62ad0d4c 100644 --- a/tests/data/valid/connection_arbiter/basic.yaml +++ b/tests/data/valid/connection_arbiter/basic.yaml @@ -3,17 +3,14 @@ includes: - ports.yaml - basic_factories.yaml - tasks.yaml + - json.yaml app: - runtimepy.net.apps.init_only - - runtimepy.net.apps.init_only - - [ - runtimepy.net.apps.init_only, tests.net.stream.stream_test, - runtimepy.net.apps.init_only, + tests.net.stream.json_test, ] - - runtimepy.net.apps.init_only clients: @@ -27,9 +24,7 @@ clients: - factory: tcp_string name: tcp_message_client defer: true - kwargs: - host: localhost - port: "$tcp_string" + kwargs: {host: localhost, port: "$tcp_string"} - factory: udp_string name: udp_message_client diff --git a/tests/data/valid/connection_arbiter/basic_factories.yaml b/tests/data/valid/connection_arbiter/basic_factories.yaml index cf21a79c..82201031 100644 --- a/tests/data/valid/connection_arbiter/basic_factories.yaml +++ b/tests/data/valid/connection_arbiter/basic_factories.yaml @@ -1,6 +1,7 @@ --- includes: - echo_factories.yaml + - package://runtimepy/factories.yaml factories: # Connection factories. @@ -11,6 +12,5 @@ factories: - {name: tests.sample.TcpString, namespaces: [tcp, message]} # Task factories. - - {name: runtimepy.net.arbiter.housekeeping.ConnectionMetricsLoggerFactory} - {name: tests.sample.SampleTaskFactoryA, namespaces: [tasks, a]} - {name: tests.sample.SampleTaskFactoryB, namespaces: [tasks, b]} diff --git a/tests/data/valid/connection_arbiter/json.yaml b/tests/data/valid/connection_arbiter/json.yaml new file mode 100644 index 00000000..4cedb394 --- /dev/null +++ b/tests/data/valid/connection_arbiter/json.yaml @@ -0,0 +1,31 @@ +--- +clients: + - factory: udp_json + name: udp_json_client + defer: true + kwargs: + remote_addr: [localhost, "$udp_json"] + - factory: udp_json + name: udp_json_server + kwargs: + local_addr: [localhost, "$udp_json"] + + - factory: tcp_json + name: tcp_json + defer: true + kwargs: {host: localhost, port: "$tcp_json"} + + - factory: websocket_json + name: websocket_json + defer: true + args: ["ws://localhost:$websocket_json"] + +servers: + - factory: tcp_json + kwargs: + port: "$tcp_json" + + - factory: websocket_json + kwargs: + host: "0.0.0.0" + port: "$websocket_json" diff --git a/tests/net/stream/__init__.py b/tests/net/stream/__init__.py index 0a6a17e5..26efb61b 100644 --- a/tests/net/stream/__init__.py +++ b/tests/net/stream/__init__.py @@ -5,9 +5,13 @@ # built-in import asyncio +# third-party +from vcorelib.dict.codec import BasicDictCodec + # module under test from runtimepy.net.arbiter.info import AppInfo from runtimepy.net.stream import StringMessageConnection +from runtimepy.net.stream.json import JsonMessage, JsonMessageConnection async def stream_test(app: AppInfo) -> int: @@ -25,3 +29,74 @@ async def stream_test(app: AppInfo) -> int: assert count > 0 return 0 + + +async def json_client_test(client: JsonMessageConnection) -> int: + """Test a single JSON client.""" + + client.send_json({}) + await client.wait_json({}) + + assert await client.wait_json({"unknown": 0, "command": 1}) == { + "keys_ignored": ["command", "unknown"] + } + + codec = BasicDictCodec.create({"a": 1, "b": 2, "c": 3}) + client.send_json(codec) + assert await client.wait_json(codec) == {"keys_ignored": ["a", "b", "c"]} + + # Should trigger decode error. + client.send_message_str("{hello") + + # Test loopback. + assert await client.loopback() + assert await client.loopback(data={"a": 1, "b": 2, "c": 3}) + + return 0 + + +async def json_test(app: AppInfo) -> int: + """Test JSON clients in parallel.""" + + # Add typed handler for UDP server connection. + udp_server = app.single( + pattern="udp_json_server", kind=JsonMessageConnection + ) + + async def typed_handler( + response: JsonMessage, data: BasicDictCodec + ) -> None: + """An example handler.""" + + response["it_worked"] = True + response.update(data.asdict()) + + # Test handler. + udp_server.typed_handler("test", BasicDictCodec, typed_handler) + + udp_client = app.single( + pattern="udp_json_client", kind=JsonMessageConnection + ) + + result = await udp_client.wait_json({"test": {"a": 1, "b": 2, "c": 3}}) + result = result["test"] + assert "it_worked" in result + assert result["it_worked"] is True, result + assert result["a"] == 1, result + assert result["b"] == 2, result + assert result["c"] == 3, result + + return sum( + await asyncio.gather( + *[ + json_client_test(client) + for client in [ + udp_client, + app.single(pattern="tcp_json", kind=JsonMessageConnection), + app.single( + pattern="websocket_json", kind=JsonMessageConnection + ), + ] + ] + ) + ) diff --git a/tests/net/stream/test_stream.py b/tests/net/stream/test_stream.py new file mode 100644 index 00000000..8667f098 --- /dev/null +++ b/tests/net/stream/test_stream.py @@ -0,0 +1,20 @@ +""" +Test the 'net.stream' module. +""" + +# built-in +import asyncio + +# third-party +from pytest import mark + +# module under test +from runtimepy.net.stream.json import event_wait + + +@mark.asyncio +async def test_event_wait_basic(): + """Test the event wait can time out.""" + + event = asyncio.Event() + assert not await event_wait(event, 0.0)