Skip to content

Commit

Permalink
3.9.0 - Add channel event streaming interface
Browse files Browse the repository at this point in the history
  • Loading branch information
vkottler committed Feb 25, 2024
1 parent 5d27af3 commit 3f1b1ad
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
- run: |
mk python-release owner=vkottler \
repo=runtimepy version=3.8.0
repo=runtimepy version=3.9.0
if: |
matrix.python-version == '3.11'
&& matrix.system == 'ubuntu-latest'
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
=====================================
generator=datazen
version=3.1.4
hash=d0656a2e337796f56cdb520895ea2f08
hash=c7bfaabcb235afbdae1921e1defa2f1a
=====================================
-->

# runtimepy ([3.8.0](https://pypi.org/project/runtimepy/))
# runtimepy ([3.9.0](https://pypi.org/project/runtimepy/))

[![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/)
![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg)
Expand Down
2 changes: 1 addition & 1 deletion local/variables/package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
major: 3
minor: 8
minor: 9
patch: 0
entry: runtimepy
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__"

[project]
name = "runtimepy"
version = "3.8.0"
version = "3.9.0"
description = "A framework for implementing Python services."
readme = "README.md"
requires-python = ">=3.11"
Expand Down
4 changes: 2 additions & 2 deletions runtimepy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# =====================================
# generator=datazen
# version=3.1.4
# hash=a8407da8b49bd8fea115ca3a4322ae95
# hash=5ed0327c93f9ed7f4cbf35710e3aa389
# =====================================

"""
Expand All @@ -10,7 +10,7 @@

DESCRIPTION = "A framework for implementing Python services."
PKG_NAME = "runtimepy"
VERSION = "3.8.0"
VERSION = "3.9.0"

# runtimepy-specific content.
METRICS_NAME = "metrics"
4 changes: 4 additions & 0 deletions runtimepy/channel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from vcorelib.io.types import JsonObject as _JsonObject

# internal
from runtimepy.channel.event import PrimitiveEvent
from runtimepy.mixins.enum import EnumMixin as _EnumMixin
from runtimepy.primitives import T as _T
from runtimepy.primitives import normalize as _normalize
Expand Down Expand Up @@ -59,6 +60,9 @@ def init(self, data: _JsonObject) -> None:
# A key to this channel's enumeration in the enumeration registry.
self._enum = _cast(str, data.get("enum"))

# An event-streaming interface.
self.event = PrimitiveEvent(self.raw, self.id)

def asdict(self) -> _JsonObject:
"""Obtain a dictionary representing this instance."""

Expand Down
68 changes: 68 additions & 0 deletions runtimepy/channel/event/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
A module implementing a channel-event protocol.
"""

# built-in
from contextlib import contextmanager
from typing import BinaryIO, Iterator

# internal
from runtimepy.channel.event.header import PrimitiveEventHeader
from runtimepy.primitives import AnyPrimitive


class PrimitiveEvent:
"""A class implementing a simple channel-even interface."""

def __init__(self, primitive: AnyPrimitive, identifier: int) -> None:
"""Initialize this instance."""

self.primitive = primitive
self.header = PrimitiveEventHeader.instance()
PrimitiveEventHeader.init_header(self.header, identifier)
self.prev_ns: int = 0
self.streaming = False

@contextmanager
def registered(self, stream: BinaryIO) -> Iterator[None]:
"""Register a stream as a managed context."""

assert not self.streaming, "Already streaming!"

def callback(_, __) -> None:
"""Emit a change event to the stream."""
self._poll(stream, force=True)

# Poll immediately.
self.prev_ns = 0
self._poll(stream)

raw = self.primitive
ident = raw.register_callback(callback)

self.streaming = True
yield
assert raw.remove_callback(ident)
self.streaming = False

def _poll(self, stream: BinaryIO, force: bool = False) -> int:
"""
Poll this event so that if the underlying channel has changed since the
last write, we write another event.
"""

written = 0

# Check timestamp and update header if necessary.
raw = self.primitive
curr_ns = raw.last_updated_ns
if force or curr_ns >= self.prev_ns:
self.prev_ns = curr_ns
self.header["timestamp"] = curr_ns

# Write header then value.
array = self.header.array
written += array.to_stream(stream)
written += raw.to_stream(stream, byte_order=array.byte_order)

return written
30 changes: 30 additions & 0 deletions runtimepy/channel/event/header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
A module implementing interfaces related to channel-protocol headers.
"""

# internal
from runtimepy.codec.protocol import Protocol, ProtocolFactory
from runtimepy.primitives import Uint16

IdType = Uint16
ID_SINGLE = IdType()


class PrimitiveEventHeader(ProtocolFactory):
"""A protocol for implementing channel events."""

@classmethod
def initialize(cls, protocol: Protocol) -> None:
"""Initialize this protocol."""

protocol.add_field("identifier", kind=IdType)
protocol.add_field("timestamp", kind="uint64")

@classmethod
def init_header(cls, protocol: Protocol, identifier: int) -> None:
"""Initialize a channel-event header."""

bounds = ID_SINGLE.kind.int_bounds
assert bounds is not None
assert bounds.validate(identifier), identifier
protocol["identifier"] = identifier
86 changes: 85 additions & 1 deletion runtimepy/channel/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,28 @@
"""

# built-in
from contextlib import ExitStack, contextmanager
from typing import Any as _Any
from typing import BinaryIO, Iterator, NamedTuple
from typing import Optional as _Optional
from typing import Type as _Type
from typing import Union
from typing import Union, cast

# third-party
from vcorelib.io import ByteFifo
from vcorelib.io.types import JsonObject as _JsonObject

# internal
from runtimepy.channel import AnyChannel as _AnyChannel
from runtimepy.channel import Channel as _Channel
from runtimepy.channel.event.header import PrimitiveEventHeader
from runtimepy.codec.protocol import Protocol
from runtimepy.mapping import DEFAULT_PATTERN
from runtimepy.mixins.regex import CHANNEL_PATTERN as _CHANNEL_PATTERN
from runtimepy.primitives import ChannelScaling, Primitive
from runtimepy.primitives import Primitivelike as _Primitivelike
from runtimepy.primitives import normalize
from runtimepy.primitives.type.base import PythonPrimitive
from runtimepy.registry import Registry as _Registry
from runtimepy.registry.name import NameRegistry as _NameRegistry
from runtimepy.registry.name import RegistryKey as _RegistryKey
Expand All @@ -29,16 +36,36 @@ class ChannelNameRegistry(_NameRegistry):
name_regex = _CHANNEL_PATTERN


class ParsedEvent(NamedTuple):
"""A raw channel event."""

name: str
timestamp: int
value: PythonPrimitive


class ChannelRegistry(_Registry[_Channel[_Any]]):
"""A runtime enumeration registry."""

name_registry = ChannelNameRegistry

event_header: Protocol
event_fifo: ByteFifo
header_ready: bool

@property
def kind(self) -> _Type[_Channel[_Any]]:
"""Determine what kind of registry this is."""
return _Channel

def init(self, data: _JsonObject) -> None:
"""Perform implementation-specific initialization."""

super().init(data)
self.event_header = PrimitiveEventHeader.instance()
self.header_ready = False
self.event_fifo = ByteFifo()

def channel(
self,
name: str,
Expand Down Expand Up @@ -76,5 +103,62 @@ def channel(
# Replace the underlying primitive, in case it was direclty passed in.
if result is not None:
result.raw = primitive
result.event.primitive = primitive # type: ignore

return result

@contextmanager
def registered(
self, stream: BinaryIO, pattern: str = DEFAULT_PATTERN
) -> Iterator[None]:
"""Register a stream as a managed context."""

with ExitStack() as stack:
for _, channel in self.search(pattern=pattern):
stack.enter_context(channel.event.registered(stream))

yield

def parse_event_stream(self, stream: BinaryIO) -> Iterator[ParsedEvent]:
"""Parse individual events from a stream."""

# Ingest stream.
self.event_fifo.ingest(stream.read())

ident = -1
name = ""

keep_going = True
while keep_going:
keep_going = False

# Read header.
if not self.header_ready:
read_size = self.event_header.size
data = self.event_fifo.pop(read_size)
if data is not None:
self.event_header.array.update(data)

# Update local variables.
ident = cast(int, self.event_header["identifier"])
name = self.names.name(ident) # type: ignore
assert name is not None, ident

# Update state.
self.header_ready = True
keep_going = True
else:
kind = self[name].type
data = self.event_fifo.pop(kind.size)
if data is not None:
yield ParsedEvent(
name,
cast(int, self.event_header["timestamp"]),
kind.decode(
data, byte_order=self.event_header.array.byte_order
),
)

# Update state.
self.header_ready = False
keep_going = True
13 changes: 13 additions & 0 deletions runtimepy/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

# built-in
import re
from typing import Dict as _Dict
from typing import Generic as _Generic
from typing import Iterator as _Iterator
Expand Down Expand Up @@ -38,6 +39,7 @@
_MutableMapping[str, bool],
_MutableMapping[str, int],
]
DEFAULT_PATTERN = ".*"


class TwoWayNameMapping(_RegexMixin, LoggerMixin, _Generic[T]):
Expand Down Expand Up @@ -142,6 +144,17 @@ def int_from_dict(

return cls(mapping=mapping, reverse=reverse)

def search(
self, pattern: str = DEFAULT_PATTERN, exact: bool = False
) -> _Iterator[str]:
"""Get names in this mapping based on a pattern."""

compiled = re.compile(pattern)
for name in self.names:
if compiled.search(name) is not None:
if not exact or name == pattern:
yield name

@classmethod
def bool_from_dict(
cls: _Type[BoolMapping], data: BoolMappingData
Expand Down
3 changes: 1 addition & 2 deletions runtimepy/net/arbiter/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
from runtimepy.net.arbiter.result import OverallResult, results
from runtimepy.net.connection import Connection as _Connection
from runtimepy.net.manager import ConnectionManager
from runtimepy.registry import DEFAULT_PATTERN
from runtimepy.task import PeriodicTask, PeriodicTaskManager
from runtimepy.tui.mixin import TuiMixin

ConnectionMap = _MutableMapping[str, _Connection]
T = _TypeVar("T", bound=_Connection)
V = _TypeVar("V", bound=PeriodicTask)

DEFAULT_PATTERN = ".*"


@dataclass
class AppInfo:
Expand Down
6 changes: 3 additions & 3 deletions runtimepy/primitives/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def value(self, value: T) -> None:

curr: T = self.raw.value # type: ignore

self.raw.value = value
self.last_updated_ns = default_time_ns()

# Call callbacks if the value has changed.
if self.callbacks and curr != value:
to_remove = []
Expand All @@ -119,9 +122,6 @@ def value(self, value: T) -> None:
for item in to_remove:
self.remove_callback(item)

self.last_updated_ns = default_time_ns()
self.raw.value = value

@property
def scaled(self) -> Numeric:
"""Get this primitive as a scaled value."""
Expand Down
4 changes: 4 additions & 0 deletions runtimepy/primitives/type/bounds.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class IntegerBounds(NamedTuple):
min: int
max: int

def validate(self, val: int) -> bool:
"""Determine if the value is within bounds."""
return self.min <= val <= self.max

def clamp(self, val: int) -> int:
"""
Ensure that 'val' is between min and max, use the min or max value
Expand Down
Loading

0 comments on commit 3f1b1ad

Please sign in to comment.