Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.2.0 - Parallel apps and string message conn #92

Merged
merged 1 commit into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:

- run: |
mk python-release owner=vkottler \
repo=runtimepy version=2.1.3
repo=runtimepy version=2.2.0
if: |
matrix.python-version == '3.11'
&& matrix.system == 'ubuntu-latest'
Expand Down
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ $(error target this Makefile with 'mk', not '$(MAKE)' ($(MK_INFO)))
endif
###############################################################################

.PHONY: all edit clean yaml
.PHONY: all edit clean

.DEFAULT_GOAL := all

Expand All @@ -17,5 +17,3 @@ all: $(DZ_PREFIX)sync yaml
edit: $(PY_PREFIX)edit

clean: $(PY_PREFIX)clean $(DZ_PREFIX)clean

yaml: $(YAML_PREFIX)lint-local $(YAML_PREFIX)lint-manifest.yaml
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
=====================================
generator=datazen
version=3.1.2
hash=6a75e29b42b1d2fa20b1373c2a8adc01
hash=aa6f45c3cbecc82321c77d61496f2fdd
=====================================
-->

# runtimepy ([2.1.3](https://pypi.org/project/runtimepy/))
# runtimepy ([2.2.0](https://pypi.org/project/runtimepy/))

[![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/)
![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg)
Expand Down
14 changes: 14 additions & 0 deletions local/arbiter/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@
from runtimepy.net.arbiter import AppInfo


async def noop1(app: AppInfo) -> int:
"""An app that doesn't do much."""

app.logger.info("I ran!")
return 0


async def noop2(app: AppInfo) -> int:
"""An app that doesn't do much."""

app.logger.info("I ran!")
return 1


async def test(app: AppInfo) -> int:
"""A network application that doesn't do anything."""

Expand Down
2 changes: 1 addition & 1 deletion local/arbiter/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ tasks:
period_s: 1.0

app:
- tasks.test
- [tasks.test, tasks.noop1, tasks.noop2]
2 changes: 1 addition & 1 deletion local/configs/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description: A framework for implementing Python services.
entry: {{entry}}

requirements:
- vcorelib>=2.5.4
- vcorelib>=2.6.0
- websockets
- "windows-curses; sys_platform == 'win32'"

Expand Down
4 changes: 2 additions & 2 deletions local/variables/package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
major: 2
minor: 1
patch: 3
minor: 2
patch: 0
entry: runtimepy
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__"

[project]
name = "runtimepy"
version = "2.1.3"
version = "2.2.0"
description = "A framework for implementing Python services."
readme = "README.md"
requires-python = ">=3.8"
Expand Down
4 changes: 2 additions & 2 deletions runtimepy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# =====================================
# generator=datazen
# version=3.1.2
# hash=933cfe7057a94bf7e43e8b9938cb8159
# hash=2b3b621088205075e7479125ae6d6f24
# =====================================

"""
Expand All @@ -10,4 +10,4 @@

DESCRIPTION = "A framework for implementing Python services."
PKG_NAME = "runtimepy"
VERSION = "2.1.3"
VERSION = "2.2.0"
6 changes: 5 additions & 1 deletion runtimepy/data/schemas/ConnectionArbiterConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ properties:
- type: string
- type: array
items:
type: string
oneOf:
- type: string
- type: array
items:
type: string

# Application configuration data.
config:
Expand Down
46 changes: 31 additions & 15 deletions runtimepy/net/arbiter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import asyncio as _asyncio
from contextlib import AsyncExitStack as _AsyncExitStack
from inspect import isawaitable as _isawaitable
from logging import getLogger as _getLogger
from typing import Awaitable as _Awaitable
from typing import Callable as _Callable
from typing import Iterable as _Iterable
Expand Down Expand Up @@ -45,16 +44,18 @@ async def init_only(app: AppInfo) -> int:

def normalize_app(
app: NetworkApplicationlike = None,
) -> _List[NetworkApplication]:
) -> _List[_List[NetworkApplication]]:
"""
Normalize some application parameter into a list of network applications.
"""

if app is None:
app = [init_only]
elif not isinstance(app, list):

if not isinstance(app, list):
app = [app]
return app

return [app]


class BaseConnectionArbiter(_NamespaceMixin, _LoggerMixin):
Expand Down Expand Up @@ -92,7 +93,7 @@ def __init__(

# A fallback application. Set a class attribute so this can be more
# easily externally updated.
self._apps: _List[NetworkApplication] = normalize_app(app)
self._apps: _List[_List[NetworkApplication]] = normalize_app(app)

# Application configuration data.
if config is None:
Expand Down Expand Up @@ -211,19 +212,11 @@ async def _entry(
if app is not None:
apps = normalize_app(app)

# Run applications in order.
result = 0
for curr_app in apps:
if result == 0:
info.logger = _getLogger(curr_app.__name__)
info.logger.info("Starting.")
try:
result = await curr_app(info)
info.logger.info("Returned %d.", result)
except AssertionError as exc:
info.logger.exception(
"Failed an assertion:", exc_info=exc
)
result = -1
result = await self._run_apps(curr_app, info)

finally:
for conn in self._connections.values():
Expand All @@ -232,6 +225,29 @@ async def _entry(

return result

async def _run_apps(
self, apps: _List[NetworkApplication], info: AppInfo
) -> int:
"""Run application methods in parallel."""

pairs = [(app, info.with_new_logger(app.__name__)) for app in apps]

for _, inf in pairs:
inf.logger.info("Starting.")

total = 0
try:
results = await _asyncio.gather(*(app(inf) for app, inf in pairs))
for idx, result in enumerate(results):
pairs[idx][1].logger.info("Returned %d.", result)
total += result

except AssertionError as exc:
info.logger.exception("Failed an assertion:", exc_info=exc)
total = -1

return total

async def app(
self,
app: NetworkApplicationlike = None,
Expand Down
13 changes: 10 additions & 3 deletions runtimepy/net/arbiter/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,16 @@ def set_app(self, module_path: _Union[str, _List[str]]) -> None:

# Load all application methods.
apps = []
for path in module_path:
module, app = import_str_and_item(path)
apps.append(getattr(_import_module(module), app))
for paths in module_path:
if not isinstance(paths, list):
paths = [paths] # type: ignore

methods = []
for path in paths:
module, app = import_str_and_item(path)
methods.append(getattr(_import_module(module), app))

apps.append(methods)

self._apps = apps

Expand Down
13 changes: 13 additions & 0 deletions runtimepy/net/arbiter/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import asyncio as _asyncio
from contextlib import AsyncExitStack as _AsyncExitStack
from dataclasses import dataclass
from logging import getLogger as _getLogger
from typing import Iterator as _Iterator
from typing import MutableMapping as _MutableMapping
from typing import Type as _Type
Expand Down Expand Up @@ -47,6 +48,18 @@ class AppInfo:
# Configuration data that may be specified in a configuration file.
config: _JsonObject

def with_new_logger(self, name: str) -> "AppInfo":
"""Get a copy of this AppInfo instance, but with a new logger."""

return AppInfo(
_getLogger(name),
self.stack,
self.connections,
self.names,
self.stop,
self.config,
)

def search(
self,
*names: str,
Expand Down
36 changes: 33 additions & 3 deletions runtimepy/net/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
A module implementing a stream-oriented connection interface.
A module implementing a base, stream-oriented connection interface.
"""

# built-in
Expand Down Expand Up @@ -69,7 +69,7 @@ def send_message_str(
async def process_single(
self, stream: _BinaryIO, addr: Tuple[str, int] = None
) -> bool:
"""Process a single GTP message."""
"""Process a single message."""
del stream
del addr
return True
Expand Down Expand Up @@ -111,7 +111,7 @@ async def process_binary(
return result


class TcpPrefixedMessageConnection(TcpConnection, PrefixedMessageConnection):
class TcpPrefixedMessageConnection(PrefixedMessageConnection, TcpConnection):
"""A TCP implementation for size-prefixed messages."""


Expand Down Expand Up @@ -155,3 +155,33 @@ class EchoUdpMessageConnection(
UdpPrefixedMessageConnection, EchoMessageConnection
):
"""A connection that just echoes what it was sent."""


class StringMessageConnection(PrefixedMessageConnection):
"""A simple string-message sending and processing connection."""

async def process_message(
self, data: str, addr: Tuple[str, int] = None
) -> bool:
"""Process a string message."""

del addr
self.logger.info(data)
return True

async def process_single(
self, stream: _BinaryIO, addr: Tuple[str, int] = None
) -> bool:
"""Process a single message."""

return await self.process_message(stream.read().decode(), addr=addr)


class TcpStringMessageConnection(StringMessageConnection, TcpConnection):
"""A simple string-message sending and processing connection using TCP."""


class UdpStringMessageConnection(
StringMessageConnection, UdpPrefixedMessageConnection
):
"""A simple string-message sending and processing connection using UDP."""
2 changes: 1 addition & 1 deletion runtimepy/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
vcorelib>=2.5.4
vcorelib>=2.6.0
websockets
windows-curses; sys_platform == 'win32'
30 changes: 30 additions & 0 deletions tests/data/valid/connection_arbiter/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ app:
- runtimepy.net.apps.init_only
- runtimepy.net.apps.init_only

- [
runtimepy.net.apps.init_only,
tests.net.stream.stream_test,
runtimepy.net.apps.init_only,
]

- runtimepy.net.apps.init_only

clients:
- factory: sample_tcp_conn
name: client
Expand All @@ -16,6 +24,19 @@ clients:
host: localhost
port: "$tcp_server"

- factory: tcp_string
name: tcp_message_client
defer: true
kwargs:
host: localhost
port: "$tcp_string"

- factory: udp_string
name: udp_message_client
defer: true
kwargs:
remote_addr: [localhost, "$udp_string"]

- factory: sample_websocket_conn
name: client
defer: true
Expand All @@ -32,11 +53,20 @@ clients:
kwargs:
local_addr: [localhost, "$udp_listen"]

- factory: udp_string
name: string_server
kwargs:
local_addr: [localhost, "$udp_string"]

servers:
- factory: sample_tcp_conn
kwargs:
port: "$tcp_server"

- factory: tcp_string
kwargs:
port: "$tcp_string"

- factory: sample_websocket_conn
kwargs:
host: "0.0.0.0"
Expand Down
2 changes: 2 additions & 0 deletions tests/data/valid/connection_arbiter/basic_factories.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ factories:
- {name: tests.sample.SampleUdpConn, namespaces: [udp, sample]}
- {name: tests.sample.SampleTcpConn, namespaces: [tcp, sample]}
- {name: tests.sample.SampleWebsocketConn, namespaces: [websocket, sample]}
- {name: tests.sample.UdpString, namespaces: [udp, message]}
- {name: tests.sample.TcpString, namespaces: [tcp, message]}

# Task factories.
- {name: runtimepy.net.arbiter.housekeeping.ConnectionMetricsLoggerFactory}
Expand Down
3 changes: 3 additions & 0 deletions tests/data/valid/connection_arbiter/ports.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ ports:

- {name: udp_message_listen, type: udp}
- {name: tcp_message_server, type: tcp}

- {name: udp_string, type: tcp}
- {name: tcp_string, type: tcp}
Loading