diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index fdb910b29..2c738a43a 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -1,5 +1,6 @@ import json import logging +import sys from collections import deque from functools import wraps from pathlib import Path @@ -151,20 +152,23 @@ def listen_to_events(obj: dict) -> None: else: raise RuntimeError("Message bus needs to be configured") + fmt = obj["fmt"] + def on_event( context: MessageContext, event: WorkerEvent | ProgressEvent | DataEvent, ) -> None: - converted = json.dumps(event.dict(), indent=2) - print(converted) + fmt.display(event) print( "Subscribing to all bluesky events from " - f"{config.stomp.host}:{config.stomp.port}" + f"{config.stomp.host}:{config.stomp.port}", + file=sys.stderr, ) with event_bus_client: event_bus_client.subscribe_to_all_events(on_event) - input("Press enter to exit") + print("Press enter to exit", file=sys.stderr) + input() @controller.command(name="run") diff --git a/src/blueapi/cli/format.py b/src/blueapi/cli/format.py index f9815c79c..625ebdd1c 100644 --- a/src/blueapi/cli/format.py +++ b/src/blueapi/cli/format.py @@ -10,13 +10,24 @@ from pydantic import BaseModel +from blueapi.core.bluesky_types import DataEvent from blueapi.service.model import DeviceResponse, PlanResponse +from blueapi.worker.event import ProgressEvent, WorkerEvent FALLBACK = pprint +NL = "\n" Stream = TextIO | None +def fmt_dict(t: dict[str, Any] | Any, ind: int = 1) -> str: + """Format a (possibly nested) dict into a human readable tree""" + if not isinstance(t, dict): + return t + pre = " " * (ind * 4) + return NL + NL.join(f"{pre}{k}: {fmt_dict(v, ind+1)}" for k, v in t.items() if v) + + class OutputFormat(str, enum.Enum): JSON = "json" FULL = "full" @@ -49,6 +60,15 @@ def display_full(obj: Any, stream: Stream): print(dev.name) for proto in dev.protocols: print(" " + proto) + case DataEvent(name=name, doc=doc): + print(f"{name.title()}: {fmt_dict(doc)}") + case WorkerEvent(state=state, task_status=task): + print(f"WorkerEvent: {state}{fmt_dict(task.dict() if task else {})}") + case ProgressEvent(): + print(f"Progress:{fmt_dict(obj.dict())}") + case BaseModel(): + print(obj.__class__.__name__, end='') + print(fmt_dict(obj.dict())) case other: FALLBACK(other, stream=stream) @@ -61,7 +81,7 @@ def display_json(obj: Any, stream: Stream): case DeviceResponse(devices=devices): print(json.dumps([d.dict() for d in devices], indent=2)) case BaseModel(): - print(json.dumps(obj.dict(), indent=2)) + print(json.dumps(obj.dict())) case _: print(json.dumps(obj)) @@ -83,6 +103,13 @@ def display_compact(obj: Any, stream: Stream): for dev in devices: print(dev.name) print(indent(textwrap.fill(", ".join(dev.protocols), 80), " ")) + case DataEvent(name=name): + print(f"Data Event: {name}") + case WorkerEvent(state=state): + print(f"Worker state: {state}") + case ProgressEvent(statuses=stats): + prog = max(100 * (s.percentage or 0) for s in stats.values()) or "???" + print(f"Progress: {prog}%") case other: FALLBACK(other, stream=stream)