From cc46d905453ef0fdd356af6a54b945f4ac3cc4ab Mon Sep 17 00:00:00 2001 From: Vaughn Kottler Date: Thu, 31 Aug 2023 22:00:24 -0700 Subject: [PATCH] 2.2.0 - Parallel apps and string message conn --- .github/workflows/python-package.yml | 2 +- Makefile | 4 +- README.md | 4 +- local/arbiter/tasks/__init__.py | 14 ++++++ local/arbiter/test.yaml | 2 +- local/configs/package.yaml | 2 +- local/variables/package.yaml | 4 +- pyproject.toml | 2 +- runtimepy/__init__.py | 4 +- .../data/schemas/ConnectionArbiterConfig.yaml | 6 ++- runtimepy/net/arbiter/base.py | 46 +++++++++++++------ runtimepy/net/arbiter/imports.py | 13 ++++-- runtimepy/net/arbiter/info.py | 13 ++++++ runtimepy/net/stream/__init__.py | 36 +++++++++++++-- runtimepy/requirements.txt | 2 +- .../data/valid/connection_arbiter/basic.yaml | 30 ++++++++++++ .../connection_arbiter/basic_factories.yaml | 2 + .../data/valid/connection_arbiter/ports.yaml | 3 ++ tests/net/stream/__init__.py | 27 +++++++++++ tests/sample.py | 16 +++++++ 20 files changed, 196 insertions(+), 36 deletions(-) create mode 100644 tests/net/stream/__init__.py diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index acd8393e..b3009bee 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -70,7 +70,7 @@ jobs: - run: | mk python-release owner=vkottler \ - repo=runtimepy version=2.1.3 + repo=runtimepy version=2.2.0 if: | matrix.python-version == '3.11' && matrix.system == 'ubuntu-latest' diff --git a/Makefile b/Makefile index ed6287e0..3b94f917 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ $(error target this Makefile with 'mk', not '$(MAKE)' ($(MK_INFO))) endif ############################################################################### -.PHONY: all edit clean yaml +.PHONY: all edit clean .DEFAULT_GOAL := all @@ -17,5 +17,3 @@ all: $(DZ_PREFIX)sync yaml edit: $(PY_PREFIX)edit clean: $(PY_PREFIX)clean $(DZ_PREFIX)clean - -yaml: $(YAML_PREFIX)lint-local $(YAML_PREFIX)lint-manifest.yaml diff --git a/README.md b/README.md index ac83de95..9b8d77fb 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ ===================================== generator=datazen version=3.1.2 - hash=6a75e29b42b1d2fa20b1373c2a8adc01 + hash=aa6f45c3cbecc82321c77d61496f2fdd ===================================== --> -# runtimepy ([2.1.3](https://pypi.org/project/runtimepy/)) +# runtimepy ([2.2.0](https://pypi.org/project/runtimepy/)) [![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/) ![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg) diff --git a/local/arbiter/tasks/__init__.py b/local/arbiter/tasks/__init__.py index bf2e3a5f..b41f3356 100644 --- a/local/arbiter/tasks/__init__.py +++ b/local/arbiter/tasks/__init__.py @@ -10,6 +10,20 @@ from runtimepy.net.arbiter import AppInfo +async def noop1(app: AppInfo) -> int: + """An app that doesn't do much.""" + + app.logger.info("I ran!") + return 0 + + +async def noop2(app: AppInfo) -> int: + """An app that doesn't do much.""" + + app.logger.info("I ran!") + return 1 + + async def test(app: AppInfo) -> int: """A network application that doesn't do anything.""" diff --git a/local/arbiter/test.yaml b/local/arbiter/test.yaml index 14dfb4df..61b55717 100644 --- a/local/arbiter/test.yaml +++ b/local/arbiter/test.yaml @@ -11,4 +11,4 @@ tasks: period_s: 1.0 app: - - tasks.test + - [tasks.test, tasks.noop1, tasks.noop2] diff --git a/local/configs/package.yaml b/local/configs/package.yaml index 765d31a5..b2f4d4eb 100644 --- a/local/configs/package.yaml +++ b/local/configs/package.yaml @@ -5,7 +5,7 @@ description: A framework for implementing Python services. entry: {{entry}} requirements: - - vcorelib>=2.5.4 + - vcorelib>=2.6.0 - websockets - "windows-curses; sys_platform == 'win32'" diff --git a/local/variables/package.yaml b/local/variables/package.yaml index 254e6e35..7c03dde2 100644 --- a/local/variables/package.yaml +++ b/local/variables/package.yaml @@ -1,5 +1,5 @@ --- major: 2 -minor: 1 -patch: 3 +minor: 2 +patch: 0 entry: runtimepy diff --git a/pyproject.toml b/pyproject.toml index 0714556e..9dc78ef3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__" [project] name = "runtimepy" -version = "2.1.3" +version = "2.2.0" description = "A framework for implementing Python services." readme = "README.md" requires-python = ">=3.8" diff --git a/runtimepy/__init__.py b/runtimepy/__init__.py index 118eb9dd..309e1f10 100644 --- a/runtimepy/__init__.py +++ b/runtimepy/__init__.py @@ -1,7 +1,7 @@ # ===================================== # generator=datazen # version=3.1.2 -# hash=933cfe7057a94bf7e43e8b9938cb8159 +# hash=2b3b621088205075e7479125ae6d6f24 # ===================================== """ @@ -10,4 +10,4 @@ DESCRIPTION = "A framework for implementing Python services." PKG_NAME = "runtimepy" -VERSION = "2.1.3" +VERSION = "2.2.0" diff --git a/runtimepy/data/schemas/ConnectionArbiterConfig.yaml b/runtimepy/data/schemas/ConnectionArbiterConfig.yaml index 5d3cf4a2..53133084 100644 --- a/runtimepy/data/schemas/ConnectionArbiterConfig.yaml +++ b/runtimepy/data/schemas/ConnectionArbiterConfig.yaml @@ -25,7 +25,11 @@ properties: - type: string - type: array items: - type: string + oneOf: + - type: string + - type: array + items: + type: string # Application configuration data. config: diff --git a/runtimepy/net/arbiter/base.py b/runtimepy/net/arbiter/base.py index 6fd7a34a..c9c83861 100644 --- a/runtimepy/net/arbiter/base.py +++ b/runtimepy/net/arbiter/base.py @@ -6,7 +6,6 @@ import asyncio as _asyncio from contextlib import AsyncExitStack as _AsyncExitStack from inspect import isawaitable as _isawaitable -from logging import getLogger as _getLogger from typing import Awaitable as _Awaitable from typing import Callable as _Callable from typing import Iterable as _Iterable @@ -45,16 +44,18 @@ async def init_only(app: AppInfo) -> int: def normalize_app( app: NetworkApplicationlike = None, -) -> _List[NetworkApplication]: +) -> _List[_List[NetworkApplication]]: """ Normalize some application parameter into a list of network applications. """ if app is None: app = [init_only] - elif not isinstance(app, list): + + if not isinstance(app, list): app = [app] - return app + + return [app] class BaseConnectionArbiter(_NamespaceMixin, _LoggerMixin): @@ -92,7 +93,7 @@ def __init__( # A fallback application. Set a class attribute so this can be more # easily externally updated. - self._apps: _List[NetworkApplication] = normalize_app(app) + self._apps: _List[_List[NetworkApplication]] = normalize_app(app) # Application configuration data. if config is None: @@ -211,19 +212,11 @@ async def _entry( if app is not None: apps = normalize_app(app) + # Run applications in order. result = 0 for curr_app in apps: if result == 0: - info.logger = _getLogger(curr_app.__name__) - info.logger.info("Starting.") - try: - result = await curr_app(info) - info.logger.info("Returned %d.", result) - except AssertionError as exc: - info.logger.exception( - "Failed an assertion:", exc_info=exc - ) - result = -1 + result = await self._run_apps(curr_app, info) finally: for conn in self._connections.values(): @@ -232,6 +225,29 @@ async def _entry( return result + async def _run_apps( + self, apps: _List[NetworkApplication], info: AppInfo + ) -> int: + """Run application methods in parallel.""" + + pairs = [(app, info.with_new_logger(app.__name__)) for app in apps] + + for _, inf in pairs: + inf.logger.info("Starting.") + + total = 0 + try: + results = await _asyncio.gather(*(app(inf) for app, inf in pairs)) + for idx, result in enumerate(results): + pairs[idx][1].logger.info("Returned %d.", result) + total += result + + except AssertionError as exc: + info.logger.exception("Failed an assertion:", exc_info=exc) + total = -1 + + return total + async def app( self, app: NetworkApplicationlike = None, diff --git a/runtimepy/net/arbiter/imports.py b/runtimepy/net/arbiter/imports.py index 68a9e263..9d18f31a 100644 --- a/runtimepy/net/arbiter/imports.py +++ b/runtimepy/net/arbiter/imports.py @@ -53,9 +53,16 @@ def set_app(self, module_path: _Union[str, _List[str]]) -> None: # Load all application methods. apps = [] - for path in module_path: - module, app = import_str_and_item(path) - apps.append(getattr(_import_module(module), app)) + for paths in module_path: + if not isinstance(paths, list): + paths = [paths] # type: ignore + + methods = [] + for path in paths: + module, app = import_str_and_item(path) + methods.append(getattr(_import_module(module), app)) + + apps.append(methods) self._apps = apps diff --git a/runtimepy/net/arbiter/info.py b/runtimepy/net/arbiter/info.py index c0e44418..2d7b0511 100644 --- a/runtimepy/net/arbiter/info.py +++ b/runtimepy/net/arbiter/info.py @@ -6,6 +6,7 @@ import asyncio as _asyncio from contextlib import AsyncExitStack as _AsyncExitStack from dataclasses import dataclass +from logging import getLogger as _getLogger from typing import Iterator as _Iterator from typing import MutableMapping as _MutableMapping from typing import Type as _Type @@ -47,6 +48,18 @@ class AppInfo: # Configuration data that may be specified in a configuration file. config: _JsonObject + def with_new_logger(self, name: str) -> "AppInfo": + """Get a copy of this AppInfo instance, but with a new logger.""" + + return AppInfo( + _getLogger(name), + self.stack, + self.connections, + self.names, + self.stop, + self.config, + ) + def search( self, *names: str, diff --git a/runtimepy/net/stream/__init__.py b/runtimepy/net/stream/__init__.py index d0e56ca1..58a19469 100644 --- a/runtimepy/net/stream/__init__.py +++ b/runtimepy/net/stream/__init__.py @@ -1,5 +1,5 @@ """ -A module implementing a stream-oriented connection interface. +A module implementing a base, stream-oriented connection interface. """ # built-in @@ -69,7 +69,7 @@ def send_message_str( async def process_single( self, stream: _BinaryIO, addr: Tuple[str, int] = None ) -> bool: - """Process a single GTP message.""" + """Process a single message.""" del stream del addr return True @@ -111,7 +111,7 @@ async def process_binary( return result -class TcpPrefixedMessageConnection(TcpConnection, PrefixedMessageConnection): +class TcpPrefixedMessageConnection(PrefixedMessageConnection, TcpConnection): """A TCP implementation for size-prefixed messages.""" @@ -155,3 +155,33 @@ class EchoUdpMessageConnection( UdpPrefixedMessageConnection, EchoMessageConnection ): """A connection that just echoes what it was sent.""" + + +class StringMessageConnection(PrefixedMessageConnection): + """A simple string-message sending and processing connection.""" + + async def process_message( + self, data: str, addr: Tuple[str, int] = None + ) -> bool: + """Process a string message.""" + + del addr + self.logger.info(data) + return True + + async def process_single( + self, stream: _BinaryIO, addr: Tuple[str, int] = None + ) -> bool: + """Process a single message.""" + + return await self.process_message(stream.read().decode(), addr=addr) + + +class TcpStringMessageConnection(StringMessageConnection, TcpConnection): + """A simple string-message sending and processing connection using TCP.""" + + +class UdpStringMessageConnection( + StringMessageConnection, UdpPrefixedMessageConnection +): + """A simple string-message sending and processing connection using UDP.""" diff --git a/runtimepy/requirements.txt b/runtimepy/requirements.txt index 04c0c9c9..ec226846 100644 --- a/runtimepy/requirements.txt +++ b/runtimepy/requirements.txt @@ -1,3 +1,3 @@ -vcorelib>=2.5.4 +vcorelib>=2.6.0 websockets windows-curses; sys_platform == 'win32' diff --git a/tests/data/valid/connection_arbiter/basic.yaml b/tests/data/valid/connection_arbiter/basic.yaml index 3de10e15..0cbf1bc3 100644 --- a/tests/data/valid/connection_arbiter/basic.yaml +++ b/tests/data/valid/connection_arbiter/basic.yaml @@ -8,6 +8,14 @@ app: - runtimepy.net.apps.init_only - runtimepy.net.apps.init_only + - [ + runtimepy.net.apps.init_only, + tests.net.stream.stream_test, + runtimepy.net.apps.init_only, + ] + + - runtimepy.net.apps.init_only + clients: - factory: sample_tcp_conn name: client @@ -16,6 +24,19 @@ clients: host: localhost port: "$tcp_server" + - factory: tcp_string + name: tcp_message_client + defer: true + kwargs: + host: localhost + port: "$tcp_string" + + - factory: udp_string + name: udp_message_client + defer: true + kwargs: + remote_addr: [localhost, "$udp_string"] + - factory: sample_websocket_conn name: client defer: true @@ -32,11 +53,20 @@ clients: kwargs: local_addr: [localhost, "$udp_listen"] + - factory: udp_string + name: string_server + kwargs: + local_addr: [localhost, "$udp_string"] + servers: - factory: sample_tcp_conn kwargs: port: "$tcp_server" + - factory: tcp_string + kwargs: + port: "$tcp_string" + - factory: sample_websocket_conn kwargs: host: "0.0.0.0" diff --git a/tests/data/valid/connection_arbiter/basic_factories.yaml b/tests/data/valid/connection_arbiter/basic_factories.yaml index 4fc36342..cf21a79c 100644 --- a/tests/data/valid/connection_arbiter/basic_factories.yaml +++ b/tests/data/valid/connection_arbiter/basic_factories.yaml @@ -7,6 +7,8 @@ factories: - {name: tests.sample.SampleUdpConn, namespaces: [udp, sample]} - {name: tests.sample.SampleTcpConn, namespaces: [tcp, sample]} - {name: tests.sample.SampleWebsocketConn, namespaces: [websocket, sample]} + - {name: tests.sample.UdpString, namespaces: [udp, message]} + - {name: tests.sample.TcpString, namespaces: [tcp, message]} # Task factories. - {name: runtimepy.net.arbiter.housekeeping.ConnectionMetricsLoggerFactory} diff --git a/tests/data/valid/connection_arbiter/ports.yaml b/tests/data/valid/connection_arbiter/ports.yaml index 7168e50f..92f87b59 100644 --- a/tests/data/valid/connection_arbiter/ports.yaml +++ b/tests/data/valid/connection_arbiter/ports.yaml @@ -6,3 +6,6 @@ ports: - {name: udp_message_listen, type: udp} - {name: tcp_message_server, type: tcp} + + - {name: udp_string, type: tcp} + - {name: tcp_string, type: tcp} diff --git a/tests/net/stream/__init__.py b/tests/net/stream/__init__.py new file mode 100644 index 00000000..0a6a17e5 --- /dev/null +++ b/tests/net/stream/__init__.py @@ -0,0 +1,27 @@ +""" +Test the 'net.stream' module. +""" + +# built-in +import asyncio + +# module under test +from runtimepy.net.arbiter.info import AppInfo +from runtimepy.net.stream import StringMessageConnection + + +async def stream_test(app: AppInfo) -> int: + """A network application that doesn't do anything.""" + + count = 0 + for client in app.search( + pattern="message_client", kind=StringMessageConnection + ): + for _ in range(100): + client.send_message_str("Hello, world!") + count += 1 + + await asyncio.sleep(0.1) + + assert count > 0 + return 0 diff --git a/tests/sample.py b/tests/sample.py index 6050a3f5..11813b46 100644 --- a/tests/sample.py +++ b/tests/sample.py @@ -7,6 +7,10 @@ from runtimepy.net.arbiter.tcp import TcpConnectionFactory from runtimepy.net.arbiter.udp import UdpConnectionFactory from runtimepy.net.arbiter.websocket import WebsocketConnectionFactory +from runtimepy.net.stream import ( + TcpStringMessageConnection, + UdpStringMessageConnection, +) # internal from tests.resources import ( @@ -29,6 +33,18 @@ class SampleTaskFactoryB(TaskFactory[SampleArbiterTask]): kind = SampleArbiterTask +class TcpString(TcpConnectionFactory[TcpStringMessageConnection]): + """A string-message connection factory for TCP.""" + + kind = TcpStringMessageConnection + + +class UdpString(UdpConnectionFactory[UdpStringMessageConnection]): + """A string-message connection factory for UDP.""" + + kind = UdpStringMessageConnection + + class SampleUdpConn(UdpConnectionFactory[SampleUdpConnection]): """A connection factory for the sample UDP connection."""