Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Remove WIP heartbeat check underflow check and refactor #71

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions py_driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,23 +241,16 @@ def _underflow_check(self):
"""
TAG = "UNDERFLOW_CHECK"
while True:
if not self.ready():
# do liveness check and update state if device is outdated but was ready on last communication
if self.outdated():
print(f"Device is outdated. {self.state.last_update=}, {datetime.now()=}")
# if last line is OK, then device is still alive, do update of state
if self.read_buffer and self.read_buffer[-1] == CLAIRE_READY_SIGNAL:
print(f"Device is alive. {self.state.last_update=}, {datetime.now()=}")
self.busy = False
self.update_state(quick=True)
else:
if DEBUG:
print(f'{TAG}: Device is not ready. Waiting {UNDERFLOW_CHECK_INTERVAL} seconds.')
sleep(UNDERFLOW_CHECK_INTERVAL)
continue
# if state is stale or dynamic, update is required
if self.state.dynamic or self.state.last_update < datetime.now() - timedelta(COMMUNICATION_TIMEOUT):
print(f"{TAG} State is stale or dynamic, requiring fresh state. {self.state.last_update=}, {datetime.now()=}")

# if not ready, delay and try again
if not self.ready():
sleep(1)
continue

# update state if device is dynamic
if self.state.dynamic:
# do quick update
self.update_state(quick=True)

# check underflows
Expand Down
Loading