Skip to content

Commit

Permalink
Extract handler methods to ABC (#342)
Browse files Browse the repository at this point in the history
This is a refactor to avoid direct access to handler components from
the REST app.

To allow further changes to the handler management of the worker and
run engine in future.

In preparation for #317
  • Loading branch information
joeshannon committed Dec 13, 2023
1 parent 885636a commit c5bf8f9
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 116 deletions.
111 changes: 81 additions & 30 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Mapping, Optional
from typing import List, Mapping, Optional

from blueapi.config import ApplicationConfig
from blueapi.core import BlueskyContext
Expand All @@ -13,17 +13,21 @@
from blueapi.messaging import StompMessagingTemplate
from blueapi.messaging.base import MessagingTemplate
from blueapi.preprocessors.attach_metadata import attach_metadata
from blueapi.service.handler_base import BlueskyHandler
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import WorkerState
from blueapi.worker.reworker import RunEngineWorker
from blueapi.worker.worker import Worker
from blueapi.worker.task import RunPlan
from blueapi.worker.worker import TrackableTask, Worker

LOGGER = logging.getLogger(__name__)


class Handler:
context: BlueskyContext
worker: Worker
config: ApplicationConfig
messaging_template: MessagingTemplate
class Handler(BlueskyHandler):
_context: BlueskyContext
_worker: Worker
_config: ApplicationConfig
_messaging_template: MessagingTemplate

def __init__(
self,
Expand All @@ -32,38 +36,34 @@ def __init__(
messaging_template: Optional[MessagingTemplate] = None,
worker: Optional[Worker] = None,
) -> None:
self.config = config or ApplicationConfig()
self.context = context or BlueskyContext()
self._config = config or ApplicationConfig()
self._context = context or BlueskyContext()

self.context.with_config(self.config.env)
self._context.with_config(self._config.env)

self.worker = worker or RunEngineWorker(
self.context,
broadcast_statuses=self.config.env.events.broadcast_status_events,
self._worker = worker or RunEngineWorker(
self._context,
broadcast_statuses=self._config.env.events.broadcast_status_events,
)
self.messaging_template = (
self._messaging_template = (
messaging_template
or StompMessagingTemplate.autoconfigured(self.config.stomp)
or StompMessagingTemplate.autoconfigured(self._config.stomp)
)

def start(self) -> None:
self.worker.start()
self._worker.start()

event_topic = self._messaging_template.destinations.topic("public.worker.event")

self._publish_event_streams(
{
self.worker.worker_events: self.messaging_template.destinations.topic(
"public.worker.event"
),
self.worker.progress_events: self.messaging_template.destinations.topic(
"public.worker.event"
),
self.worker.data_events: self.messaging_template.destinations.topic(
"public.worker.event"
),
self._worker.worker_events: event_topic,
self._worker.progress_events: event_topic,
self._worker.data_events: event_topic,
}
)

self.messaging_template.connect()
self._messaging_template.connect()

def _publish_event_streams(
self, streams_to_destinations: Mapping[EventStream, str]
Expand All @@ -73,15 +73,66 @@ def _publish_event_streams(

def _publish_event_stream(self, stream: EventStream, destination: str) -> None:
stream.subscribe(
lambda event, correlation_id: self.messaging_template.send(
lambda event, correlation_id: self._messaging_template.send(
destination, event, None, correlation_id
)
)

def stop(self) -> None:
self.worker.stop()
if self.messaging_template.is_connected():
self.messaging_template.disconnect()
self._worker.stop()
if self._messaging_template.is_connected():
self._messaging_template.disconnect()

@property
def plans(self) -> List[PlanModel]:
return [PlanModel.from_plan(plan) for plan in self._context.plans.values()]

def get_plan(self, name: str) -> PlanModel:
return PlanModel.from_plan(self._context.plans[name])

@property
def devices(self) -> List[DeviceModel]:
return [
DeviceModel.from_device(device) for device in self._context.devices.values()
]

def get_device(self, name: str) -> DeviceModel:
return DeviceModel.from_device(self._context.devices[name])

def submit_task(self, task: RunPlan) -> str:
return self._worker.submit_task(task)

def clear_pending_task(self, task_id: str) -> str:
return self._worker.clear_task(task_id)

def begin_task(self, task: WorkerTask) -> WorkerTask:
if task.task_id is not None:
self._worker.begin_task(task.task_id)
return task

@property
def active_task(self) -> Optional[TrackableTask]:
return self._worker.get_active_task()

@property
def state(self) -> WorkerState:
return self._worker.state

def pause_worker(self, defer: Optional[bool]) -> None:
self._worker.pause(defer)

def resume_worker(self) -> None:
self._worker.resume()

def cancel_active_task(self, failure: bool, reason: Optional[str]):
self._worker.cancel_active_task(failure, reason)

@property
def pending_tasks(self) -> List[TrackableTask]:
return self._worker.get_pending_tasks()

def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
return self._worker.get_pending_task(task_id)


HANDLER: Optional[Handler] = None
Expand Down
85 changes: 85 additions & 0 deletions src/blueapi/service/handler_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from abc import ABC, abstractmethod
from typing import List, Optional

from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import WorkerState
from blueapi.worker.task import RunPlan
from blueapi.worker.worker import TrackableTask


class BlueskyHandler(ABC):
"""Interface between web application and underlying Bluesky context and worker"""

@property
@abstractmethod
def plans(self) -> List[PlanModel]:
"""
All available plans in the BlueskyContext
"""

@abstractmethod
def get_plan(self, name: str) -> PlanModel:
"""
Retrieve plan by name from the BlueskyContext
"""

@property
@abstractmethod
def devices(self) -> List[DeviceModel]:
"""
All available devices in the BlueskyContext
"""

@abstractmethod
def get_device(self, name: str) -> DeviceModel:
"""
Retrieve device by name from the BlueskyContext
"""

@abstractmethod
def submit_task(self, task: RunPlan) -> str:
"""
Submit a task to be run on begin_task
"""

@abstractmethod
def clear_pending_task(self, task_id: str) -> str:
"""Remove a pending task from the worker"""

@abstractmethod
def begin_task(self, task: WorkerTask) -> WorkerTask:
"""Trigger a pending task. Will fail if the worker is busy"""

@property
@abstractmethod
def active_task(self) -> Optional[TrackableTask]:
"""Task the worker is currently running"""

@property
@abstractmethod
def state(self) -> WorkerState:
"""State of the worker"""

@abstractmethod
def pause_worker(self, defer: Optional[bool]) -> None:
"""Command the worker to pause"""

@abstractmethod
def resume_worker(self) -> None:
"""Command the worker to resume"""

@abstractmethod
def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None:
"""Remove the currently active task from the worker if there is one
Returns the task_id of the active task"""

@property
@abstractmethod
def pending_tasks(self) -> List[TrackableTask]:
"""Return a list of all tasks pending on the worker,
any one of which can be triggered with begin_task"""

@abstractmethod
def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
"""Returns a task matching the task ID supplied,
if the worker knows of it"""
Loading

0 comments on commit c5bf8f9

Please sign in to comment.