diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 3cdce3f5..4c307933 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -68,7 +68,7 @@ jobs: - run: | mk python-release owner=vkottler \ - repo=runtimepy version=3.8.0 + repo=runtimepy version=3.9.0 if: | matrix.python-version == '3.11' && matrix.system == 'ubuntu-latest' diff --git a/README.md b/README.md index 8a76e5af..8379105d 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ ===================================== generator=datazen version=3.1.4 - hash=d0656a2e337796f56cdb520895ea2f08 + hash=c7bfaabcb235afbdae1921e1defa2f1a ===================================== --> -# runtimepy ([3.8.0](https://pypi.org/project/runtimepy/)) +# runtimepy ([3.9.0](https://pypi.org/project/runtimepy/)) [![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/) ![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg) diff --git a/local/variables/package.yaml b/local/variables/package.yaml index 00d4a8e5..4c0f3e61 100644 --- a/local/variables/package.yaml +++ b/local/variables/package.yaml @@ -1,5 +1,5 @@ --- major: 3 -minor: 8 +minor: 9 patch: 0 entry: runtimepy diff --git a/pyproject.toml b/pyproject.toml index 254f887b..40cbd78d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__" [project] name = "runtimepy" -version = "3.8.0" +version = "3.9.0" description = "A framework for implementing Python services." readme = "README.md" requires-python = ">=3.11" diff --git a/runtimepy/__init__.py b/runtimepy/__init__.py index 42ddc42c..b0c66a20 100644 --- a/runtimepy/__init__.py +++ b/runtimepy/__init__.py @@ -1,7 +1,7 @@ # ===================================== # generator=datazen # version=3.1.4 -# hash=a8407da8b49bd8fea115ca3a4322ae95 +# hash=5ed0327c93f9ed7f4cbf35710e3aa389 # ===================================== """ @@ -10,7 +10,7 @@ DESCRIPTION = "A framework for implementing Python services." PKG_NAME = "runtimepy" -VERSION = "3.8.0" +VERSION = "3.9.0" # runtimepy-specific content. METRICS_NAME = "metrics" diff --git a/runtimepy/channel/__init__.py b/runtimepy/channel/__init__.py index adf659ef..5dd4ee44 100644 --- a/runtimepy/channel/__init__.py +++ b/runtimepy/channel/__init__.py @@ -11,6 +11,7 @@ from vcorelib.io.types import JsonObject as _JsonObject # internal +from runtimepy.channel.event import PrimitiveEvent from runtimepy.mixins.enum import EnumMixin as _EnumMixin from runtimepy.primitives import T as _T from runtimepy.primitives import normalize as _normalize @@ -59,6 +60,9 @@ def init(self, data: _JsonObject) -> None: # A key to this channel's enumeration in the enumeration registry. self._enum = _cast(str, data.get("enum")) + # An event-streaming interface. + self.event = PrimitiveEvent(self.raw, self.id) + def asdict(self) -> _JsonObject: """Obtain a dictionary representing this instance.""" diff --git a/runtimepy/channel/event/__init__.py b/runtimepy/channel/event/__init__.py new file mode 100644 index 00000000..a016f384 --- /dev/null +++ b/runtimepy/channel/event/__init__.py @@ -0,0 +1,68 @@ +""" +A module implementing a channel-event protocol. +""" + +# built-in +from contextlib import contextmanager +from typing import BinaryIO, Iterator + +# internal +from runtimepy.channel.event.header import PrimitiveEventHeader +from runtimepy.primitives import AnyPrimitive + + +class PrimitiveEvent: + """A class implementing a simple channel-even interface.""" + + def __init__(self, primitive: AnyPrimitive, identifier: int) -> None: + """Initialize this instance.""" + + self.primitive = primitive + self.header = PrimitiveEventHeader.instance() + PrimitiveEventHeader.init_header(self.header, identifier) + self.prev_ns: int = 0 + self.streaming = False + + @contextmanager + def registered(self, stream: BinaryIO) -> Iterator[None]: + """Register a stream as a managed context.""" + + assert not self.streaming, "Already streaming!" + + def callback(_, __) -> None: + """Emit a change event to the stream.""" + self._poll(stream, force=True) + + # Poll immediately. + self.prev_ns = 0 + self._poll(stream) + + raw = self.primitive + ident = raw.register_callback(callback) + + self.streaming = True + yield + assert raw.remove_callback(ident) + self.streaming = False + + def _poll(self, stream: BinaryIO, force: bool = False) -> int: + """ + Poll this event so that if the underlying channel has changed since the + last write, we write another event. + """ + + written = 0 + + # Check timestamp and update header if necessary. + raw = self.primitive + curr_ns = raw.last_updated_ns + if force or curr_ns >= self.prev_ns: + self.prev_ns = curr_ns + self.header["timestamp"] = curr_ns + + # Write header then value. + array = self.header.array + written += array.to_stream(stream) + written += raw.to_stream(stream, byte_order=array.byte_order) + + return written diff --git a/runtimepy/channel/event/header.py b/runtimepy/channel/event/header.py new file mode 100644 index 00000000..67080706 --- /dev/null +++ b/runtimepy/channel/event/header.py @@ -0,0 +1,30 @@ +""" +A module implementing interfaces related to channel-protocol headers. +""" + +# internal +from runtimepy.codec.protocol import Protocol, ProtocolFactory +from runtimepy.primitives import Uint16 + +IdType = Uint16 +ID_SINGLE = IdType() + + +class PrimitiveEventHeader(ProtocolFactory): + """A protocol for implementing channel events.""" + + @classmethod + def initialize(cls, protocol: Protocol) -> None: + """Initialize this protocol.""" + + protocol.add_field("identifier", kind=IdType) + protocol.add_field("timestamp", kind="uint64") + + @classmethod + def init_header(cls, protocol: Protocol, identifier: int) -> None: + """Initialize a channel-event header.""" + + bounds = ID_SINGLE.kind.int_bounds + assert bounds is not None + assert bounds.validate(identifier), identifier + protocol["identifier"] = identifier diff --git a/runtimepy/channel/registry.py b/runtimepy/channel/registry.py index a5ae30f2..2841bd3d 100644 --- a/runtimepy/channel/registry.py +++ b/runtimepy/channel/registry.py @@ -3,21 +3,28 @@ """ # built-in +from contextlib import ExitStack, contextmanager from typing import Any as _Any +from typing import BinaryIO, Iterator, NamedTuple from typing import Optional as _Optional from typing import Type as _Type -from typing import Union +from typing import Union, cast # third-party +from vcorelib.io import ByteFifo from vcorelib.io.types import JsonObject as _JsonObject # internal from runtimepy.channel import AnyChannel as _AnyChannel from runtimepy.channel import Channel as _Channel +from runtimepy.channel.event.header import PrimitiveEventHeader +from runtimepy.codec.protocol import Protocol +from runtimepy.mapping import DEFAULT_PATTERN from runtimepy.mixins.regex import CHANNEL_PATTERN as _CHANNEL_PATTERN from runtimepy.primitives import ChannelScaling, Primitive from runtimepy.primitives import Primitivelike as _Primitivelike from runtimepy.primitives import normalize +from runtimepy.primitives.type.base import PythonPrimitive from runtimepy.registry import Registry as _Registry from runtimepy.registry.name import NameRegistry as _NameRegistry from runtimepy.registry.name import RegistryKey as _RegistryKey @@ -29,16 +36,36 @@ class ChannelNameRegistry(_NameRegistry): name_regex = _CHANNEL_PATTERN +class ParsedEvent(NamedTuple): + """A raw channel event.""" + + name: str + timestamp: int + value: PythonPrimitive + + class ChannelRegistry(_Registry[_Channel[_Any]]): """A runtime enumeration registry.""" name_registry = ChannelNameRegistry + event_header: Protocol + event_fifo: ByteFifo + header_ready: bool + @property def kind(self) -> _Type[_Channel[_Any]]: """Determine what kind of registry this is.""" return _Channel + def init(self, data: _JsonObject) -> None: + """Perform implementation-specific initialization.""" + + super().init(data) + self.event_header = PrimitiveEventHeader.instance() + self.header_ready = False + self.event_fifo = ByteFifo() + def channel( self, name: str, @@ -76,5 +103,62 @@ def channel( # Replace the underlying primitive, in case it was direclty passed in. if result is not None: result.raw = primitive + result.event.primitive = primitive # type: ignore return result + + @contextmanager + def registered( + self, stream: BinaryIO, pattern: str = DEFAULT_PATTERN + ) -> Iterator[None]: + """Register a stream as a managed context.""" + + with ExitStack() as stack: + for _, channel in self.search(pattern=pattern): + stack.enter_context(channel.event.registered(stream)) + + yield + + def parse_event_stream(self, stream: BinaryIO) -> Iterator[ParsedEvent]: + """Parse individual events from a stream.""" + + # Ingest stream. + self.event_fifo.ingest(stream.read()) + + ident = -1 + name = "" + + keep_going = True + while keep_going: + keep_going = False + + # Read header. + if not self.header_ready: + read_size = self.event_header.size + data = self.event_fifo.pop(read_size) + if data is not None: + self.event_header.array.update(data) + + # Update local variables. + ident = cast(int, self.event_header["identifier"]) + name = self.names.name(ident) # type: ignore + assert name is not None, ident + + # Update state. + self.header_ready = True + keep_going = True + else: + kind = self[name].type + data = self.event_fifo.pop(kind.size) + if data is not None: + yield ParsedEvent( + name, + cast(int, self.event_header["timestamp"]), + kind.decode( + data, byte_order=self.event_header.array.byte_order + ), + ) + + # Update state. + self.header_ready = False + keep_going = True diff --git a/runtimepy/mapping.py b/runtimepy/mapping.py index 2808b35e..3bbbf3f4 100644 --- a/runtimepy/mapping.py +++ b/runtimepy/mapping.py @@ -3,6 +3,7 @@ """ # built-in +import re from typing import Dict as _Dict from typing import Generic as _Generic from typing import Iterator as _Iterator @@ -38,6 +39,7 @@ _MutableMapping[str, bool], _MutableMapping[str, int], ] +DEFAULT_PATTERN = ".*" class TwoWayNameMapping(_RegexMixin, LoggerMixin, _Generic[T]): @@ -142,6 +144,17 @@ def int_from_dict( return cls(mapping=mapping, reverse=reverse) + def search( + self, pattern: str = DEFAULT_PATTERN, exact: bool = False + ) -> _Iterator[str]: + """Get names in this mapping based on a pattern.""" + + compiled = re.compile(pattern) + for name in self.names: + if compiled.search(name) is not None: + if not exact or name == pattern: + yield name + @classmethod def bool_from_dict( cls: _Type[BoolMapping], data: BoolMappingData diff --git a/runtimepy/net/arbiter/info.py b/runtimepy/net/arbiter/info.py index 1262d19f..ebe0b737 100644 --- a/runtimepy/net/arbiter/info.py +++ b/runtimepy/net/arbiter/info.py @@ -23,6 +23,7 @@ from runtimepy.net.arbiter.result import OverallResult, results from runtimepy.net.connection import Connection as _Connection from runtimepy.net.manager import ConnectionManager +from runtimepy.registry import DEFAULT_PATTERN from runtimepy.task import PeriodicTask, PeriodicTaskManager from runtimepy.tui.mixin import TuiMixin @@ -30,8 +31,6 @@ T = _TypeVar("T", bound=_Connection) V = _TypeVar("V", bound=PeriodicTask) -DEFAULT_PATTERN = ".*" - @dataclass class AppInfo: diff --git a/runtimepy/primitives/base.py b/runtimepy/primitives/base.py index a12b86b0..6076d197 100644 --- a/runtimepy/primitives/base.py +++ b/runtimepy/primitives/base.py @@ -107,6 +107,9 @@ def value(self, value: T) -> None: curr: T = self.raw.value # type: ignore + self.raw.value = value + self.last_updated_ns = default_time_ns() + # Call callbacks if the value has changed. if self.callbacks and curr != value: to_remove = [] @@ -119,9 +122,6 @@ def value(self, value: T) -> None: for item in to_remove: self.remove_callback(item) - self.last_updated_ns = default_time_ns() - self.raw.value = value - @property def scaled(self) -> Numeric: """Get this primitive as a scaled value.""" diff --git a/runtimepy/primitives/type/bounds.py b/runtimepy/primitives/type/bounds.py index d7e9e241..a5787df3 100644 --- a/runtimepy/primitives/type/bounds.py +++ b/runtimepy/primitives/type/bounds.py @@ -13,6 +13,10 @@ class IntegerBounds(NamedTuple): min: int max: int + def validate(self, val: int) -> bool: + """Determine if the value is within bounds.""" + return self.min <= val <= self.max + def clamp(self, val: int) -> int: """ Ensure that 'val' is between min and max, use the min or max value diff --git a/runtimepy/registry/__init__.py b/runtimepy/registry/__init__.py index 7c2acc93..f76e02d1 100644 --- a/runtimepy/registry/__init__.py +++ b/runtimepy/registry/__init__.py @@ -7,7 +7,9 @@ from abc import abstractmethod as _abstractmethod from typing import Dict as _Dict from typing import Generic as _Generic +from typing import Iterator from typing import Optional as _Optional +from typing import Tuple from typing import Type as _Type from typing import TypeVar as _TypeVar from typing import cast as _cast @@ -17,6 +19,7 @@ from vcorelib.io.types import JsonValue as _JsonValue # internal +from runtimepy.mapping import DEFAULT_PATTERN from runtimepy.registry.item import RegistryItem as _RegistryItem from runtimepy.registry.name import NameRegistry as _NameRegistry from runtimepy.registry.name import RegistryKey as _RegistryKey @@ -49,6 +52,13 @@ def init(self, data: _JsonObject) -> None: reverse={name: item.id for name, item in self.items.items()} ) + def search( + self, pattern: str = DEFAULT_PATTERN + ) -> Iterator[Tuple[str, T]]: + """Search for items in the registry by name.""" + for name in self.names.search(pattern=pattern): + yield name, self.items[name] + def asdict(self) -> _JsonObject: """Get this registry as a dictionary.""" diff --git a/tests/channel/test_streams.py b/tests/channel/test_streams.py new file mode 100644 index 00000000..b14f87f9 --- /dev/null +++ b/tests/channel/test_streams.py @@ -0,0 +1,49 @@ +""" +Test data-streaming capabilities of channel registries. +""" + +# third-party +from vcorelib.paths.context import tempfile + +# module under test +from runtimepy.channel.environment import ChannelEnvironment + + +def test_channel_registry_streams_basic(): + """Test basic interactions with a streaming chanel events.""" + + env = ChannelEnvironment() + assert env.int_channel("a") + assert env.int_channel("b") + assert env.int_channel("c") + + assert len(list(env.channels.search())) == 3 + + with tempfile() as path: + with path.open("wb") as path_fd: + with env.channels.registered(path_fd): + assert env.channels["a"].raw.callbacks + + env.set("a", 1) + env.set("b", 2) + env.set("c", 3) + + # Open for reading and verify events. + with path.open("rb") as path_fd: + events = list(env.channels.parse_event_stream(path_fd)) + + assert len(events) == 6 + + assert events[0].name == "a" + assert events[0].value == 0 + assert events[1].name == "b" + assert events[1].value == 0 + assert events[2].name == "c" + assert events[2].value == 0 + + assert events[3].name == "a" + assert events[3].value == 1 + assert events[4].name == "b" + assert events[4].value == 2 + assert events[5].name == "c" + assert events[5].value == 3