Skip to content

Commit

Permalink
Apply the error message cleaning patch onto a fresh branch (#473)
Browse files Browse the repository at this point in the history
  • Loading branch information
stan-dot committed May 30, 2024
1 parent 6f08f68 commit dbe9634
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 22 deletions.
27 changes: 19 additions & 8 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from pprint import pprint

import click
from pydantic import ValidationError
from requests.exceptions import ConnectionError

from blueapi import __version__
from blueapi.cli.event_bus_client import EventBusClient
from blueapi.cli.event_bus_client import BlueskyRemoteError, EventBusClient
from blueapi.config import ApplicationConfig, ConfigLoader
from blueapi.core import DataEvent
from blueapi.messaging import MessageContext
Expand Down Expand Up @@ -180,9 +181,8 @@ def run_plan(
if config.stomp is not None:
_message_template = StompMessagingTemplate.autoconfigured(config.stomp)
else:
raise RuntimeError(
"Cannot run plans without Stomp configuration to track progress"
)
pprint("ERROR: Cannot run plans without Stomp configuration to track progress")
return
event_bus_client = EventBusClient(_message_template)
finished_event: deque[WorkerEvent] = deque()

Expand All @@ -191,10 +191,21 @@ def store_finished_event(event: WorkerEvent) -> None:
finished_event.append(event)

parameters = parameters or "{}"
task = Task(name=name, params=json.loads(parameters))

resp = client.create_task(task)
task_id = resp.task_id
task_id = ""
parsed_params = json.loads(parameters) if isinstance(parameters, str) else {}
try:
task = Task(name=name, params=parsed_params)
resp = client.create_task(task)
task_id = resp.task_id
except ValidationError as e:
pprint(f"failed to validate the task parameters, {task_id}, error: {e}")
return
except BlueskyRemoteError as e:
pprint(f"server error with this message: {e}")
return
except ValueError:
pprint("task could not run")
return

with event_bus_client:
event_bus_client.subscribe_to_topics(task_id, on_event=store_finished_event)
Expand Down
16 changes: 15 additions & 1 deletion src/blueapi/cli/rest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections.abc import Callable, Mapping
from http import HTTPStatus
from typing import Any, Literal, TypeVar

import requests
Expand All @@ -20,6 +21,15 @@
T = TypeVar("T")


def get_status_message(code: int) -> str:
"""Returns the standard description for a given HTTP status code."""
try:
message = HTTPStatus(code).phrase
return message
except ValueError:
return "Unknown Status Code"


def _is_exception(response: requests.Response) -> bool:
return response.status_code >= 400

Expand Down Expand Up @@ -107,7 +117,11 @@ def _request_and_deserialize(
url = self._url(suffix)
response = requests.request(method, url, json=data)
if raise_if(response):
raise BlueskyRemoteError(str(response))
message = get_status_message(response.status_code)
error_message = f"""Response failed with text: {response.text},
with error code: {response.status_code}
which corresponds to {message}"""
raise BlueskyRemoteError(error_message)
deserialized = parse_obj_as(target_type, response.json())
return deserialized

Expand Down
13 changes: 12 additions & 1 deletion src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,23 @@ def submit_task(
):
"""Submit a task to the worker."""
try:
plan_model = handler.get_plan(task.name)
task_id: str = handler.submit_task(task)
response.headers["Location"] = f"{request.url}/{task_id}"
return TaskResponse(task_id=task_id)
except ValidationError as e:
errors = e.errors()
formatted_errors = "; ".join(
[f"{err['loc'][0]}: {err['msg']}" for err in errors]
)
error_detail_response = f"""
Input validation failed: {formatted_errors},
suppplied params {task.params},
do not match the expected params: {plan_model.parameter_schema}
"""
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=e.errors()
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=error_detail_response,
) from e


Expand Down
10 changes: 2 additions & 8 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,8 @@ def test_invalid_stomp_config_for_listener(runner: CliRunner):
def test_cannot_run_plans_without_stomp_config(runner: CliRunner):
result = runner.invoke(main, ["controller", "run", "sleep", '{"time": 5}'])
assert (
isinstance(result.exception, RuntimeError)
and str(result.exception)
== "Cannot run plans without Stomp configuration to track progress"
"Cannot run plans without Stomp configuration to track progress"
in result.output
)


Expand All @@ -201,8 +200,3 @@ def test_valid_stomp_config_for_listener(runner: CliRunner):
input="\n",
)
assert result.exit_code == 0


def test_invalid_condition_for_run(runner: CliRunner):
result = runner.invoke(main, ["controller", "run", "sleep", '{"time": 5}'])
assert type(result.exception) is RuntimeError
8 changes: 4 additions & 4 deletions tests/worker/test_reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def begin_task_and_wait_until_complete(
task_id: str,
timeout: float = 5.0,
) -> list[WorkerEvent]:
events: "Future[list[WorkerEvent]]" = take_events(
events: Future[list[WorkerEvent]] = take_events(
worker.worker_events,
lambda event: event.is_complete(),
)
Expand Down Expand Up @@ -355,7 +355,7 @@ def assert_running_count_plan_produces_ordered_worker_and_data_events(
]

count = itertools.count()
events: "Future[list[Any]]" = take_events_from_streams(
events: Future[list[Any]] = take_events_from_streams(
event_streams,
lambda _: next(count) >= len(expected_events) - 1,
)
Expand Down Expand Up @@ -390,7 +390,7 @@ def take_events(
cutoff_predicate: Callable[[E], bool],
) -> "Future[list[E]]":
events: list[E] = []
future: "Future[list[E]]" = Future()
future: Future[list[E]] = Future()

def on_event(event: E, event_id: str | None) -> None:
events.append(event)
Expand Down Expand Up @@ -426,7 +426,7 @@ def take_events_from_streams(
"""
events: list[Any] = []
future: "Future[list[Any]]" = Future()
future: Future[list[Any]] = Future()

def on_event(event: Any, event_id: str | None) -> None:
print(event)
Expand Down

0 comments on commit dbe9634

Please sign in to comment.