Skip to content

Commit

Permalink
2.14.0 - Add support for connection restarting
Browse files Browse the repository at this point in the history
  • Loading branch information
vkottler committed Oct 20, 2023
1 parent 1c7e32f commit 97d36b7
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
- run: |
mk python-release owner=vkottler \
repo=runtimepy version=2.13.4
repo=runtimepy version=2.14.0
if: |
matrix.python-version == '3.11'
&& matrix.system == 'ubuntu-latest'
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
=====================================
generator=datazen
version=3.1.3
hash=50efa13ced2b135f66aff7b19a87d83d
hash=995dec4050d0b078008ad8e4911d515c
=====================================
-->

# runtimepy ([2.13.4](https://pypi.org/project/runtimepy/))
# runtimepy ([2.14.0](https://pypi.org/project/runtimepy/))

[![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/)
![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg)
Expand Down
4 changes: 2 additions & 2 deletions local/variables/package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
major: 2
minor: 13
patch: 4
minor: 14
patch: 0
entry: runtimepy
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__"

[project]
name = "runtimepy"
version = "2.13.4"
version = "2.14.0"
description = "A framework for implementing Python services."
readme = "README.md"
requires-python = ">=3.11"
Expand Down
4 changes: 2 additions & 2 deletions runtimepy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# =====================================
# generator=datazen
# version=3.1.3
# hash=95514f08f4202b6195f2631ae525def6
# hash=ce4d96baa832f20ead1736b028ac1141
# =====================================

"""
Expand All @@ -10,7 +10,7 @@

DESCRIPTION = "A framework for implementing Python services."
PKG_NAME = "runtimepy"
VERSION = "2.13.4"
VERSION = "2.14.0"

# runtimepy-specific content.
METRICS_NAME = "metrics"
61 changes: 55 additions & 6 deletions runtimepy/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from runtimepy.metrics import ConnectionMetrics
from runtimepy.mixins.environment import ChannelEnvironmentMixin
from runtimepy.mixins.logging import LoggerMixinLevelControl
from runtimepy.primitives import Bool
from runtimepy.primitives import Bool, Uint8
from runtimepy.primitives.byte_order import DEFAULT_BYTE_ORDER, ByteOrder

BinaryMessage = _Union[bytes, bytearray, memoryview]
Expand All @@ -44,7 +44,6 @@ def __init__(
"""Initialize this connection."""

LoggerMixinLevelControl.__init__(self, logger=logger)
self._enabled = Bool(True)

# A queue for out-going text messages. Connections that don't use
# this can set 'uses_text_tx_queue' to False to avoid scheduling a
Expand All @@ -60,7 +59,6 @@ def __init__(

self._tasks: _List[_asyncio.Task[None]] = []
self.initialized = _asyncio.Event()
self.disabled_event = _asyncio.Event()

self.metrics = ConnectionMetrics()

Expand All @@ -71,10 +69,24 @@ def __init__(
self.register_connection_metrics(self.metrics)

# State.
self._enabled = Bool()
self.disabled_event = _asyncio.Event()
self.env.channel("enabled", self._enabled)
self._set_enabled(True)

self._restarts = Uint8()
self.env.channel("restarts", self._restarts)

self._auto_restart = Bool()
self.env.channel("auto_restart", self._auto_restart)

self.init()

@property
def auto_restart(self) -> bool:
"""Determine if this connection should be automatically restarted."""
return bool(self._auto_restart)

def init(self) -> None:
"""Initialize this instance."""

Expand Down Expand Up @@ -132,24 +144,34 @@ def disabled(self) -> bool:
def disable_extra(self) -> None:
"""Additional tasks to perform when disabling."""

def _set_enabled(self, state: bool) -> None:
"""Set the enabled state for this connection."""

self._enabled.value = state
if not state:
self.disabled_event.set()
self.initialized.clear()
else:
self.disabled_event.clear()

def disable(self, reason: str) -> None:
"""Disable this connection."""

if self._enabled:
self.logger.info("Disabling connection: '%s'.", reason)
self.disable_extra()
self._enabled.value = False

# Cancel tasks.
for task in self._tasks:
if not task.done():
task.cancel()

# Signal that this connection has been disabled.
self.disabled_event.set()
self._set_enabled(False)

async def _wait_sig(self, stop_sig: _asyncio.Event) -> None:
"""Disable the connection if a stop signal gets set."""

await stop_sig.wait()
self.disable("stop signal")

Expand All @@ -164,16 +186,43 @@ async def _async_init(self) -> None:
self.env.finalize(strict=False)
self.initialized.set()

async def process(self, stop_sig: _asyncio.Event = None) -> None:
async def restart(self) -> bool:
"""
Reset necessary underlying state for this connection to 'process'
again.
"""
raise NotImplementedError

async def disable_in(self, time: float) -> None:
"""A method for disabling a connection after some delay."""

await _asyncio.sleep(time)
self.disable(f"timed disable ({time}s)")

async def process(
self, stop_sig: _asyncio.Event = None, disable_time: float = None
) -> None:
"""
Process tasks for this connection while the connection is active.
"""

# Try to re-enable the connection if necessary.
if self.disabled and (stop_sig is None or not stop_sig.is_set()):
assert await self.restart()
self._set_enabled(True)
self._restarts.raw.value += 1

self._tasks = [
_asyncio.create_task(self._process_read()),
_asyncio.create_task(self._async_init()),
]

# Disable the connection automatically if requested.
if disable_time is not None:
self._tasks.append(
_asyncio.create_task(self.disable_in(disable_time))
)

if self.uses_text_tx_queue:
self._tasks.append(
_asyncio.create_task(self._process_write_text())
Expand Down
14 changes: 13 additions & 1 deletion runtimepy/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,19 @@ async def manage(self, stop_sig: _asyncio.Event) -> None:
next_tasks = _log_exceptions(tasks)

# Filter out disabled connections.
self._conns = [x for x in self._conns if not x.disabled]
enabled = []
for conn in self._conns:
if not conn.disabled:
enabled.append(conn)

# Check if this connection should be restarted.
elif conn.auto_restart:
next_tasks.append(
_asyncio.create_task(conn.process(stop_sig=stop_sig))
)
enabled.append(conn)

self._conns = enabled

# If a new connection was made, register a task for processing
# it.
Expand Down
9 changes: 7 additions & 2 deletions runtimepy/net/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class TransportMixin:

_transport: _asyncio.BaseTransport

def __init__(self, transport: _asyncio.BaseTransport) -> None:
"""Initialize this instance."""
def set_transport(self, transport: _asyncio.BaseTransport) -> None:
"""Set the transport for this instance."""

self._transport = transport

Expand All @@ -45,6 +45,11 @@ def __init__(self, transport: _asyncio.BaseTransport) -> None:
# None).
self.remote_address = self._remote_address()

def __init__(self, transport: _asyncio.BaseTransport) -> None:
"""Initialize this instance."""

self.set_transport(transport)

@property
def socket(self) -> _SocketType:
"""Get this instance's underlying socket."""
Expand Down
48 changes: 44 additions & 4 deletions runtimepy/net/tcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,18 @@ def __init__(self, transport: _Transport, protocol: QueueProtocol) -> None:

# Re-assign with updated type information.
self._transport: _Transport = transport
self._set_protocol(protocol)

super().__init__(_getLogger(self.logger_name("TCP ")))

# Store connection-instantiation arguments.
self._conn_kwargs: dict[str, _Any] = {}

def _set_protocol(self, protocol: QueueProtocol) -> None:
"""Set a new protocol for this instance."""

self._protocol = protocol
self._protocol.conn = self
super().__init__(_getLogger(self.logger_name("TCP ")))

async def _await_message(self) -> _Optional[_Union[_BinaryMessage, str]]:
"""Await the next message. Return None on error or failure."""
Expand All @@ -108,16 +116,48 @@ def send_binary(self, data: _BinaryMessage) -> None:
self.metrics.tx.increment(len(data))

@classmethod
async def create_connection(cls: _Type[T], **kwargs) -> T:
"""Create a TCP connection."""
async def _transport_protocol(
cls: _Type[T], **kwargs
) -> tuple[_Transport, QueueProtocol]:
"""
Create a transport and protocol pair relevant for this class's
implementation.
"""

eloop = _get_event_loop()

transport: _Transport
transport, protocol = await eloop.create_connection(
QueueProtocol, **kwargs
)
return cls(transport, protocol)
return transport, protocol

async def restart(self) -> bool:
"""
Reset necessary underlying state for this connection to 'process'
again.
"""

transport, protocol = await self._transport_protocol(
**self._conn_kwargs
)
self.set_transport(transport)
self._set_protocol(protocol)

return True

@classmethod
async def create_connection(cls: _Type[T], **kwargs) -> T:
"""Create a TCP connection."""

transport, protocol = await cls._transport_protocol(**kwargs)
inst = cls(transport, protocol)

# Is there a better way to do this? We can't restart a server's side
# of a connection (seems okay).
inst._conn_kwargs = {**kwargs}

return inst

@classmethod
@_asynccontextmanager
Expand Down
Loading

0 comments on commit 97d36b7

Please sign in to comment.