Skip to content

Commit

Permalink
Merge pull request #300 from gyorilab/vensim-initials
Browse files Browse the repository at this point in the history
Improve vensim import
  • Loading branch information
bgyori authored Feb 29, 2024
2 parents f75c238 + 43804b0 commit 44e1612
Show file tree
Hide file tree
Showing 6 changed files with 782 additions and 529 deletions.
135 changes: 133 additions & 2 deletions mira/sources/system_dynamics/pysd.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""This module implements parsing of a generic pysd model irrespective of source and source type
and extracting its contents to create an equivalent MIRA template model.
"""

__all__ = ["template_model_from_pysd_model"]

import copy
import re
import typing as t
import logging
from more_itertools import chunked

import pandas as pd
import sympy
Expand All @@ -18,6 +22,8 @@
get_sympy,
)

logger = logging.getLogger(__name__)

CONTROL_VARIABLE_NAMES = {
"FINALTIME",
"INITIALTIME",
Expand All @@ -36,7 +42,13 @@
SYMPY_FLOW_RATE_PLACEHOLDER = safe_parse_expr("xxplaceholderxx")


def template_model_from_pysd_model(pysd_model, expression_map, *, grounding_map=None) -> TemplateModel:
def template_model_from_pysd_model(
pysd_model,
expression_map,
*,
grounding_map=None,
initials_map: t.Optional[t.Dict[str, float]] = None,
) -> TemplateModel:
"""Given a model and its accompanying expression_map, extract information from the arguments
to create an equivalent MIRA template model.
Expand All @@ -48,6 +60,8 @@ def template_model_from_pysd_model(pysd_model, expression_map, *, grounding_map=
Map of variable name to expression
grounding_map: dict[str, Concept]
A grounding map, a map from label to Concept
initials_map: dict[str, float]
A pre-populated mapping of initial values
Returns
-------
Expand Down Expand Up @@ -154,10 +168,16 @@ def template_model_from_pysd_model(pysd_model, expression_map, *, grounding_map=
for state_initial_value, (state_name, state_concept) in zip(
pysd_model.state, concepts.items()
):
if initials_map and (mapped_value := initials_map.get(state_name)):
initial = Initial(
concept=concepts[state_name].copy(deep=True),
expression=SympyExprStr(sympy.Float(mapped_value)),
)
# if the state value is not a number
if not isinstance(state_initial_value, int) and not isinstance(
elif not isinstance(state_initial_value, int) and not isinstance(
state_initial_value, float
):
logger.warning(f"got non-numeric state value for {state_name}: {state_initial_value}")
initial = Initial(
concept=concepts[state_name].copy(deep=True),
expression=SympyExprStr(sympy.Float("0")),
Expand Down Expand Up @@ -424,12 +444,123 @@ def preprocess_expression_text(expr_text):
# strip leading and trailing white spaces
# replace space between two words that makeup a variable name with "_"'
# replace single and doubel quotation marks
# replace ampersand & with and
expr_text = (
expr_text.strip()
.replace("^", "**")
.replace(" ", "_")
.replace("'", "")
.replace('"', "")
.replace("&", "_")
.lower()
)
return expr_text


def ifthenelse_to_piecewise(expr_text):
"""Convert Vensim if then else expression to sympy Piecewise string
Parameters
----------
expr_text : str
The string expression
Returns
-------
: str
The sympy Piecewise expression as a string
"""
# We find the three components and rearrange them for piecewise
if_then_else_regex = r"IF THEN ELSE\((.*),(.*)\,(.*)\)"
match = re.search(if_then_else_regex, expr_text)
if match:
condition, then, else_ = match.groups()
piecewise = f"Piecewise(({then}, {condition}), ({else_}, True))"
# We also need to replace single = with == but make sure if there is a ==, we don't replace
# it and also, we need to make sure we don't replace <= or >=
piecewise = re.sub(r"(?<!<|>)=(?!=)", "==", piecewise)
return piecewise


def with_lookup_to_piecewise(expr_text: str) -> str:
"""Convert a Vensim WITH LOOKUP expression to a piecewise function.
The semantics of the ``WITH LOOKUP`` element are documented at
https://www.vensim.com/documentation/fn_with_lookup.html.
For example, this could come from:
.. code-block::
Infection Rate new arrivals= WITH LOOKUP (
Time,
(
[(0,0)-(500,100)],(0,0),(1,2),(2,1),(3,0),(4,2),(5,1),(6,2),(7,3),(8,6),(9,2),(10,7\
),(11,10),(12,4),(13,10),(14,5),(15,
11),(16,14),(17,14),(18,26),(19,34),(20,35),(21,45),(22,55),(23,38),(24,34),(25,24)\
,(26,40),(27,16),(28,20),(29,12),(30,
23),(31,14),(32,8),(33,14),(34,12),(35,5),(36,9),(37,6),(38,0),(39,0),(40,0),(1000,\
0)
))
~ Persons/Day
~ |
Which ends up being the text (after normalization from vensim)
.. code-block::
with_lookup(time,([(0,0)-(500,100)],(0,0),(1,2),(2,1),(3,0),(4,2),(5,1),(6,2),\
(7,3),(8,6),(9,2),(10,7),(11,10),(12,4),(13,10),(1000,0)))
"""
# there's a variety of ways this is written WITH LOOKUP, with_lookup, withlookup
# so just normalize it all out
expr_text = expr_text.strip().replace(" ", "").replace("_", "").lower()
if not expr_text.lower().startswith("withlookup"):
raise ValueError(expr_text)
expr_text = expr_text[len("withlookup") :].lstrip("(").rstrip(")")
# The first input is either a value or an input variable name
# The second input is a list of X,Y pairs
variable, second = (x.strip() for x in expr_text.split(",", 1))
second: str = second.lstrip("(").rstrip(")")

# This is an undocumented part of this function, but it appears that the
# list begins with some kind of definition of the form [(a,b)-(c,d)]
if second.startswith("["):
box, rest = second.lstrip("[").split("]", 1)
x1, y1, x2, y2 = [
float(x.strip())
for x in box.replace("(", "")
.replace(")", "")
.replace("-", ",")
.split(",")
]
else:
rest = second
x1, y1, x2, y2 = [None] * 4

# TODO how to use x1, y1, x2, and y2? It's not clear if/how these are used
# to fill in gaps in the lookup table

# This gets the list of x,y pairs in order from the lookup table
pairs = list(
chunked(
(
float(x.strip())
for x in rest.strip()
.replace("(", "")
.replace(")", "")
.split(",")
if x.strip()
),
2,
)
)
# print(variable, x1, y1, x2, y2, pairs)

# construct the sympy string as a simple lookup, where the y
# value gets returned if the target variable is the given x value
# for each x,y pair
# FIXME what's the right way to write the conditional here
conditions = ",".join(f"({y}, {variable} >= {x})" for x, y in pairs)
sympy_str = f"Piecewise({conditions})"
return sympy_str
77 changes: 65 additions & 12 deletions mira/sources/system_dynamics/vensim.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
from pysd.translators.vensim.vensim_file import VensimFile
import requests

from mira.metamodel import TemplateModel
from mira.metamodel import TemplateModel, Initial
from mira.sources.system_dynamics.pysd import (
template_model_from_pysd_model,
ifthenelse_to_piecewise,
with_lookup_to_piecewise,
)

__all__ = ["template_model_from_mdl_file", "template_model_from_mdl_url"]
Expand All @@ -34,7 +36,9 @@
CONTROL_VARIABLES = {"SAVEPER", "FINAL TIME", "INITIAL TIME", "TIME STEP"}


def template_model_from_mdl_file(fname, *, grounding_map=None) -> TemplateModel:
def template_model_from_mdl_file(
fname, *, grounding_map=None, initials=None, initials_from_integ: bool = False
) -> TemplateModel:
"""Return a template model from a local Vensim file
Parameters
Expand All @@ -43,6 +47,12 @@ def template_model_from_mdl_file(fname, *, grounding_map=None) -> TemplateModel:
The path to the local Vensim file
grounding_map: dict[str, Concept]
A grounding map, a map from label to Concept
initials: dict[str, float]
Explicit initial values to use for compartments in the
model. Will overwrite model-internal definitions.
initials_from_integ : bool
If true, gets initial values from INTEG expressions. If ``initials``
are given, they override anything from INTEG expressions
Returns
-------
Expand All @@ -51,12 +61,22 @@ def template_model_from_mdl_file(fname, *, grounding_map=None) -> TemplateModel:
"""
pysd_model = pysd.read_vensim(fname)
vensim_file = VensimFile(fname)
expression_map = extract_vensim_variable_expressions(vensim_file.model_text)

return template_model_from_pysd_model(pysd_model, expression_map, grounding_map=grounding_map)
expression_map, initials_map = extract_vensim_variable_expressions(
vensim_file.model_text, initials_from_integ=initials_from_integ,
)
if initials:
initials_map.update(initials)
return template_model_from_pysd_model(
pysd_model,
expression_map,
grounding_map=grounding_map,
initials_map=initials_map,
)


def template_model_from_mdl_url(url, *, grounding_map=None) -> TemplateModel:
def template_model_from_mdl_url(
url, *, grounding_map=None, initials=None, initials_from_integ: bool = False
) -> TemplateModel:
"""Return a template model from a Vensim file provided by an url
Parameters
Expand All @@ -65,6 +85,12 @@ def template_model_from_mdl_url(url, *, grounding_map=None) -> TemplateModel:
The url to the mdl file
grounding_map: dict[str, Concept]
A grounding map, a map from label to Concept
initials: dict[str, float]
Explicit initial values to use for compartments in the
model. Will overwrite model-internal definitions.
initials_from_integ : bool
If true, gets initial values from INTEG expressions. If ``initials``
are given, they override anything from INTEG expressions
Returns
-------
Expand All @@ -79,17 +105,26 @@ def template_model_from_mdl_url(url, *, grounding_map=None) -> TemplateModel:
with temp_file as file:
file.write(data)

return template_model_from_mdl_file(temp_file.name, grounding_map=grounding_map)
return template_model_from_mdl_file(
temp_file.name,
grounding_map=grounding_map,
initials=initials,
initials_from_integ=initials_from_integ,
)


# look past control section
def extract_vensim_variable_expressions(model_text):
def extract_vensim_variable_expressions(
model_text, *, initials_from_integ: bool = False
):
"""Method that extracts expressions for each variable in a Vensim file
Parameters
----------
model_text : str
The plain-text information about the Vensim file
initials_from_integ : bool
If true, gets initial values from INTEG expressions
Returns
-------
Expand Down Expand Up @@ -141,22 +176,40 @@ def extract_vensim_variable_expressions(model_text):
# "INTEG" is the keyword used to define a state/stock
# however, we can't yet handle if/then/else constructs, so we skip them
if "if then else" in text_expression.lower():
expression_map[old_var_name] = "0"
continue
text_expression = ifthenelse_to_piecewise(text_expression)

if "with lookup" in text_expression.lower():
text_expression = with_lookup_to_piecewise(text_expression)

# If there's an INTEG expression, we can extract an initial value,
# otherwise, it is none.
initial = None

# If we come across a state, get the expression for the state only

# For the hackathon Vensim file, we can use a new regex that gets not only the expression
# but the initial value as well. Because when pysd ingests the hackathon Vensim file,
# it will have 44 initial values for only 19 states.
if "INTEG" in text_expression:
text_expression = re.search(r"\(([^,]+),", text_expression).group(1)
match = re.search(r"\(([^,]+),\s*(.*)?\)", text_expression)
text_expression = match.group(1)
initial = match.group(2)

expression_map[old_var_name] = text_expression

if initials_from_integ:
# CTH: I couldn't find where this normalization happens
# between the vensim and pysd code, so I added it again here.
# also, the initial value might be an expression that needs normalizing
initial_values[_norm(old_var_name)] = initial and _norm(initial)

# remove any control variables listed past the control section that were added to the
# expression map
for control_var in CONTROL_VARIABLES:
expression_map.pop(control_var)

return expression_map
return expression_map, initial_values


def _norm(s):
return s.lower().replace(" ", "_")
Loading

0 comments on commit 44e1612

Please sign in to comment.