Skip to content

Commit

Permalink
Merge pull request #93 from vkottler/dev/2.3.0
Browse files Browse the repository at this point in the history
2.3.0 - JSON message connections
  • Loading branch information
vkottler authored Sep 1, 2023
2 parents 0a89665 + 1298b83 commit bcbb047
Show file tree
Hide file tree
Showing 15 changed files with 625 additions and 131 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[DESIGN]
max-args=8
max-attributes=12
max-parents=8
max-parents=9

[MESSAGES CONTROL]
disable=too-few-public-methods
Expand Down
2 changes: 1 addition & 1 deletion local/variables/package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
major: 2
minor: 2
minor: 3
patch: 0
entry: runtimepy
15 changes: 15 additions & 0 deletions runtimepy/data/factories.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
factories:
# Connection factories.
- {name: runtimepy.net.factories.TcpJson}
- {name: runtimepy.net.factories.UdpJson}
- {name: runtimepy.net.factories.WebsocketJson}

# Task factories.
- {name: runtimepy.net.arbiter.housekeeping.ConnectionMetricsLoggerFactory}

ports:
# Reserve ports for JSON listeners.
- {name: udp_json, type: udp}
- {name: tcp_json, type: tcp}
- {name: websocket_json, type: tcp}
File renamed without changes.
25 changes: 25 additions & 0 deletions runtimepy/net/arbiter/tcp/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
A module implementing JSON-message connection factories.
"""

# internal
from runtimepy.net.stream import UdpPrefixedMessageConnection
from runtimepy.net.stream.json import JsonMessageConnection
from runtimepy.net.tcp.connection import TcpConnection
from runtimepy.net.websocket import WebsocketConnection


class TcpJsonMessageConnection(JsonMessageConnection, TcpConnection):
"""A TCP connection interface for JSON messaging."""


class UdpJsonMessageConnection(
JsonMessageConnection, UdpPrefixedMessageConnection
):
"""A UDP connection interface for JSON messaging."""


class WebsocketJsonMessageConnection(
JsonMessageConnection, WebsocketConnection
):
"""A websocket connection interface for JSON messaging."""
25 changes: 25 additions & 0 deletions runtimepy/net/factories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

# internal
from runtimepy.net.arbiter.tcp import TcpConnectionFactory
from runtimepy.net.arbiter.tcp.json import (
TcpJsonMessageConnection,
UdpJsonMessageConnection,
WebsocketJsonMessageConnection,
)
from runtimepy.net.arbiter.udp import UdpConnectionFactory
from runtimepy.net.arbiter.websocket import WebsocketConnectionFactory
from runtimepy.net.stream import (
Expand Down Expand Up @@ -73,6 +78,12 @@ class UdpNull(UdpConnectionFactory[NullUdpConnection]):
kind = NullUdpConnection


class UdpJson(UdpConnectionFactory[UdpJsonMessageConnection]):
"""UDP JSON-connection factory."""

kind = UdpJsonMessageConnection


class TcpEcho(TcpConnectionFactory[EchoTcpConnection]):
"""TCP echo-connection factory."""

Expand All @@ -97,6 +108,12 @@ class TcpNull(TcpConnectionFactory[NullTcpConnection]):
kind = NullTcpConnection


class TcpJson(TcpConnectionFactory[TcpJsonMessageConnection]):
"""TCP JSON-connection factory."""

kind = TcpJsonMessageConnection


class WebsocketEcho(WebsocketConnectionFactory[EchoWebsocketConnection]):
"""WebSocket echo-connection factory."""

Expand All @@ -107,3 +124,11 @@ class WebsocketNull(WebsocketConnectionFactory[NullWebsocketConnection]):
"""WebSocket null-connection factory."""

kind = NullWebsocketConnection


class WebsocketJson(
WebsocketConnectionFactory[WebsocketJsonMessageConnection]
):
"""WebSocket JSON-connection factory."""

kind = WebsocketJsonMessageConnection
135 changes: 15 additions & 120 deletions runtimepy/net/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -1,114 +1,29 @@
"""
A module implementing a base, stream-oriented connection interface.
A module aggregating stream-oriented connection interfaces.
"""

# built-in
from io import BytesIO as _BytesIO
from typing import BinaryIO as _BinaryIO
from typing import Tuple, Type

# third-party
from vcorelib.io import ByteFifo
from typing import Tuple

# internal
from runtimepy.net.connection import BinaryMessage
from runtimepy.net.connection import Connection as _Connection
from runtimepy.net.stream.base import PrefixedMessageConnection
from runtimepy.net.stream.string import StringMessageConnection
from runtimepy.net.tcp.connection import TcpConnection
from runtimepy.net.udp.connection import UdpConnection
from runtimepy.primitives import Uint32, UnsignedInt


class PrefixedMessageConnection(_Connection):
"""
A connection for handling inter-frame message size prefixes for some
stream-oriented protocols.
"""

message_length_kind: Type[UnsignedInt] = Uint32
reading_header: bool

def init(self) -> None:
"""Initialize this instance."""

# Header parsing.
self.buffer = ByteFifo()
self.reading_header = True
self.message_length_in = self.message_length_kind()
self.prefix_size = self.message_length_in.size

self.message_length_out = self.message_length_kind()

def _send_message(
self, data: BinaryMessage, addr: Tuple[str, int] = None
) -> None:
"""Underlying data send."""

del addr
self.send_binary(data)

def send_message(
self, data: BinaryMessage, addr: Tuple[str, int] = None
) -> None:
"""Handle inter-message prefixes for outgoing messages."""

self.message_length_out.value = len(data)

with _BytesIO() as stream:
self.message_length_out.to_stream(
stream, byte_order=self.byte_order
)
stream.write(data)
self._send_message(stream.getvalue(), addr=addr)

def send_message_str(
self, data: str, addr: Tuple[str, int] = None
) -> None:
"""Convert a message to bytes before sending."""
self.send_message(data.encode(), addr=addr)

async def process_single(
self, stream: _BinaryIO, addr: Tuple[str, int] = None
) -> bool:
"""Process a single message."""
del stream
del addr
return True

async def process_binary(
self, data: bytes, addr: Tuple[str, int] = None
) -> bool:
"""Process an incoming message."""

result = True

self.buffer.ingest(data)

can_continue = True
while can_continue:
# Read the message size.
if self.reading_header:
size = self.buffer.pop(self.prefix_size)
if size is not None:
assert len(size) == self.prefix_size
self.message_length_in.update(
size, byte_order=self.byte_order
)
self.reading_header = False
else:
can_continue = False

# Read the message payload.
else:
message = self.buffer.pop(self.message_length_in.value)
if message is not None:
# process message
with _BytesIO(message) as stream:
result &= await self.process_single(stream, addr=addr)
self.reading_header = True
else:
can_continue = False

return result
__all__ = [
"PrefixedMessageConnection",
"StringMessageConnection",
"TcpPrefixedMessageConnection",
"UdpPrefixedMessageConnection",
"EchoMessageConnection",
"EchoTcpMessageConnection",
"EchoUdpMessageConnection",
"TcpStringMessageConnection",
"UdpStringMessageConnection",
]


class TcpPrefixedMessageConnection(PrefixedMessageConnection, TcpConnection):
Expand Down Expand Up @@ -157,26 +72,6 @@ class EchoUdpMessageConnection(
"""A connection that just echoes what it was sent."""


class StringMessageConnection(PrefixedMessageConnection):
"""A simple string-message sending and processing connection."""

async def process_message(
self, data: str, addr: Tuple[str, int] = None
) -> bool:
"""Process a string message."""

del addr
self.logger.info(data)
return True

async def process_single(
self, stream: _BinaryIO, addr: Tuple[str, int] = None
) -> bool:
"""Process a single message."""

return await self.process_message(stream.read().decode(), addr=addr)


class TcpStringMessageConnection(StringMessageConnection, TcpConnection):
"""A simple string-message sending and processing connection using TCP."""

Expand Down
109 changes: 109 additions & 0 deletions runtimepy/net/stream/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
A module implementing a base, stream-oriented connection interface.
"""

# built-in
from io import BytesIO as _BytesIO
from typing import BinaryIO as _BinaryIO
from typing import Tuple, Type

# third-party
from vcorelib.io import ByteFifo

# internal
from runtimepy.net.connection import BinaryMessage
from runtimepy.net.connection import Connection as _Connection
from runtimepy.primitives import Uint32, UnsignedInt


class PrefixedMessageConnection(_Connection):
"""
A connection for handling inter-frame message size prefixes for some
stream-oriented protocols.
"""

message_length_kind: Type[UnsignedInt] = Uint32
reading_header: bool

def init(self) -> None:
"""Initialize this instance."""

# Header parsing.
self.buffer = ByteFifo()
self.reading_header = True
self.message_length_in = self.message_length_kind()
self.prefix_size = self.message_length_in.size

self.message_length_out = self.message_length_kind()

def _send_message(
self, data: BinaryMessage, addr: Tuple[str, int] = None
) -> None:
"""Underlying data send."""

del addr
self.send_binary(data)

def send_message(
self, data: BinaryMessage, addr: Tuple[str, int] = None
) -> None:
"""Handle inter-message prefixes for outgoing messages."""

self.message_length_out.value = len(data)

with _BytesIO() as stream:
self.message_length_out.to_stream(
stream, byte_order=self.byte_order
)
stream.write(data)
self._send_message(stream.getvalue(), addr=addr)

def send_message_str(
self, data: str, addr: Tuple[str, int] = None
) -> None:
"""Convert a message to bytes before sending."""
self.send_message(data.encode(), addr=addr)

async def process_single(
self, stream: _BinaryIO, addr: Tuple[str, int] = None
) -> bool:
"""Process a single message."""
del stream
del addr
return True

async def process_binary(
self, data: bytes, addr: Tuple[str, int] = None
) -> bool:
"""Process an incoming message."""

result = True

self.buffer.ingest(data)

can_continue = True
while can_continue:
# Read the message size.
if self.reading_header:
size = self.buffer.pop(self.prefix_size)
if size is not None:
assert len(size) == self.prefix_size
self.message_length_in.update(
size, byte_order=self.byte_order
)
self.reading_header = False
else:
can_continue = False

# Read the message payload.
else:
message = self.buffer.pop(self.message_length_in.value)
if message is not None:
# process message
with _BytesIO(message) as stream:
result &= await self.process_single(stream, addr=addr)
self.reading_header = True
else:
can_continue = False

return result
Loading

0 comments on commit bcbb047

Please sign in to comment.