Skip to content

Commit

Permalink
fix: improve & test timetag / impulse parsing & serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Christopher Arndt <chris@chrisarndt.de>
  • Loading branch information
SpotlightKid committed Nov 13, 2023
1 parent 1b8f01b commit fb1c79b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 19 deletions.
23 changes: 22 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import unittest

from uosc.client import Bundle, create_message, pack_bundle
from uosc.common import NTP_DELTA
from uosc.common import Impulse, TimetagNow, NTP_DELTA

try:
from struct import error as StructError
Expand Down Expand Up @@ -141,6 +141,27 @@ def test_create_message_combi(self):
b'?\x9d\xf3\xb6@\xb5\xb2-',
'/big', 1000, -1, u'hello', bytearray(range(6)), 1.234, 5.678)

def test_create_message_timetag(self):
ntptime = 3657147741.655295
self.assertMessage(b'/tt\0,t\0\0\xd9\xfb\xa5]\xa7\xc1h\x00', '/tt',
('t', ntptime))

def test_create_message_timetag_int(self):
ntptime = 3657147741
self.assertMessage(b'/tt\0,t\0\0\xd9\xfb\xa5]\x00\x00\x00\x00', '/tt',
('t', ntptime))

def test_create_message_timetag_now(self):
self.assertMessage(b'/tt\0,t\0\0\0\0\0\0\0\0\0\x01', '/tt',
('t', TimetagNow))
self.assertMessage(b'/tt\0,t\0\0\0\0\0\0\0\0\0\x01', '/tt',
TimetagNow)

def test_create_message_impulse(self):
self.assertMessage(b'/inf\0\0\0\0,I\0\0', '/inf', ('I', Impulse))
self.assertMessage(b'/inf\0\0\0\0,I\0\0', '/inf', ('I', None))
self.assertMessage(b'/inf\0\0\0\0,I\0\0', '/inf', Impulse)


class TestBundle(unittest.TestCase):
timetag = 3657147741.655295
Expand Down
15 changes: 14 additions & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# -*- coding: utf-8 -*-

import time
import unittest

from uosc.server import Impulse, parse_bundle, parse_message
from struct import pack

from uosc.common import Impulse, ISIZE, NTP_DELTA, TimetagNow
from uosc.server import parse_bundle, parse_message


typegen = type((lambda: (yield))())
Expand Down Expand Up @@ -93,6 +97,15 @@ def test_parse_message_combi(self):
self.assertAlmostEqual(args[-2], 1.234)
self.assertAlmostEqual(args[-1], 5.678)

def test_parse_message_timetag(self):
ntpnow = time.time() + NTP_DELTA
sec = int(ntpnow)
tt = pack(">II", sec, int(abs(ntpnow - sec) * ISIZE))
self.assertMessage(('/tt', 't', (ntpnow,)), b'/tt\0,t\0\0' + tt)

def test_parse_message_timetag_now(self):
self.assertMessage(('/tt', 't', (TimetagNow,)), b'/tt\0,t\0\0\0\0\0\0\0\0\x00\x01')


class TestParseBundle(unittest.TestCase):
timetag = 3657147741.6552954
Expand Down
24 changes: 16 additions & 8 deletions uosc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
except ImportError:
from struct import pack

from uosc.common import Bundle, to_frac
from uosc.common import Bundle, Impulse, TimetagNow, to_frac


if isinstance('', bytes):
Expand All @@ -22,14 +22,16 @@
unicodetype = str

TYPE_MAP = {
int: 'i',
float: 'f',
bytes: 'b',
bytearray: 'b',
unicodetype: 's',
True: 'T',
bytes: 'b',
float: 'f',
int: 'i',
False: 'F',
Impulse: 'I',
None: 'N',
TimetagNow: 't',
True: 'T',
unicodetype: 's',
}


Expand Down Expand Up @@ -112,6 +114,8 @@ def create_message(address, *args):
* ``None``: N
* ``True``: T
* ``False``: F
* ``uosc.common.Impulse``: I
* ``uosc.common.TimetagNow``: t
If you want to encode a Python object to another OSC type, you have to pass
a ``(typetag, data)`` tuple, where ``data`` must be of the appropriate type
Expand All @@ -123,7 +127,8 @@ def create_message(address, *args):
* I: ``None`` (unused)
* m: ``tuple / list`` of 4 ``int``s or ``bytes / bytearray`` of length 4
* r: same as 'm'
* t: OSC timetag as as ``int / float`` seconds since the NTP epoch
* t: OSC timetag as as ``int / float`` seconds since the NTP epoch or the
``uosc.common.TimetagNow`` constant
* S: ``str``
"""
Expand Down Expand Up @@ -153,7 +158,10 @@ def create_message(address, *args):
elif typetag == 'h':
data.append(pack('>q', arg))
elif typetag == 't':
data.append(pack_timetag(arg))
if arg is TimetagNow:
data.append(pack('>Q', 1))
else:
data.append(pack_timetag(arg))
elif typetag not in 'IFNT':
raise TypeError("Argument of type '%s' not supported." % type_)

Expand Down
15 changes: 8 additions & 7 deletions uosc/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,28 @@
NTP_DELTA = 2208988800
ISIZE = 4294967296 # 2**32

TimetagNow = object()

class Impulse:
pass
Impulse = object()


class Bundle:
"""Container for an OSC bundle."""

def __init__(self, *items):
"""Create bundle from given OSC timetag and messages/sub-bundles.
"""Create bundle from given OSC timetag and messages and sub-bundles.
An OSC timetag can be given as the first positional argument, and must
be an int or float of seconds since the NTP epoch (1990-01-01 00:00).
It defaults to the current time.
be an int or float of seconds since the NTP epoch (1990-01-01 00:00) or
the ``uosc.common.TimetagNow`` constant. It defaults to the current
time.
Pass in messages or bundles via positional arguments as binary data
(bytes as returned by ``create_message`` resp. ``Bundle.pack``) or as
``Bundle`` instances or (address, *args) tuples.
``Bundle`` instances or ``(oscaddress, *args)`` tuples.
"""
if items and isinstance(items[0], (int, float)):
if items and (isinstance(items[0], (int, float)) or items[0] is TimetagNow):
self.timetag = items[0]
items = items[1:]
else:
Expand Down
7 changes: 5 additions & 2 deletions uosc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
except ImportError:
import uosc.fakelogging as logging

from uosc.common import Impulse, to_time
from uosc.common import Impulse, TimetagNow, to_time


log = logging.getLogger("uosc.server")
Expand All @@ -33,7 +33,10 @@ def split_oscblob(msg, offset):

def parse_timetag(msg, offset):
"""Parse an OSC timetag from msg at offset."""
return to_time(unpack('>II', msg[offset:offset + 4]))
if msg[offset + 7] == 1:
return TimetagNow
else:
return to_time(*unpack('>II', msg[offset:offset + 8]))


def parse_message(msg, strict=False):
Expand Down

0 comments on commit fb1c79b

Please sign in to comment.