Skip to content

Commit

Permalink
Sync TaskTiger worker heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasst committed Apr 22, 2024
1 parent 5384289 commit c7f937e
Showing 1 changed file with 55 additions and 13 deletions.
68 changes: 55 additions & 13 deletions tasktiger/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ def __init__(self, worker: "Worker"):
self.connection = worker.connection
self.config = worker.config

def heartbeat(
    self,
    queue: str,
    task_ids: Collection[str],
    log: BoundLogger,
    locks: Collection[Lock],
    queue_lock: Optional[Semaphore],
) -> None:
    """Send one heartbeat for the given tasks and refresh the held locks.

    Args:
        queue: Name of the queue the tasks belong to.
        task_ids: IDs of the tasks currently being executed; forwarded
            unchanged to ``self.worker.heartbeat``.
        log: Logger used to report lock problems.
        locks: Locks to reacquire. A ``LockError`` from ``reacquire()`` is
            logged as a warning and does not stop the remaining work.
        queue_lock: Optional queue semaphore to renew. A failed renewal is
            only logged at debug level.
    """
    # Forward the IDs received by this method. The previous code referenced
    # ``all_task_ids``, which is not defined in this scope.
    self.worker.heartbeat(queue, task_ids)

    for lock in locks:
        try:
            lock.reacquire()
        except LockError:
            # Best effort: keep going so the other locks still get renewed.
            log.warning("could not reacquire lock", lock=lock.name)

    if queue_lock:
        # renew() yields a pair; only the success flag is needed here.
        acquired, _ = queue_lock.renew()
        if not acquired:
            log.debug("queue lock renew failure")

def execute(
self,
queue: str,
Expand Down Expand Up @@ -351,18 +370,7 @@ def check_child_exit() -> Optional[int]:
break

try:
self.worker.heartbeat(queue, all_task_ids)
for lock in locks:
try:
lock.reacquire()
except LockError:
log.warning(
"could not reacquire lock", lock=lock.name
)
if queue_lock:
acquired, current_locks = queue_lock.renew()
if not acquired:
log.debug("queue lock renew failure")
self.heartbeat(queue, all_task_ids, log, locks, queue_lock)
except OSError as e:
# EINTR happens if the task completed. Since we're just
# renewing locks/heartbeat it's okay if we get interrupted.
Expand All @@ -386,6 +394,19 @@ class SyncExecutor(Executor):

exit_worker_on_job_timeout = True

def _periodic_heartbeat(
self,
queue: str,
task_ids: List[str],
log: BoundLogger,
locks: Collection[Lock],
queue_lock: Optional[Semaphore],
stop_event: threading.Event,
):
while not stop_event.is_set():
stop_event.wait(self.config["ACTIVE_TASK_UPDATE_TIMEOUT"])
self.heartbeat(queue, task_ids, log, locks, queue_lock)

def execute(
self,
queue: str,
Expand All @@ -394,5 +415,26 @@ def execute(
locks: Collection[Lock],
queue_lock: Optional[Semaphore],
) -> bool:
# Run heartbeat thread.
all_task_ids = {task.id for task in tasks}
stop_event = threading.Event()
heartbeat_thread = threading.Thread(
target=self._periodic_heartbeat,
kwargs={
"queue": queue,
"task_ids": all_task_ids,
"log": log,
"locks": locks,
"queue_lock": queue_lock,
"stop_event": stop_event,
},
)

# Run the tasks.
return self.execute_tasks(tasks, log)
result = self.execute_tasks(tasks, log)

# Stop the heartbeat thread.
stop_event.set()
heartbeat_thread.join()

return result

0 comments on commit c7f937e

Please sign in to comment.