Skip to content

Commit

Permalink
Replace references to blueapi messaging package (#589)
Browse files Browse the repository at this point in the history
Replace references to blueapi messaging package with references to the
bluesky stomp library

Fixes #588

---------

Co-authored-by: Zoheb Shaikh <zoheb.shaikh@diamond.ac.uk>
Co-authored-by: DiamondJoseph <53935796+DiamondJoseph@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 5, 2024
1 parent caf7e46 commit 012238d
Show file tree
Hide file tree
Showing 22 changed files with 99 additions and 864 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,7 @@ jobs:
run:
runs-on: ${{ inputs.runs-on }}

services:
activemq:
image: rmohr/activemq:5.14.5-alpine
ports:
- 61613:61613

steps:
- name: Start RabbitMQ
uses: namoshek/rabbitmq-github-action@v1
with:
ports: "61614:61613"
plugins: rabbitmq_stomp

- name: Checkout
uses: actions/checkout@v4
with:
Expand Down
5 changes: 3 additions & 2 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bidict==0.23.1
bluesky==1.13.0a4
bluesky-kafka==0.10.0
bluesky-live==0.0.8
bluesky-stomp==0.1.0
boltons==24.0.0
cachetools==5.5.0
caproto==1.1.1
Expand Down Expand Up @@ -212,10 +213,10 @@ tzlocal==5.2
urllib3==2.2.2
uvicorn==0.30.6
virtualenv==20.26.3
watchfiles==0.23.0
watchfiles==0.24.0
wcwidth==0.2.13
websocket-client==1.8.0
websockets==13.0
websockets==13.0.1
widgetsnbextension==4.0.13
workflows==2.27
xarray==2024.7.0
Expand Down
4 changes: 1 addition & 3 deletions docs/reference/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,10 @@ components:
description: Request to change the state of the worker.
properties:
defer:
anyOf:
- type: boolean
- type: 'null'
default: false
description: Should worker defer Pausing until the next checkpoint
title: Defer
type: boolean
new_state:
$ref: '#/components/schemas/WorkerState'
reason:
Expand Down
4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"dls-dodal>=1.24.0",
"super-state-machine", # See GH issue 553
"GitPython",
"bluesky-stomp>=0.1.0"
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down Expand Up @@ -90,9 +91,6 @@ addopts = """
filterwarnings = ["error", "ignore::DeprecationWarning"]
# Doctest python code in docs, python code in src docstrings, test functions in tests
testpaths = "docs src tests"
markers = [
"handler: marks tests that interact with the global handler object in handler.py",
]
asyncio_mode = "auto"

[tool.coverage.run]
Expand Down
14 changes: 10 additions & 4 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import click
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky_stomp.messaging import MessageContext, MessagingTemplate
from bluesky_stomp.models import Broker
from pydantic import ValidationError
from requests.exceptions import ConnectionError

Expand All @@ -16,8 +18,6 @@
from blueapi.client.rest import BlueskyRemoteControlError
from blueapi.config import ApplicationConfig, ConfigLoader
from blueapi.core import DataEvent
from blueapi.messaging import MessageContext
from blueapi.messaging.stomptemplate import StompMessagingTemplate
from blueapi.service.main import start
from blueapi.service.openapi import (
DOCS_SCHEMA_LOCATION,
Expand Down Expand Up @@ -147,14 +147,20 @@ def listen_to_events(obj: dict) -> None:
config: ApplicationConfig = obj["config"]
if config.stomp is not None:
event_bus_client = EventBusClient(
StompMessagingTemplate.autoconfigured(config.stomp)
MessagingTemplate.for_broker(
broker=Broker(
host=config.stomp.host,
port=config.stomp.port,
auth=config.stomp.auth,
)
)
)
else:
raise RuntimeError("Message bus needs to be configured")

def on_event(
context: MessageContext,
event: WorkerEvent | ProgressEvent | DataEvent,
context: MessageContext,
) -> None:
converted = json.dumps(event.dict(), indent=2)
print(converted)
Expand Down
14 changes: 11 additions & 3 deletions src/blueapi/client/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import time
from concurrent.futures import Future

from bluesky_stomp.messaging import MessageContext, MessagingTemplate
from bluesky_stomp.models import Broker

from blueapi.config import ApplicationConfig
from blueapi.core.bluesky_types import DataEvent
from blueapi.messaging import MessageContext, StompMessagingTemplate
from blueapi.service.model import (
DeviceModel,
DeviceResponse,
Expand Down Expand Up @@ -38,7 +40,13 @@ def __init__(
def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
rest = BlueapiRestClient(config.api)
if config.stomp is not None:
template = StompMessagingTemplate.autoconfigured(config.stomp)
template = MessagingTemplate.for_broker(
broker=Broker(
host=config.stomp.host,
port=config.stomp.port,
auth=config.stomp.auth,
)
)
events = EventBusClient(template)
else:
events = None
Expand Down Expand Up @@ -178,7 +186,7 @@ def run_task(

complete: Future[WorkerEvent] = Future()

def inner_on_event(ctx: MessageContext, event: AnyEvent) -> None:
def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
match event:
case WorkerEvent(task_status=TaskStatus(task_id=test_id)):
relates_to_task = test_id == task_id
Expand Down
8 changes: 5 additions & 3 deletions src/blueapi/client/event_bus.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from collections.abc import Callable

from bluesky_stomp.messaging import MessageContext, MessagingTemplate
from bluesky_stomp.models import MessageTopic

from blueapi.core import DataEvent
from blueapi.messaging import MessageContext, MessagingTemplate
from blueapi.worker import ProgressEvent, WorkerEvent


Expand All @@ -28,11 +30,11 @@ def __exit__(self, exc_type, exc_value, exc_traceback) -> None:

def subscribe_to_all_events(
self,
on_event: Callable[[MessageContext, AnyEvent], None],
on_event: Callable[[AnyEvent, MessageContext], None],
) -> None:
try:
self.app.subscribe(
self.app.destinations.topic("public.worker.event"),
MessageTopic(name="public.worker.event"),
on_event,
)
except Exception as err:
Expand Down
12 changes: 0 additions & 12 deletions src/blueapi/messaging/__init__.py

This file was deleted.

196 changes: 0 additions & 196 deletions src/blueapi/messaging/base.py

This file was deleted.

12 changes: 0 additions & 12 deletions src/blueapi/messaging/context.py

This file was deleted.

Loading

0 comments on commit 012238d

Please sign in to comment.