Skip to content

Commit

Permalink
add state abstraction and wip on update lock for concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
falkecarlsen committed Aug 12, 2024
1 parent dcad45a commit ddf516d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 31 deletions.
96 changes: 68 additions & 28 deletions py_driver/driver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import sys
import threading
from datetime import timedelta
from dataclasses import dataclass, field
from datetime import timedelta, datetime
from time import sleep, time
from typing import Optional

import serial
import utils

Expand Down Expand Up @@ -47,29 +50,57 @@ def print_blue(text):
print(f"{ColorPrinting.OKBLUE}{text}{ColorPrinting.ENDC}")


@dataclass
class ClaireState:
"""
The state of the Claire demonstrator. Can be used to cache the state.
"""
Tube1_sonar_dist_mm: Optional[float] = None
Tube2_sonar_dist_mm: Optional[float] = None
Tube1_inflow_duty: Optional[int] = None
Tube1_outflow_duty: Optional[int] = None
Tube2_inflow_duty: Optional[int] = None
Tube2_outflow_duty: Optional[int] = None
Stream_inflow_duty: Optional[int] = None
Stream_outflow_duty: Optional[int] = None
dynamic: Optional[bool] = None
last_update: datetime = datetime.now()

def __init__(self):
self.state = None
self.outdated = True
self.last_update = 0
self.dynamic = None

def set_state(self, state):
"""
Set the cached state to the provided state. Assumes that the provided state is actual.
:param state: The new state to cache.
"""
self.state = state
self.outdated = state["Tube1_inflow_duty"] or state["Tube1_outflow_duty"] or state["Tube2_inflow_duty"] or \
state["Tube2_outflow_duty"]
try:
self.state = state
for key, value in state.items():
if hasattr(self, key):
setattr(self, key, value)
self.last_update = datetime.now()
self.dynamic = state["Tube1_inflow_duty"] or state["Tube1_outflow_duty"] or state["Tube2_inflow_duty"] or \
state["Tube2_outflow_duty"]
except Exception as e:
print(f"Exception occurred during state update: {e}")
raise # Re-raise the exception after handling

def make_dynamic(self):
"""
Label the cached state as dynamic.
When the demonstrator is being acted upon, all state updates are outdated from when they are measured.
"""
self.dynamic = True

def make_outdated(self):
"""Label the cached state as outdated."""
self.outdated = True
@property
def tube1_dist(self) -> Optional[float]:
return self.Tube1_sonar_dist_mm

@property
def tube2_dist(self) -> Optional[float]:
return self.Tube2_sonar_dist_mm


class ClaireDevice:
Expand All @@ -79,7 +110,7 @@ class ClaireDevice:

def __init__(self, port):
self.device = port
self.heartbeat = time()
self.heartbeat = time() # last time device was alive
self.busy = True # initially unknown, therefore busy
self.state = ClaireState()
# read timeout in secs, 1 should be sufficient
Expand Down Expand Up @@ -110,6 +141,10 @@ def __init__(self, port):
sleep(1)
self.check_version()

print(f'{TAG} Device initialized. Getting initial state...')
self.update_state()


def alive(self):
"""Check if the device is still alive within bound."""
return time() - self.heartbeat < COMMUNICATION_TIMEOUT
Expand All @@ -136,19 +171,19 @@ def _underflow_check(self):
self.update_state()

# check underflows
if self.state.state["Tube1_sonar_dist_mm"] < TUBE_MAX_LEVEL:
if self.state.Tube1_sonar_dist_mm < TUBE_MAX_LEVEL:
# if outflow is active while inflow is stopped, error out
if self.state.state["Tube1_outflow_duty"] > 0 and self.state.state["Tube1_inflow_duty"] == 0:
if self.state.Tube1_outflow_duty > 0 and self.state.Tube1_inflow_duty == 0:
self.set_outflow(1, 0)
print(
f'{TAG}: WARN: Low water level detected in tube 1: {self.state.state["Tube1_sonar_dist_mm"]}. Stopped outflow')
f'{TAG}: WARN: Low water level detected in tube 1: {self.state.Tube1_sonar_dist_mm}. Stopped outflow')

elif self.state.state["Tube2_sonar_dist_mm"] < TUBE_MAX_LEVEL:
elif self.state.Tube2_sonar_dist_mm < TUBE_MAX_LEVEL:
# if outflow is active while inflow is stopped, error out
if self.state.state["Tube2_outflow_duty"] > 0 and self.state.state["Tube2_inflow_duty"] == 0:
if self.state.Tube2_outflow_duty > 0 and self.state.Tube2_inflow_duty == 0:
self.set_outflow(2, 0)
print(
f'{TAG}: WARN: Low water level detected in tube 2: {self.state.state["Tube2_sonar_dist_mm"]}. Stopped outflow')
f'{TAG}: WARN: Low water level detected in tube 2: {self.state.Tube2_sonar_dist_mm}. Stopped outflow')

else:
if DEBUG:
Expand Down Expand Up @@ -237,9 +272,9 @@ def check_version(self):

def update_state(self, tube=None, quick=False):
"""Get the last state of the device. If cached state is outdated, a new sensor reading is requested."""
# Return cached state if not outdated.
if not self.state.outdated and self.state.last_update >= time() - COMMUNICATION_TIMEOUT:
return self.state.state
# Return cached state if not outdated nor unstable.
if not self.state.dynamic and self.state.last_update >= datetime.now() - timedelta(COMMUNICATION_TIMEOUT):
return self.state

# Ask for new state reading.
size_buffer = self.last_printed_buf_line
Expand All @@ -256,12 +291,16 @@ def update_state(self, tube=None, quick=False):
else:
arg += ";"

# while busy, wait
while not self.ready():
sleep(1)

self.write(arg)

# Wait for the state to be received.
total_wait = 0
while True:
# Fixme: not robust
# Fixme: not robust looking for {
if self.last_printed_buf_line > size_buffer and self.read_buffer[-2][0] == '{':
# If we received a line starting with {, we have received the new state.
break
Expand All @@ -284,7 +323,8 @@ def update_state(self, tube=None, quick=False):
state["Tube2_sonar_dist_mm"] = round(self.convert_distance_to_level(state["Tube2_sonar_dist_mm"]), 1)
self.state = ClaireState()
self.state.set_state(state)
return state
return True
return False

def get_last_raw_state(self):
"""Get the last raw state of the device without polling."""
Expand All @@ -299,9 +339,9 @@ def get_last_raw_state(self):
def print_state(self):
"""Print state of the system."""
# seconds since state was grabbed
if self.state.state:
old = timedelta(seconds=time() - self.state.last_update)
print(f'{TAG} State ({old} secs old): {self.state}')
if self.state:
old = datetime.now() - self.state.last_update
print(f'{TAG} State ({old} old): {self.state}')
else:
print(f'{TAG} State: N/A')

Expand Down Expand Up @@ -338,7 +378,7 @@ def set_water_level(self, tube, level):
sleep(1)
self.write(f"5 {tube} {self.convert_level_to_distance(level)};")
self.busy = True
self.state.make_outdated()
self.state.make_dynamic()

def set_inflow(self, tube, rate):
"""
Expand All @@ -353,7 +393,7 @@ def set_inflow(self, tube, rate):
sleep(1)
pump = (tube - 1) * 2 + 1
self.write(f"4 {pump} {rate};")
self.state.make_outdated()
self.state.make_dynamic()

def set_outflow(self, tube, rate):
"""
Expand All @@ -368,7 +408,7 @@ def set_outflow(self, tube, rate):
sleep(1)
pump = tube * 2
self.write(f"4 {pump} {rate};")
self.state.make_outdated()
self.state.make_dynamic()

@staticmethod
def convert_distance_to_level(distance):
Expand Down
6 changes: 3 additions & 3 deletions py_driver/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

# Insert the name of the usb port, which might be different for different devices.
# An easy way to get the port name is to use the Arduino IDE.
# PORT = '/dev/ttyUSB0'
PORT = '/dev/cu.usbserial-1420'
PORT = '/dev/ttyUSB1'
#PORT = '/dev/cu.usbserial-1420'


def example_experiment():
claire = driver.ClaireDevice(PORT)
state = claire.update_state() # get current state of device
claire.print_state()
print(f'Current height of TUBE1: {state["Tube1_sonar_dist_mm"]}')
print(f'Current height of TUBE1: {state.Tube1_sonar_dist_mm}')

claire.set_inflow(1, 100)
sleep(3)
Expand Down

0 comments on commit ddf516d

Please sign in to comment.