diff --git a/docs/reference/openapi.yaml b/docs/reference/openapi.yaml index ec688774b..2ae11a77c 100644 --- a/docs/reference/openapi.yaml +++ b/docs/reference/openapi.yaml @@ -270,7 +270,8 @@ paths: '200': content: application/json: - schema: {} + schema: + $ref: '#/components/schemas/EnvironmentResponse' description: Successful Response summary: Delete Environment get: diff --git a/docs/tutorials/dev-run.md b/docs/tutorials/dev-run.md index 8d60c0dd6..308253a96 100644 --- a/docs/tutorials/dev-run.md +++ b/docs/tutorials/dev-run.md @@ -1,19 +1,18 @@ -# Run/Debug in a Developer Environment +# Run/Debug in a Developer Environment Assuming you have setup a developer environment, you can run a development version of the bluesky worker. - ## Start Bluesky Worker Ensure you are inside your virtual environment: -``` + +``` source venv/bin/activate ``` +You will need to follow the instructions for setting up ActiveMQ as in [run cli instructions](../how-to/run-cli.md). -You will need to follow the instructions for setting up ActiveMQ as in [run cli instructions](../how-to/run-cli.md). - -The worker will be available from the command line (`blueapi serve`), but can be started from vscode with additional +The worker will be available from the command line (`blueapi serve`), but can be started from vscode with additional debugging capabilities. 1. Navigate to "Run and Debug" in the left hand menu. @@ -21,3 +20,9 @@ debugging capabilities. 3. Click the green "Run Button" [debug in vscode](../images/debug-vscode.png) + +## Develop devices + +When you select the 'scratch directory' option - where you have devices (dodal) and plans (BLxx-beamline) in a place like `/dls_sw/BLXX/software/blueapi/scratch`, then the list of devices available will refresh without interfacing with the K8S cluster. Just run the command `blueapi env -r` or `blueapi env --reload`. + +With this setup you get a developer loop: "write devices - write plans - test them with blueapi". diff --git a/pyproject.toml b/pyproject.toml index 7304ec3b6..f2405f3e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ dev = [ "mypy", "pytest-cov", "pytest-asyncio", + "responses", "ruff", "sphinx-autobuild==2024.2.4", # Later versions have a clash with fastapi<0.99, remove pin when fastapi is a higher version "sphinx-copybutton", diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 5a293902a..d2553f500 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -4,6 +4,7 @@ from functools import wraps from pathlib import Path from pprint import pprint +from time import sleep import click from pydantic import ValidationError @@ -181,8 +182,9 @@ def run_plan( if config.stomp is not None: _message_template = StompMessagingTemplate.autoconfigured(config.stomp) else: - pprint("ERROR: Cannot run plans without Stomp configuration to track progress") - return + raise RuntimeError( + "Cannot run plans without Stomp configuration to track progress" + ) event_bus_client = EventBusClient(_message_template) finished_event: deque[WorkerEvent] = deque() @@ -278,6 +280,59 @@ def stop(obj: dict) -> None: pprint(client.cancel_current_task(state=WorkerState.STOPPING)) +@controller.command(name="env") +@check_connection +@click.option( + "-r", + "--reload", + is_flag=True, + type=bool, + help="Reload the current environment", + default=False, +) +@click.pass_obj +def env(obj: dict, reload: bool | None) -> None: + """ + Inspect or restart the environment + """ + + assert isinstance(client := obj["rest_client"], BlueapiRestClient) + if reload: + # Reload the environment if needed + print("Reloading the environment...") + try: + deserialized = client.reload_environment() + print(deserialized) + + except BlueskyRemoteError as e: + raise BlueskyRemoteError("Failed to reload the environment") from e + + # Initialize a variable to keep track of the environment status + environment_initialized = False + polling_count = 0 + max_polling_count = 10 + # Use a while loop to keep checking until the environment is initialized + while not environment_initialized and polling_count < max_polling_count: + # Fetch the current environment status + environment_status = client.get_environment() + + # Check if the environment is initialized + if environment_status.initialized: + print("Environment is initialized.") + environment_initialized = True + else: + print("Waiting for environment to initialize...") + polling_count += 1 + sleep(1) # Wait for 1 seconds before checking again + if polling_count == max_polling_count: + raise TimeoutError("Environment initialization timed out.") + + # Once out of the loop, print the initialized environment status + print(environment_status) + else: + print(client.get_environment()) + + # helper function def process_event_after_finished(event: WorkerEvent, logger: logging.Logger): if event.is_error(): diff --git a/src/blueapi/cli/rest.py b/src/blueapi/cli/rest.py index 420ebd13e..9be4fd4c2 100644 --- a/src/blueapi/cli/rest.py +++ b/src/blueapi/cli/rest.py @@ -9,6 +9,7 @@ from blueapi.service.model import ( DeviceModel, DeviceResponse, + EnvironmentResponse, PlanModel, PlanResponse, TaskResponse, @@ -115,16 +116,23 @@ def _request_and_deserialize( raise_if: Callable[[requests.Response], bool] = _is_exception, ) -> T: url = self._url(suffix) - response = requests.request(method, url, json=data) + if data: + response = requests.request(method, url, json=data) + else: + response = requests.request(method, url) if raise_if(response): - message = get_status_message(response.status_code) - error_message = f"""Response failed with text: {response.text}, - with error code: {response.status_code} - which corresponds to {message}""" - raise BlueskyRemoteError(error_message) + raise BlueskyRemoteError(str(response)) deserialized = parse_obj_as(target_type, response.json()) return deserialized def _url(self, suffix: str) -> str: base_url = f"{self._config.protocol}://{self._config.host}:{self._config.port}" return f"{base_url}{suffix}" + + def get_environment(self) -> EnvironmentResponse: + return self._request_and_deserialize("/environment", EnvironmentResponse) + + def reload_environment(self) -> EnvironmentResponse: + return self._request_and_deserialize( + "/environment", EnvironmentResponse, method="DELETE" + ) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 6c7f3580d..db45c32a5 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -91,17 +91,18 @@ def get_environment( return EnvironmentResponse(initialized=handler.initialized) -@app.delete("/environment") +@app.delete("/environment", response_model=EnvironmentResponse) async def delete_environment( background_tasks: BackgroundTasks, handler: BlueskyHandler = Depends(get_handler), -): +) -> EnvironmentResponse: def restart_handler(handler: BlueskyHandler): handler.stop() handler.start() if handler.initialized: background_tasks.add_task(restart_handler, handler) + return EnvironmentResponse(initialized=False) @app.get("/plans", response_model=PlanResponse) @@ -134,6 +135,9 @@ def get_device_by_name(name: str, handler: BlueskyHandler = Depends(get_handler) return handler.get_device(name) +example_task = Task(name="count", params={"detectors": ["x"]}) + + @app.post( "/tasks", response_model=TaskResponse, @@ -142,7 +146,7 @@ def get_device_by_name(name: str, handler: BlueskyHandler = Depends(get_handler) def submit_task( request: Request, response: Response, - task: Task = Body(..., example=Task(name="count", params={"detectors": ["x"]})), # noqa: B008 + task: Task = Body(..., example=example_task), handler: BlueskyHandler = Depends(get_handler), ): """Submit a task to the worker.""" diff --git a/tests/test_cli.py b/tests/test_cli.py index bbba0fe15..34303c922 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, Mock, patch import pytest +import responses from click.testing import CliRunner from fastapi.testclient import TestClient from pydantic import BaseModel, ValidationError @@ -12,6 +13,7 @@ from blueapi.cli.event_bus_client import BlueskyRemoteError from blueapi.core.bluesky_types import Plan from blueapi.service.handler import Handler, teardown_handler +from blueapi.service.model import EnvironmentResponse @pytest.fixture(autouse=True) @@ -40,14 +42,6 @@ def test_main_no_params(): assert result.stdout == expected -def test_main_with_nonexistent_config_file(): - runner = CliRunner() - result = runner.invoke(main, ["-c", "tests/non_existent.yaml"]) - - assert result.exit_code == 1 - assert type(result.exception) is FileNotFoundError - - @patch("requests.request") def test_connection_error_caught_by_wrapper_func(mock_requests: Mock): mock_requests.side_effect = ConnectionError() @@ -182,9 +176,11 @@ def test_invalid_stomp_config_for_listener(runner: CliRunner): def test_cannot_run_plans_without_stomp_config(runner: CliRunner): result = runner.invoke(main, ["controller", "run", "sleep", '{"time": 5}']) + assert result.exit_code == 1 assert ( - "Cannot run plans without Stomp configuration to track progress" - in result.output + isinstance(result.exception, RuntimeError) + and str(result.exception) + == "Cannot run plans without Stomp configuration to track progress" ) @@ -203,6 +199,215 @@ def test_valid_stomp_config_for_listener(runner: CliRunner): assert result.exit_code == 0 +@pytest.mark.handler +@patch("blueapi.service.handler.Handler") +@patch("requests.request") +def test_get_env( + mock_requests: Mock, + mock_handler: Mock, + handler: Handler, + client: TestClient, + runner: CliRunner, +): + with patch("uvicorn.run", side_effect=None): + result = runner.invoke(main, ["serve"]) + + assert result.exit_code == 0 + + mock_requests.return_value = Mock() + + runner.invoke(main, ["controller", "env"]) + + assert mock_requests.call_args[0] == ( + "GET", + "http://localhost:8000/environment", + ) + + +@pytest.mark.handler +@patch("blueapi.service.handler.Handler") +@patch("blueapi.cli.rest.BlueapiRestClient.get_environment") +@patch("blueapi.cli.rest.BlueapiRestClient.reload_environment") +@patch("blueapi.cli.cli.sleep", return_value=None) +def test_reset_env_client_behavior( + mock_sleep: MagicMock, + mock_reload_environment: Mock, + mock_get_environment: Mock, + mock_handler: Mock, + handler: Handler, + client: TestClient, + runner: CliRunner, +): + # Configure the mock_requests to simulate different responses + # First two calls return not initialized, followed by an initialized response + mock_get_environment.side_effect = [ + EnvironmentResponse(initialized=False), # not initialized + EnvironmentResponse(initialized=False), # not initialized + EnvironmentResponse(initialized=True), # finally initalized + ] + mock_reload_environment.return_value = "Environment reload initiated." + + with patch("uvicorn.run", side_effect=None): + serve_result = runner.invoke(main, ["serve"]) + + assert serve_result.exit_code == 0 + + # Invoke the CLI command that would trigger the environment initialization check + reload_result = runner.invoke(main, ["controller", "env", "-r"]) + + assert mock_get_environment.call_count == 3 + + # Verify if sleep was called between polling iterations + assert mock_sleep.call_count == 2 # Since the last check doesn't require a sleep + + # Check if the final environment status is printed correctly + # assert "Environment is initialized." in result.output + assert ( + reload_result.output + == "Reloading the environment...\nEnvironment reload initiated.\nWaiting for environment to initialize...\nWaiting for environment to initialize...\nEnvironment is initialized.\ninitialized=True\n" # noqa: E501 + ) + + +@responses.activate +@pytest.mark.handler +@patch("blueapi.service.handler.Handler") +@patch("blueapi.cli.cli.sleep", return_value=None) +def test_env_endpoint_interaction( + mock_sleep: MagicMock, mock_handler: Mock, handler: Handler, runner: CliRunner +): + # Setup mocked responses for the REST endpoints + responses.add( + responses.DELETE, + "http://localhost:8000/environment", + status=200, + json=EnvironmentResponse(initialized=False).dict(), + ) + responses.add( + responses.GET, + "http://localhost:8000/environment", + json=EnvironmentResponse(initialized=False).dict(), + status=200, + ) + responses.add( + responses.GET, + "http://localhost:8000/environment", + json=EnvironmentResponse(initialized=False).dict(), + status=200, + ) + responses.add( + responses.GET, + "http://localhost:8000/environment", + status=200, + json=EnvironmentResponse(initialized=True).dict(), + ) + + # Run the command that should interact with these endpoints + result = runner.invoke(main, ["controller", "env", "-r"]) + + # Check if the endpoints were hit as expected + assert len(responses.calls) == 4 # Ensures that all expected calls were made + + for index, call in enumerate(responses.calls): + if index == 0: + assert call.request.method == "DELETE" + assert call.request.url == "http://localhost:8000/environment" + else: + assert call.request.method == "GET" + assert call.request.url == "http://localhost:8000/environment" + + # Check other assertions as needed, e.g., output or exit codes + assert result.exit_code == 0 + assert "Environment is initialized." in result.output + + +@pytest.mark.handler +@responses.activate +@patch("blueapi.service.handler.Handler") +@patch("blueapi.cli.cli.sleep", return_value=None) +def test_env_timeout( + mock_sleep: MagicMock, mock_handler: Mock, handler: Handler, runner: CliRunner +): + max_polling_count = 10 # Assuming this is your max polling count in the command + + # Setup mocked responses for the REST endpoints + responses.add( + responses.DELETE, + "http://localhost:8000/environment", + status=200, + json=EnvironmentResponse(initialized=False).dict(), + ) + # Add responses for each polling attempt, all indicating not initialized + for _ in range(max_polling_count): + responses.add( + responses.GET, + "http://localhost:8000/environment", + json=EnvironmentResponse(initialized=False).dict(), + status=200, + ) + + # Run the command that should interact with these endpoints + result = runner.invoke(main, ["controller", "env", "-r"]) + if result.exception is not None: + assert isinstance(result.exception, TimeoutError), "Expected a TimeoutError" + assert result.exception.args[0] == "Environment initialization timed out." + else: + raise AssertionError("Expected an exception but got None") + + # Check if the endpoints were hit as expected + assert len(responses.calls) == max_polling_count + 1 # +1 for the DELETE call + + # First call should be DELETE + assert responses.calls[0].request.method == "DELETE" + assert responses.calls[0].request.url == "http://localhost:8000/environment" + + # Remaining calls should all be GET + for call in responses.calls[1:]: # Skip the first DELETE request + assert call.request.method == "GET" + assert call.request.url == "http://localhost:8000/environment" + + # Check the output for the timeout message + assert ( + result.exit_code == 1 + ) # Assuming your command exits successfully even on timeout for simplicity + + +@pytest.mark.handler +@responses.activate +@patch("blueapi.service.handler.Handler") +@patch("blueapi.cli.cli.sleep", return_value=None) +def test_env_reload_server_side_error( + mock_sleep: MagicMock, mock_handler: Mock, handler: Handler, runner: CliRunner +): + # Setup mocked error response from the server + responses.add( + responses.DELETE, + "http://localhost:8000/environment", + status=500, + json={}, + ) + # Run the command that should interact with these endpoints + result = runner.invoke(main, ["controller", "env", "-r"]) + if result.exception is not None: + assert isinstance( + result.exception, BlueskyRemoteError + ), "Expected a BlueskyRemoteError" + assert result.exception.args[0] == "Failed to reload the environment" + else: + raise AssertionError("Expected an exception but got None") + + # Check if the endpoints were hit as expected + assert len(responses.calls) == 1 # +1 for the DELETE call + + # Only call should be DELETE + assert responses.calls[0].request.method == "DELETE" + assert responses.calls[0].request.url == "http://localhost:8000/environment" + + # Check the output for the timeout message + assert ( + result.exit_code == 1 + ) # Assuming your command exits successfully even on timeout for simplicity + + @pytest.fixture def mock_config(): # Mock configuration setup diff --git a/tests/worker/test_reworker.py b/tests/worker/test_reworker.py index a22169880..8e5fc217b 100644 --- a/tests/worker/test_reworker.py +++ b/tests/worker/test_reworker.py @@ -347,9 +347,12 @@ def test_worker_and_data_events_produce_in_order(worker: Worker) -> None: def assert_running_count_plan_produces_ordered_worker_and_data_events( expected_events: list[WorkerEvent | DataEvent], worker: Worker, - task: Task = Task(name="count", params={"detectors": ["image_det"], "num": 1}), # noqa: B008 + task: Task | None = None, timeout: float = 5.0, ) -> None: + default_task = Task(name="count", params={"detectors": ["image_det"], "num": 1}) + task = task or default_task + event_streams: list[EventStream[Any, int]] = [ worker.data_events, worker.worker_events,