Skip to content

Commit

Permalink
Update implementation of PhysioLog to use Lark
Browse files Browse the repository at this point in the history
Specifically, we define the parser using a Lark grammar, and then
generate the parser code using Lark's `lark` command-line tool,
which generates a Python module (`_generated.py`) that we can import.
  • Loading branch information
andrewrosss committed Oct 31, 2023
1 parent 558bd8a commit d3bcc80
Show file tree
Hide file tree
Showing 3 changed files with 5,251 additions and 150 deletions.
235 changes: 86 additions & 149 deletions src/fmri_physio_log/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,29 @@
from typing import NamedTuple
from typing import TextIO

from pyparsing import Group
from pyparsing import Literal
from pyparsing import nums
from pyparsing import OneOrMore
from pyparsing import ParseResults
from pyparsing import printables
from pyparsing import Suppress
from pyparsing import Word
from fmri_physio_log._generated import Lark_StandAlone as _Lark_StandAlone
from fmri_physio_log._generated import Tree as _Tree
from fmri_physio_log._generated import Visitor_Recursive as _Visitor_Recursive


# If this line moves files, update .bumpversion.cfg
__version__ = "0.3.1"


class PhysioLog:
def __init__(self, content: str):
def __init__(self, content: str, *, n_params: int = 4):
self.content = content
self._grammar = create_grammar()
self.n_params = n_params

self._parse_results: ParseResults
self._parser = _Lark_StandAlone()
self._visitor = _PhysioLogVisitor()

self.data: list[int]

self.params: tuple[int, ...]
self.ts: list[int]
self.rate: int
self.info: list[str]
self.ts: list[int]

self.ecg: MeasurementSummary
self.puls: MeasurementSummary
Expand Down Expand Up @@ -59,165 +57,43 @@ def from_filename(cls, filename: str | Path):

def parse(self):
# parse the content with the grammer
self._parse_results = self._grammar.parse_string(self.content)
tree = self._parser.parse(self.content)
self._visitor.visit(tree)

# the results are split into two top-level groups: the body and the footer.
# since ParseResults are iterable we can unpack the sections like this
body, footer = self._parse_results
# interpret the parsed content
self._body()
self._footer()

# interpret the results and "populate" this instance
self._body(body)
self._footer(footer)

def _body(self, body: ParseResults) -> None:
def _body(self) -> None:
"""Interpret the physio file body (everything before the '5003' tag)
The body is composed of the initial physio params
(which includes the sampling rate), info sections,
and the data.
"""
_params, *_data = body
self.params = tuple(int(s) for s in _params)
self.params = tuple(self._visitor._data[: self.n_params])
self.rate = self.params[2] if len(self.params) == 4 else self.params[3]
self.info = []
self.ts = []
for expr in _data:
if isinstance(expr, ParseResults):
self.info.append(str(expr[0]))
elif isinstance(expr, str):
self.ts.append(int(expr))
else:
raise SyntaxError(f"Unknown token in body {expr!r}")

def _footer(self, footer: ParseResults) -> None:
self.info = self._visitor._info[:]
self.ts = self._visitor._data[self.n_params :]

def _footer(self) -> None:
"""Interpret the physio footer (everything after the '5003' tag)
The footer is composed of the 'measurement summaries'
(freq, per, min, max, etc.), the 'nr summary' and 'log times'
"""
_summaries: dict[str, list[int]] = defaultdict(list)
_logs: dict[str, list[int]] = defaultdict(list)

# organize the parsed footer
for expr in footer:
if expr.get_name() in ("rate", "stat"):
# rates come before stats, so they'll get packed in first
# in the resulting list
key, *values = expr
_summaries[key].extend([int(v) for v in values])
elif expr.get_name() == "nr":
self.nr = NrSummary(*(int(v) for v in expr))
elif expr.get_name() == "log":
# start times come before stop times, so they'll get packed in
# first in the resulting list
_, key, value = expr
_logs[key].append(int(value))
else:
raise SyntaxError(f"Unknown token in footer {expr!r}")
# set the nr summary attributes
self.nr = NrSummary(*self._visitor._nr)

# set the measurement summary attributes
for attr, values in _summaries.items():
for attr, values in self._visitor._summaries.items():
setattr(self, attr.lower(), MeasurementSummary(*values))

# set the log time attributes
for attr, values in _logs.items():
for attr, values in self._visitor._logs.items():
setattr(self, attr.lower(), LogTime(*values))


def create_grammar():
"""Creates the pyparsing grammar used to parse pmu files.
Roughly speaking, this is the PMU grammar that this function creates:
<content> ::= <body> <footer>
<body> ::= <params> <data>
<params> ::= <detailed_params> | <simple_params>
<simple_params> ::= INT INT INT INT
<detailed_params> ::= INT INT INT INT [INT] <info>
<info> ::= 5002 PRINTABLE 6002
<data> ::= (<info> | 5000 | INT)+
<footer> ::= 5003 <footer_line> [<footer_line>]+ 6003
<footer_line> ::= <rate_line> | <stat_line> | <nr_line> | <log_line>
<rate_line> ::= <modality> <rate>
<rate> ::= <rate_prefix> INT INT
<rate_prefix> ::= 'Freq Per:'
<stat_line> ::= <modality> <stat>
<stat> ::= <stat_prefix> INT INT INT INT
<stat_prefix> ::= 'Min Max Avg StdDiff:'
<nr_line> ::= <nr_prefix> INT INT INT INT
<nr_prefix> ::= 'NrTrig NrMP NrArr AcqWin:'
<log_line> ::= <log_prefix> INT
<log_prefix> ::= 'LogStartMDHTime:'
| 'LogStopMDHTime:'
| 'LogStartMPCUTime:'
| 'LogStopMPCUTime:'
<modality> ::= 'ECG' | 'PULS' | 'RESP' | 'EXT' | 'EXT2'
"""
_int = Word(nums)

# footer
modality = (
Literal("ECG")
| Literal("PULS")
| Literal("RESP")
| Literal("EXT2")
| Literal("EXT")
)
log_type = Literal("MDH") | Literal("MPCU")
log_event = Literal("Start") | Literal("Stop")
rate = (
Suppress("Freq") + Suppress("Per") + Suppress(":") + _int("freq") + _int("per")
)
stat = (
Suppress("Min")
+ Suppress("Max")
+ Suppress("Avg")
+ Suppress("StdDiff")
+ Suppress(":")
+ _int("min")
+ _int("max")
+ _int("avg")
+ _int("stddiff")
)
log_key = Suppress("Log") + log_event("event") + log_type("type") + Suppress("Time")
log_line = log_key + Suppress(":") + _int("time")
nr_line = (
Suppress("NrTrig")
+ Suppress("NrMP")
+ Suppress("NrArr")
+ Suppress("AcqWin")
+ Suppress(":")
+ _int("nrtrig")
+ _int("nrmp")
+ _int("nrarr")
+ _int("acqwin")
)
stat_line = modality("modality") + stat
rate_line = modality("modality") + rate
footer_line = (
rate_line("rate") | stat_line("stat") | nr_line("nr") | log_line("log")
)
footer = Suppress("5003") + OneOrMore(Group(footer_line)) + Suppress("6003")

# body
info = (
Suppress("5002")
+ OneOrMore(Word(printables), stop_on="6002").set_parse_action(" ".join)
+ Suppress("6002")
)
data = OneOrMore(Group(info("info")) | Suppress("5000") | _int, stop_on="5003")
detailed_params = Group(_int[4, 5]("params")) + Group(info("info"))
simple_params = Group(_int[4]("params"))
params = detailed_params | simple_params
body = params + data

grammar = Group(body("body")) + Group(footer("footer"))

return grammar


class MeasurementSummary(NamedTuple):
freq: int
per: int
Expand Down Expand Up @@ -276,3 +152,64 @@ def logptime(timestamp: int) -> datetime.time:
sec = int(timestamp / 1000) - (hour * 60 * 60 + min * 60)
msec = timestamp - (hour * 1000 * 60 * 60 + min * 1000 * 60 + sec * 1000)
return datetime.time(hour, min, sec, msec * 1000) # since midnight


class _PhysioLogVisitor(_Visitor_Recursive):
def __init__(self):
self._data: list[int] = []
self._info: list[str] = []
self._summaries: dict[str, list[int]] = defaultdict(list)
self._nr: list[int] = [] # only the last one is used
self._logs: dict[str, list[int]] = defaultdict(list)

def data(self, tree: _Tree):
self._data = [i for i in _iter_ints(tree)]

def info(self, tree: _Tree):
_info = tree.children[0]
if isinstance(_info, str):
self._info.append(_info)

def rate_line(self, tree: _Tree):
modality = next(tree.find_pred(lambda t: t.data == "modality")).children[0]
if isinstance(modality, str):
# rate lines come before stat lines, so the rate data will get
# packed in first in the resulting list
# TODO(andrewrosss): add runtime check to validate this claim
self._summaries[modality].extend([i for i in _iter_ints(tree)])

def stat_line(self, tree: _Tree):
modality = next(tree.find_pred(lambda t: t.data == "modality")).children[0]
if isinstance(modality, str):
self._summaries[modality].extend([i for i in _iter_ints(tree)])

def nr_line(self, tree: _Tree):
self._nr = [i for i in _iter_ints(tree)]

def log_line(self, tree: _Tree):
log_type = tree.children[0]

if not isinstance(log_type, str):
raise SyntaxError(f"Unknown log type {log_type!r}")
elif "MDH" in log_type:
log_type = "MDH"
elif "MPCU" in log_type:
log_type = "MPCU"
else:
raise SyntaxError(f"Unknown log type {log_type!r}")

# start times come before stop times, so they'll get packed in
# first in the resulting list
# TODO(andrewrosss): add runtime check to validate this claim
self._logs[log_type].extend([i for i in _iter_ints(tree)])


def _iter_ints(tree: _Tree):
if hasattr(tree.data, "type") and tree.data.type == "INT": # type: ignore
yield int(tree.data)
else:
for c in tree.children:
if isinstance(c, _Tree):
yield from _iter_ints(c)
elif c.type == "INT":
yield int(c)
Loading

0 comments on commit d3bcc80

Please sign in to comment.