From fc89e04de6b346bb0825ff30357b23c90f884a76 Mon Sep 17 00:00:00 2001 From: Angelos Angelopoulos Date: Tue, 8 Oct 2024 12:39:19 -0400 Subject: [PATCH] Improve various parts of the code * Add new tests * Improve BaseDevice * Fix formatting of some files * Update docker compose image versions * Mark slow tests and configure them to run last * Refactor some code --- docker/docker-compose.yml | 6 +- eos/campaigns/campaign_executor.py | 619 +++++++++--------- eos/devices/base_device.py | 31 +- eos/devices/device_actor_references.py | 57 -- eos/devices/device_actor_wrapper_registry.py | 49 ++ eos/experiments/experiment_executor.py | 561 ++++++++-------- .../abstract_sequential_optimizer.py | 8 + .../sequential_bayesian_optimizer.py | 11 +- .../sequential_optimizer_actor.py | 3 + eos/tasks/base_task.py | 4 +- eos/tasks/task_executor.py | 12 +- eos/utils/ray_utils.py | 26 +- pyproject.toml | 3 + tests/conftest.py | 7 + tests/fixtures.py | 5 +- tests/test_base_device.py | 94 +++ tests/test_base_task.py | 109 +++ tests/test_bayesian_sequential_optimizer.py | 9 +- tests/test_campaign_executor.py | 142 ++++ tests/test_device_actor_wrapper_registry.py | 69 ++ tests/test_experiment_executor.py | 49 +- tests/test_ray_actor_wrapper.py | 65 ++ 22 files changed, 1235 insertions(+), 704 deletions(-) delete mode 100644 eos/devices/device_actor_references.py create mode 100644 eos/devices/device_actor_wrapper_registry.py create mode 100644 tests/conftest.py create mode 100644 tests/test_base_device.py create mode 100644 tests/test_base_task.py create mode 100644 tests/test_device_actor_wrapper_registry.py create mode 100644 tests/test_ray_actor_wrapper.py diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 11ad4bd..67e3067 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,6 +1,6 @@ services: eos-mongodb: - image: mongo:jammy + image: mongo:noble container_name: eos-mongodb restart: unless-stopped environment: @@ -14,7 +14,7 @@ services: - mongodb_data:/data/db eos-minio: - image: minio/minio:latest + image: minio/minio:RELEASE.2024-10-02T17-50-41Z container_name: eos-minio restart: unless-stopped environment: @@ -30,7 +30,7 @@ services: command: server --console-address ":9001" /data eos-budibase: - image: budibase/budibase:latest + image: budibase/budibase:2.32.12-sqs container_name: eos-budibase restart: unless-stopped ports: diff --git a/eos/campaigns/campaign_executor.py b/eos/campaigns/campaign_executor.py index 34b460a..27703e9 100644 --- a/eos/campaigns/campaign_executor.py +++ b/eos/campaigns/campaign_executor.py @@ -2,6 +2,7 @@ from typing import Any, TYPE_CHECKING import pandas as pd +from ray.actor import ActorHandle from eos.campaigns.campaign_manager import CampaignManager from eos.campaigns.campaign_optimizer_manager import CampaignOptimizerManager @@ -11,322 +12,322 @@ from eos.experiments.exceptions import EosExperimentCancellationError, EosExperimentExecutionError from eos.experiments.experiment_executor_factory import ExperimentExecutorFactory from eos.logging.logger import log -from eos.optimization.abstract_sequential_optimizer import AbstractSequentialOptimizer from eos.tasks.task_manager import TaskManager from eos.utils import dict_utils if TYPE_CHECKING: - from eos.experiments.experiment_executor import ExperimentExecutor + from eos.experiments.experiment_executor import ExperimentExecutor class CampaignExecutor: - def __init__( - self, - campaign_id: str, - experiment_type: str, - execution_parameters: CampaignExecutionParameters, - 
campaign_manager: CampaignManager, - campaign_optimizer_manager: CampaignOptimizerManager, - task_manager: TaskManager, - experiment_executor_factory: ExperimentExecutorFactory, - ): - self._campaign_id = campaign_id - self._experiment_type = experiment_type - self._execution_parameters = execution_parameters - self._campaign_manager = campaign_manager - self._campaign_optimizer_manager = campaign_optimizer_manager - self._task_manager = task_manager - self._experiment_executor_factory = experiment_executor_factory - - self._optimizer = None - self._optimizer_input_names: list[str] = [] - self._optimizer_output_names: list[str] = [] - - self._experiment_executors: dict[str, ExperimentExecutor] = {} - - self._campaign_status: CampaignStatus | None = None - - def _setup_optimizer(self) -> None: - if self._optimizer: - return - - self._optimizer = self._campaign_optimizer_manager.create_campaign_optimizer_actor( - self._experiment_type, - self._campaign_id, - self._execution_parameters.optimizer_computer_ip, - ) - self._optimizer_input_names, self._optimizer_output_names = ( - self._campaign_optimizer_manager.get_input_and_output_names(self._campaign_id) - ) - - def cleanup(self) -> None: - """ - Clean up resources when the campaign executor is no longer needed. - """ - if self._execution_parameters.do_optimization: - self._campaign_optimizer_manager.terminate_campaign_optimizer_actor(self._campaign_id) - - async def start_campaign(self) -> None: - """ - Start the campaign or handle an existing campaign. - """ - campaign = self._campaign_manager.get_campaign(self._campaign_id) - if campaign: - await self._handle_existing_campaign(campaign) - else: - self._create_new_campaign() - - self._campaign_manager.start_campaign(self._campaign_id) - self._campaign_status = CampaignStatus.RUNNING - log.info(f"Started campaign '{self._campaign_id}'.") - - async def _handle_existing_campaign(self, campaign: Campaign) -> None: - """ - Handle cases when the campaign already exists. - """ - self._campaign_status = campaign.status - - if not self._execution_parameters.resume: - def _raise_error(status: str) -> None: - raise EosCampaignExecutionError( - f"Cannot start campaign '{self._campaign_id}' as it already exists and is '{status}'. " - f"Please create a new campaign or re-submit with 'resume=True'." 
+ def __init__( + self, + campaign_id: str, + experiment_type: str, + execution_parameters: CampaignExecutionParameters, + campaign_manager: CampaignManager, + campaign_optimizer_manager: CampaignOptimizerManager, + task_manager: TaskManager, + experiment_executor_factory: ExperimentExecutorFactory, + ): + self._campaign_id = campaign_id + self._experiment_type = experiment_type + self._execution_parameters = execution_parameters + self._campaign_manager = campaign_manager + self._campaign_optimizer_manager = campaign_optimizer_manager + self._task_manager = task_manager + self._experiment_executor_factory = experiment_executor_factory + + self._optimizer: ActorHandle | None = None + self._optimizer_input_names: list[str] = [] + self._optimizer_output_names: list[str] = [] + + self._experiment_executors: dict[str, ExperimentExecutor] = {} + + self._campaign_status: CampaignStatus | None = None + + def _setup_optimizer(self) -> None: + if self._optimizer: + return + + self._optimizer = self._campaign_optimizer_manager.create_campaign_optimizer_actor( + self._experiment_type, + self._campaign_id, + self._execution_parameters.optimizer_computer_ip, + ) + self._optimizer_input_names, self._optimizer_output_names = ( + self._campaign_optimizer_manager.get_input_and_output_names(self._campaign_id) ) - status_handlers = { - CampaignStatus.COMPLETED: lambda: _raise_error("completed"), - CampaignStatus.SUSPENDED: lambda: _raise_error("suspended"), - CampaignStatus.CANCELLED: lambda: _raise_error("cancelled"), - CampaignStatus.FAILED: lambda: _raise_error("failed"), - } - status_handlers.get(self._campaign_status, lambda: None)() - - await self._resume_campaign() - - def _create_new_campaign(self) -> None: - """ - Create a new campaign. - """ - self._campaign_manager.create_campaign( - campaign_id=self._campaign_id, - experiment_type=self._experiment_type, - execution_parameters=self._execution_parameters, - ) - - if self._execution_parameters.do_optimization: - self._setup_optimizer() - - async def _resume_campaign(self) -> None: - """ - Resume an existing campaign. - """ - self._campaign_manager.delete_current_campaign_experiments(self._campaign_id) - - if self._execution_parameters.do_optimization: - self._setup_optimizer() - await self._restore_optimizer_state() - - log.info(f"Campaign '{self._campaign_id}' resumed.") - - async def _restore_optimizer_state(self) -> None: - """ - Restore the optimizer state for a resumed campaign. - """ - completed_experiment_ids = self._campaign_manager.get_campaign_experiment_ids( - self._campaign_id, status=ExperimentStatus.COMPLETED - ) - - inputs_df, outputs_df = await self._collect_experiment_results(completed_experiment_ids) - - await self._optimizer.report.remote(inputs_df, outputs_df) - - log.info( - f"CMP '{self._campaign_id}' - Restored optimizer state with {len(completed_experiment_ids)} " - f"completed experiments." - ) - - async def cancel_campaign(self) -> None: - """ - Cancel the campaign and all running experiments. - """ - campaign = self._campaign_manager.get_campaign(self._campaign_id) - if not campaign or campaign.status != CampaignStatus.RUNNING: - raise EosCampaignExecutionError( - f"Cannot cancel campaign '{self._campaign_id}' with status " - f"'{campaign.status if campaign else 'None'}'. It must be running." 
- ) - - log.warning(f"Cancelling campaign '{self._campaign_id}'...") - self._campaign_manager.cancel_campaign(self._campaign_id) - self._campaign_status = CampaignStatus.CANCELLED - - await self._cancel_running_experiments() - - log.warning(f"Cancelled campaign '{self._campaign_id}'.") - - async def _cancel_running_experiments(self) -> None: - """ - Cancel all running experiments in the campaign. - """ - cancellation_tasks = [executor.cancel_experiment() for executor in self._experiment_executors.values()] - try: - await asyncio.wait_for(asyncio.gather(*cancellation_tasks, return_exceptions=True), timeout=30) - except asyncio.TimeoutError as e: - raise EosCampaignExecutionError( - f"CMP '{self._campaign_id}' - Timed out while cancelling experiments. " - f"Some experiments may still be running." - ) from e - except EosExperimentCancellationError as e: - raise EosCampaignExecutionError( - f"CMP '{self._campaign_id}' - Error cancelling experiments. Some experiments may still " - f"be running." - ) from e - - async def progress_campaign(self) -> bool: - """ - Progress the campaign by executing experiments. - Returns True if the campaign is completed, False otherwise. - """ - try: - if self._campaign_status != CampaignStatus.RUNNING: - return self._campaign_status == CampaignStatus.CANCELLED - - await self._progress_experiments() - - campaign = self._campaign_manager.get_campaign(self._campaign_id) - if self._is_campaign_completed(campaign): + def cleanup(self) -> None: + """ + Clean up resources when the campaign executor is no longer needed. + """ if self._execution_parameters.do_optimization: - await self._compute_pareto_solutions() - self._campaign_manager.complete_campaign(self._campaign_id) - return True - - await self._create_experiments(campaign) - - return False - except EosExperimentExecutionError as e: - self._campaign_manager.fail_campaign(self._campaign_id) - self._campaign_status = CampaignStatus.FAILED - raise EosCampaignExecutionError(f"Error executing campaign '{self._campaign_id}'") from e - - async def _progress_experiments(self) -> None: - """ - Progress all running experiments sequentially and process completed ones. - """ - completed_experiments = [] - - for experiment_id, executor in self._experiment_executors.items(): - is_completed = await executor.progress_experiment() - if is_completed: - completed_experiments.append(experiment_id) - - if self._execution_parameters.do_optimization and completed_experiments: - await self._process_completed_experiments(completed_experiments) - - for experiment_id in completed_experiments: - del self._experiment_executors[experiment_id] - self._campaign_manager.delete_campaign_experiment(self._campaign_id, experiment_id) - self._campaign_manager.increment_iteration(self._campaign_id) - - async def _process_completed_experiments(self, completed_experiments: list[str]) -> None: - """ - Process the results of completed experiments. - """ - inputs_df, outputs_df = await self._collect_experiment_results(completed_experiments) - await self._optimizer.report.remote(inputs_df, outputs_df) - self._campaign_optimizer_manager.record_campaign_samples( - self._campaign_id, completed_experiments, inputs_df, outputs_df - ) - - async def _collect_experiment_results(self, experiment_ids: list[str]) -> tuple[pd.DataFrame, pd.DataFrame]: - """ - Collect the results of completed experiments. 
- """ - inputs = {input_name: [] for input_name in self._optimizer_input_names} - outputs = {output_name: [] for output_name in self._optimizer_output_names} - - for experiment_id in experiment_ids: - for input_name in self._optimizer_input_names: - reference_task_id, parameter_name = input_name.split(".") - task = self._task_manager.get_task(experiment_id, reference_task_id) - inputs[input_name].append(float(task.input.parameters[parameter_name])) - for output_name in self._optimizer_output_names: - reference_task_id, parameter_name = output_name.split(".") - output_parameters = self._task_manager.get_task_output(experiment_id, reference_task_id).parameters - outputs[output_name].append(float(output_parameters[parameter_name])) - - return pd.DataFrame(inputs), pd.DataFrame(outputs) - - async def _create_experiments(self, campaign: Campaign) -> None: - """ - Create new experiments if possible. - """ - while self._can_create_more_experiments(campaign): - iteration = campaign.experiments_completed + len(self._experiment_executors) - new_experiment_id = f"{self._campaign_id}_exp_{iteration + 1}" - - experiment_dynamic_parameters = await self._get_experiment_parameters(iteration) - - experiment_execution_parameters = ExperimentExecutionParameters() - experiment_executor = self._experiment_executor_factory.create( - new_experiment_id, self._experiment_type, experiment_execution_parameters - ) - self._campaign_manager.add_campaign_experiment(self._campaign_id, new_experiment_id) - self._experiment_executors[new_experiment_id] = experiment_executor - experiment_executor.start_experiment(experiment_dynamic_parameters) - - async def _get_experiment_parameters(self, iteration: int) -> dict[str, Any]: - """ - Get parameters for a new experiment. - """ - campaign_dynamic_parameters = self._execution_parameters.dynamic_parameters - - if campaign_dynamic_parameters and len(campaign_dynamic_parameters) > iteration: - return campaign_dynamic_parameters[iteration] - if self._execution_parameters.do_optimization: - log.info(f"CMP '{self._campaign_id}' - Sampling new parameters from the optimizer...") - new_parameters = await self._optimizer.sample.remote(1) - new_parameters = new_parameters.to_dict(orient="records")[0] - log.debug(f"CMP '{self._campaign_id}' - Sampled parameters: {new_parameters}") - return dict_utils.unflatten_dict(new_parameters) - - raise EosCampaignExecutionError( - f"CMP '{self._campaign_id}' - No dynamic parameters provided for iteration {iteration}." - ) - - def _can_create_more_experiments(self, campaign: Campaign) -> bool: - """ - Check if more experiments can be created. - """ - num_executors = len(self._experiment_executors) - max_concurrent = self._execution_parameters.max_concurrent_experiments - max_total = self._execution_parameters.max_experiments - current_total = campaign.experiments_completed + num_executors - - return num_executors < max_concurrent and (max_total == 0 or current_total < max_total) - - def _is_campaign_completed(self, campaign: Campaign) -> bool: - """ - Check if the campaign is completed. - """ - max_experiments = self._execution_parameters.max_experiments - return ( - max_experiments > 0 - and campaign.experiments_completed >= max_experiments - and len(self._experiment_executors) == 0 - ) - - async def _compute_pareto_solutions(self) -> None: - """ - Compute and store Pareto solutions for the campaign. 
- """ - log.info(f"Computing Pareto solutions for campaign '{self._campaign_id}'...") - try: - pareto_solutions_df = await self._optimizer.get_optimal_solutions.remote() - pareto_solutions = pareto_solutions_df.to_dict(orient="records") - self._campaign_manager.set_pareto_solutions(self._campaign_id, pareto_solutions) - except Exception as e: - raise EosCampaignExecutionError(f"CMP '{self._campaign_id}' - Error computing Pareto solutions.") from e - - @property - def optimizer(self) -> AbstractSequentialOptimizer: - return self._optimizer + self._campaign_optimizer_manager.terminate_campaign_optimizer_actor(self._campaign_id) + + async def start_campaign(self) -> None: + """ + Start the campaign or handle an existing campaign. + """ + campaign = self._campaign_manager.get_campaign(self._campaign_id) + if campaign: + await self._handle_existing_campaign(campaign) + else: + self._create_new_campaign() + + self._campaign_manager.start_campaign(self._campaign_id) + self._campaign_status = CampaignStatus.RUNNING + log.info(f"Started campaign '{self._campaign_id}'.") + + async def _handle_existing_campaign(self, campaign: Campaign) -> None: + """ + Handle cases when the campaign already exists. + """ + self._campaign_status = campaign.status + + if not self._execution_parameters.resume: + + def _raise_error(status: str) -> None: + raise EosCampaignExecutionError( + f"Cannot start campaign '{self._campaign_id}' as it already exists and is '{status}'. " + f"Please create a new campaign or re-submit with 'resume=True'." + ) + + status_handlers = { + CampaignStatus.COMPLETED: lambda: _raise_error("completed"), + CampaignStatus.SUSPENDED: lambda: _raise_error("suspended"), + CampaignStatus.CANCELLED: lambda: _raise_error("cancelled"), + CampaignStatus.FAILED: lambda: _raise_error("failed"), + } + status_handlers.get(self._campaign_status, lambda: None)() + + await self._resume_campaign() + + def _create_new_campaign(self) -> None: + """ + Create a new campaign. + """ + self._campaign_manager.create_campaign( + campaign_id=self._campaign_id, + experiment_type=self._experiment_type, + execution_parameters=self._execution_parameters, + ) + + if self._execution_parameters.do_optimization: + self._setup_optimizer() + + async def _resume_campaign(self) -> None: + """ + Resume an existing campaign. + """ + self._campaign_manager.delete_current_campaign_experiments(self._campaign_id) + + if self._execution_parameters.do_optimization: + self._setup_optimizer() + await self._restore_optimizer_state() + + log.info(f"Campaign '{self._campaign_id}' resumed.") + + async def _restore_optimizer_state(self) -> None: + """ + Restore the optimizer state for a resumed campaign. + """ + completed_experiment_ids = self._campaign_manager.get_campaign_experiment_ids( + self._campaign_id, status=ExperimentStatus.COMPLETED + ) + + inputs_df, outputs_df = await self._collect_experiment_results(completed_experiment_ids) + + await self._optimizer.report.remote(inputs_df, outputs_df) + + log.info( + f"CMP '{self._campaign_id}' - Restored optimizer state with {len(completed_experiment_ids)} " + f"completed experiments." + ) + + async def cancel_campaign(self) -> None: + """ + Cancel the campaign and all running experiments. + """ + campaign = self._campaign_manager.get_campaign(self._campaign_id) + if not campaign or campaign.status != CampaignStatus.RUNNING: + raise EosCampaignExecutionError( + f"Cannot cancel campaign '{self._campaign_id}' with status " + f"'{campaign.status if campaign else 'None'}'. It must be running." 
+ ) + + log.warning(f"Cancelling campaign '{self._campaign_id}'...") + self._campaign_manager.cancel_campaign(self._campaign_id) + self._campaign_status = CampaignStatus.CANCELLED + + await self._cancel_running_experiments() + self._experiment_executors.clear() + + log.warning(f"Cancelled campaign '{self._campaign_id}'.") + + async def _cancel_running_experiments(self) -> None: + """ + Cancel all running experiments in the campaign. + """ + cancellation_tasks = [executor.cancel_experiment() for executor in self._experiment_executors.values()] + try: + await asyncio.wait_for(asyncio.gather(*cancellation_tasks, return_exceptions=True), timeout=15) + except asyncio.TimeoutError as e: + raise EosCampaignExecutionError( + f"CMP '{self._campaign_id}' - Timed out while cancelling experiments. " + f"Some experiments may still be running." + ) from e + except EosExperimentCancellationError as e: + raise EosCampaignExecutionError( + f"CMP '{self._campaign_id}' - Error cancelling experiments. Some experiments may still be running." + ) from e + + async def progress_campaign(self) -> bool: + """ + Progress the campaign by executing experiments. + Returns True if the campaign is completed, False otherwise. + """ + try: + if self._campaign_status != CampaignStatus.RUNNING: + return self._campaign_status == CampaignStatus.CANCELLED + + await self._progress_experiments() + + campaign = self._campaign_manager.get_campaign(self._campaign_id) + if self._is_campaign_completed(campaign): + if self._execution_parameters.do_optimization: + await self._compute_pareto_solutions() + self._campaign_manager.complete_campaign(self._campaign_id) + return True + + await self._create_experiments(campaign) + + return False + except EosExperimentExecutionError as e: + self._campaign_manager.fail_campaign(self._campaign_id) + self._campaign_status = CampaignStatus.FAILED + raise EosCampaignExecutionError(f"Error executing campaign '{self._campaign_id}'") from e + + async def _progress_experiments(self) -> None: + """ + Progress all running experiments sequentially and process completed ones. + """ + completed_experiments = [] + + for experiment_id, executor in self._experiment_executors.items(): + is_completed = await executor.progress_experiment() + if is_completed: + completed_experiments.append(experiment_id) + + if self._execution_parameters.do_optimization and completed_experiments: + await self._process_completed_experiments(completed_experiments) + + for experiment_id in completed_experiments: + del self._experiment_executors[experiment_id] + self._campaign_manager.delete_campaign_experiment(self._campaign_id, experiment_id) + self._campaign_manager.increment_iteration(self._campaign_id) + + async def _process_completed_experiments(self, completed_experiments: list[str]) -> None: + """ + Process the results of completed experiments. + """ + inputs_df, outputs_df = await self._collect_experiment_results(completed_experiments) + await self._optimizer.report.remote(inputs_df, outputs_df) + self._campaign_optimizer_manager.record_campaign_samples( + self._campaign_id, completed_experiments, inputs_df, outputs_df + ) + + async def _collect_experiment_results(self, experiment_ids: list[str]) -> tuple[pd.DataFrame, pd.DataFrame]: + """ + Collect the results of completed experiments. 
+ """ + inputs = {input_name: [] for input_name in self._optimizer_input_names} + outputs = {output_name: [] for output_name in self._optimizer_output_names} + + for experiment_id in experiment_ids: + for input_name in self._optimizer_input_names: + reference_task_id, parameter_name = input_name.split(".") + task = self._task_manager.get_task(experiment_id, reference_task_id) + inputs[input_name].append(float(task.input.parameters[parameter_name])) + for output_name in self._optimizer_output_names: + reference_task_id, parameter_name = output_name.split(".") + output_parameters = self._task_manager.get_task_output(experiment_id, reference_task_id).parameters + outputs[output_name].append(float(output_parameters[parameter_name])) + + return pd.DataFrame(inputs), pd.DataFrame(outputs) + + async def _create_experiments(self, campaign: Campaign) -> None: + """ + Create new experiments if possible. + """ + while self._can_create_more_experiments(campaign): + iteration = campaign.experiments_completed + len(self._experiment_executors) + new_experiment_id = f"{self._campaign_id}_exp_{iteration + 1}" + + experiment_dynamic_parameters = await self._get_experiment_parameters(iteration) + + experiment_execution_parameters = ExperimentExecutionParameters() + experiment_executor = self._experiment_executor_factory.create( + new_experiment_id, self._experiment_type, experiment_execution_parameters + ) + self._campaign_manager.add_campaign_experiment(self._campaign_id, new_experiment_id) + self._experiment_executors[new_experiment_id] = experiment_executor + experiment_executor.start_experiment(experiment_dynamic_parameters) + + async def _get_experiment_parameters(self, iteration: int) -> dict[str, Any]: + """ + Get parameters for a new experiment. + """ + campaign_dynamic_parameters = self._execution_parameters.dynamic_parameters + + if campaign_dynamic_parameters and len(campaign_dynamic_parameters) > iteration: + return campaign_dynamic_parameters[iteration] + if self._execution_parameters.do_optimization: + log.info(f"CMP '{self._campaign_id}' - Sampling new parameters from the optimizer...") + new_parameters = await self._optimizer.sample.remote(1) + new_parameters = new_parameters.to_dict(orient="records")[0] + log.debug(f"CMP '{self._campaign_id}' - Sampled parameters: {new_parameters}") + return dict_utils.unflatten_dict(new_parameters) + + raise EosCampaignExecutionError( + f"CMP '{self._campaign_id}' - No dynamic parameters provided for iteration {iteration}." + ) + + def _can_create_more_experiments(self, campaign: Campaign) -> bool: + """ + Check if more experiments can be created. + """ + num_executors = len(self._experiment_executors) + max_concurrent = self._execution_parameters.max_concurrent_experiments + max_total = self._execution_parameters.max_experiments + current_total = campaign.experiments_completed + num_executors + + return num_executors < max_concurrent and (max_total == 0 or current_total < max_total) + + def _is_campaign_completed(self, campaign: Campaign) -> bool: + """ + Check if the campaign is completed. + """ + max_experiments = self._execution_parameters.max_experiments + return ( + max_experiments > 0 + and campaign.experiments_completed >= max_experiments + and len(self._experiment_executors) == 0 + ) + + async def _compute_pareto_solutions(self) -> None: + """ + Compute and store Pareto solutions for the campaign. 
+        """
+        log.info(f"Computing Pareto solutions for campaign '{self._campaign_id}'...")
+        try:
+            pareto_solutions_df = await self._optimizer.get_optimal_solutions.remote()
+            pareto_solutions = pareto_solutions_df.to_dict(orient="records")
+            self._campaign_manager.set_pareto_solutions(self._campaign_id, pareto_solutions)
+        except Exception as e:
+            raise EosCampaignExecutionError(f"CMP '{self._campaign_id}' - Error computing Pareto solutions.") from e
+
+    @property
+    def optimizer(self) -> ActorHandle | None:
+        return self._optimizer
diff --git a/eos/devices/base_device.py b/eos/devices/base_device.py
index ce9f48f..ce39e3d 100644
--- a/eos/devices/base_device.py
+++ b/eos/devices/base_device.py
@@ -1,5 +1,6 @@
 import threading
 from abc import ABC, abstractmethod, ABCMeta
+from enum import Enum
 from typing import Any
 
 from eos.devices.exceptions import (
@@ -9,7 +10,7 @@
 )
 
 
-class DeviceStatus:
+class DeviceStatus(Enum):
     DISABLED = "DISABLED"
     IDLE = "IDLE"
     BUSY = "BUSY"
@@ -20,9 +21,15 @@ def capture_exceptions(func: callable) -> callable:
     def wrapper(self, *args, **kwargs) -> Any:
         try:
             return func(self, *args, **kwargs)
+
+        except (
+            EosDeviceInitializationError,
+            EosDeviceCleanupError,
+        ) as e:
+            raise e
         except Exception as e:
             self._status = DeviceStatus.ERROR
-            raise EosDeviceError(f"Error in {func.__name__} in device {self._device_id}") from e
+            raise EosDeviceError(f"Error in the function '{func.__name__}' in device '{self._device_id}'.") from e
 
     return wrapper
 
@@ -70,6 +77,7 @@ def __del__(self):
         if "_status" not in self.__dict__:
             return
         if self._status and self._status != DeviceStatus.DISABLED:
+            self._status = DeviceStatus.DISABLED
            self.cleanup()
 
     def initialize(self, initialization_parameters: dict[str, Any]) -> None:
@@ -112,17 +120,15 @@ def enable(self) -> None:
         """
         Enable the device. The status should be IDLE after calling this method.
         """
-        with self._lock:
-            if self._status == DeviceStatus.DISABLED:
-                self.initialize(self._initialization_parameters)
+        if self._status == DeviceStatus.DISABLED:
+            self.initialize(self._initialization_parameters)
 
     def disable(self) -> None:
         """
         Disable the device. The status should be DISABLED after calling this method.
""" - with self._lock: - if self._status != DeviceStatus.DISABLED: - self.cleanup() + if self._status != DeviceStatus.DISABLED: + self.cleanup() def report(self) -> dict[str, Any]: """ @@ -139,13 +145,16 @@ def report_status(self) -> dict[str, Any]: "status": self._status, } - def get_id(self) -> str: + @property + def id(self) -> str: return self._device_id - def get_type(self) -> str: + @property + def type(self) -> str: return self._device_type - def get_status(self) -> str: + @property + def status(self) -> DeviceStatus: return self._status @abstractmethod diff --git a/eos/devices/device_actor_references.py b/eos/devices/device_actor_references.py deleted file mode 100644 index 67156fa..0000000 --- a/eos/devices/device_actor_references.py +++ /dev/null @@ -1,57 +0,0 @@ -from dataclasses import dataclass - -from ray.actor import ActorHandle - -from eos.utils.ray_utils import RayActorWrapper - - -@dataclass(frozen=True) -class DeviceRayActorReference: - id: str - lab_id: str - type: str - actor_handle: ActorHandle - - -@dataclass(frozen=True) -class DeviceRayActorWrapperReference: - id: str - lab_id: str - type: str - ray_actor_wrapper: RayActorWrapper - - -class DeviceRayActorWrapperReferences: - def __init__(self, devices: list[DeviceRayActorReference]): - self._devices_by_lab_and_id: dict[tuple[str, str], DeviceRayActorWrapperReference] = {} - self._devices_by_lab_id: dict[str, list[DeviceRayActorWrapperReference]] = {} - self._devices_by_type: dict[str, list[DeviceRayActorWrapperReference]] = {} - - for device in devices: - device_actor_wrapper_reference = DeviceRayActorWrapperReference( - id=device.id, - lab_id=device.lab_id, - type=device.type, - ray_actor_wrapper=RayActorWrapper(device.actor_handle), - ) - self._devices_by_lab_and_id[(device.lab_id, device.id)] = device_actor_wrapper_reference - - if device.lab_id not in self._devices_by_lab_id: - self._devices_by_lab_id[device.lab_id] = [] - self._devices_by_lab_id[device.lab_id].append(device_actor_wrapper_reference) - - if device.type not in self._devices_by_type: - self._devices_by_type[device.type] = [] - self._devices_by_type[device.type].append(device_actor_wrapper_reference) - - def get(self, lab_id: str, device_id: str) -> RayActorWrapper | None: - device = self._devices_by_lab_and_id.get((lab_id, device_id)) - return device.ray_actor_wrapper if device else None - - def get_all_by_lab_id(self, lab_id: str) -> list[RayActorWrapper]: - devices = self._devices_by_lab_id.get(lab_id, []) - return [device.ray_actor_wrapper for device in devices] - - def get_all_by_type(self, device_type: str) -> list[RayActorWrapper]: - devices = self._devices_by_type.get(device_type, []) - return [device.ray_actor_wrapper for device in devices] diff --git a/eos/devices/device_actor_wrapper_registry.py b/eos/devices/device_actor_wrapper_registry.py new file mode 100644 index 0000000..bcb3086 --- /dev/null +++ b/eos/devices/device_actor_wrapper_registry.py @@ -0,0 +1,49 @@ +from dataclasses import dataclass + +from ray.actor import ActorHandle + +from eos.utils.ray_utils import RayActorWrapper + + +@dataclass(frozen=True) +class DeviceActorReference: + id: str + lab_id: str + type: str + actor_handle: ActorHandle + + +@dataclass(frozen=True) +class DeviceActorWrapperReference: + id: str + lab_id: str + type: str + actor_wrapper: RayActorWrapper + + +class DeviceActorWrapperRegistry: + def __init__(self, devices: list[DeviceActorReference]): + self._devices_by_lab_and_id: dict[tuple[str, str], DeviceActorWrapperReference] = {} + 
self._devices_by_lab_id: dict[str, list[DeviceActorWrapperReference]] = {} + self._devices_by_type: dict[str, list[DeviceActorWrapperReference]] = {} + + for device in devices: + device_wrapper_reference = DeviceActorWrapperReference( + id=device.id, + lab_id=device.lab_id, + type=device.type, + actor_wrapper=RayActorWrapper(device.actor_handle), + ) + self._devices_by_lab_and_id[(device.lab_id, device.id)] = device_wrapper_reference + self._devices_by_lab_id.setdefault(device.lab_id, []).append(device_wrapper_reference) + self._devices_by_type.setdefault(device.type, []).append(device_wrapper_reference) + + def get(self, lab_id: str, device_id: str) -> RayActorWrapper | None: + device = self._devices_by_lab_and_id.get((lab_id, device_id)) + return device.actor_wrapper if device else None + + def get_all_by_lab_id(self, lab_id: str) -> list[RayActorWrapper]: + return [device.actor_wrapper for device in self._devices_by_lab_id.get(lab_id, [])] + + def get_all_by_type(self, device_type: str) -> list[RayActorWrapper]: + return [device.actor_wrapper for device in self._devices_by_type.get(device_type, [])] diff --git a/eos/experiments/experiment_executor.py b/eos/experiments/experiment_executor.py index 2c33050..3c32549 100644 --- a/eos/experiments/experiment_executor.py +++ b/eos/experiments/experiment_executor.py @@ -6,9 +6,9 @@ from eos.containers.container_manager import ContainerManager from eos.experiments.entities.experiment import ExperimentStatus, ExperimentExecutionParameters, Experiment from eos.experiments.exceptions import ( - EosExperimentExecutionError, - EosExperimentTaskExecutionError, - EosExperimentCancellationError, + EosExperimentExecutionError, + EosExperimentTaskExecutionError, + EosExperimentCancellationError, ) from eos.experiments.experiment_manager import ExperimentManager from eos.logging.logger import log @@ -23,282 +23,283 @@ class ExperimentExecutor: - """Responsible for executing all the tasks of a single experiment.""" - - def __init__( - self, - experiment_id: str, - experiment_type: str, - execution_parameters: ExperimentExecutionParameters, - experiment_graph: ExperimentGraph, - experiment_manager: ExperimentManager, - task_manager: TaskManager, - container_manager: ContainerManager, - task_executor: TaskExecutor, - scheduler: AbstractScheduler, - ): - self._experiment_id = experiment_id - self._experiment_type = experiment_type - self._execution_parameters = execution_parameters - self._experiment_graph = experiment_graph - self._experiment_manager = experiment_manager - self._task_manager = task_manager - self._container_manager = container_manager - self._task_executor = task_executor - self._scheduler = scheduler - self._task_input_resolver = TaskInputResolver(task_manager, experiment_manager) - - self._current_task_execution_parameters: dict[str, TaskExecutionParameters] = {} - self._task_output_futures: dict[str, asyncio.Task] = {} - self._experiment_status = None - - def start_experiment( - self, - dynamic_parameters: dict[str, dict[str, Any]] | None = None, - metadata: dict[str, Any] | None = None, - ) -> None: - """ - Start the experiment and register the executor with the scheduler. 
- """ - experiment = self._experiment_manager.get_experiment(self._experiment_id) - if experiment: - self._handle_existing_experiment(experiment) - else: - self._create_new_experiment(dynamic_parameters, metadata) - - self._scheduler.register_experiment( - experiment_id=self._experiment_id, - experiment_type=self._experiment_type, - experiment_graph=self._experiment_graph, - ) - - self._experiment_manager.start_experiment(self._experiment_id) - self._experiment_status = ExperimentStatus.RUNNING - - log.info(f"{'Resumed' if self._execution_parameters.resume else 'Started'} experiment '{self._experiment_id}'.") - - def _handle_existing_experiment(self, experiment: Experiment) -> None: - """ - Handle cases when the experiment already exists. - """ - self._experiment_status = experiment.status - - if not self._execution_parameters.resume: - def _raise_error(status: str) -> None: - raise EosExperimentExecutionError( - f"Cannot start experiment '{self._experiment_id}' as it already exists and is '{status}'. " - f"Please create a new experiment or re-submit with 'resume=True'." + """Responsible for executing all the tasks of a single experiment.""" + + def __init__( + self, + experiment_id: str, + experiment_type: str, + execution_parameters: ExperimentExecutionParameters, + experiment_graph: ExperimentGraph, + experiment_manager: ExperimentManager, + task_manager: TaskManager, + container_manager: ContainerManager, + task_executor: TaskExecutor, + scheduler: AbstractScheduler, + ): + self._experiment_id = experiment_id + self._experiment_type = experiment_type + self._execution_parameters = execution_parameters + self._experiment_graph = experiment_graph + self._experiment_manager = experiment_manager + self._task_manager = task_manager + self._container_manager = container_manager + self._task_executor = task_executor + self._scheduler = scheduler + self._task_input_resolver = TaskInputResolver(task_manager, experiment_manager) + + self._current_task_execution_parameters: dict[str, TaskExecutionParameters] = {} + self._task_output_futures: dict[str, asyncio.Task] = {} + self._experiment_status = None + + def start_experiment( + self, + dynamic_parameters: dict[str, dict[str, Any]] | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + """ + Start the experiment and register the executor with the scheduler. + """ + experiment = self._experiment_manager.get_experiment(self._experiment_id) + if experiment: + self._handle_existing_experiment(experiment) + else: + self._create_new_experiment(dynamic_parameters, metadata) + + self._scheduler.register_experiment( + experiment_id=self._experiment_id, + experiment_type=self._experiment_type, + experiment_graph=self._experiment_graph, ) - status_handlers = { - ExperimentStatus.COMPLETED: lambda: _raise_error("completed"), - ExperimentStatus.SUSPENDED: lambda: _raise_error("suspended"), - ExperimentStatus.CANCELLED: lambda: _raise_error("cancelled"), - ExperimentStatus.FAILED: lambda: _raise_error("failed"), - } - status_handlers.get(self._experiment_status, lambda: None)() - - self._resume_experiment() - - async def cancel_experiment(self) -> None: - """ - Cancel the experiment. - """ - experiment = self._experiment_manager.get_experiment(self._experiment_id) - if not experiment or experiment.status != ExperimentStatus.RUNNING: - raise EosExperimentCancellationError( - f"Cannot cancel experiment '{self._experiment_id}' with status '{experiment.status}'. " - f"It must be running." 
- ) - - log.warning(f"Cancelling experiment '{self._experiment_id}'...") - self._experiment_status = ExperimentStatus.CANCELLED - self._experiment_manager.cancel_experiment(self._experiment_id) - self._scheduler.unregister_experiment(self._experiment_id) - await self._cancel_running_tasks() - - log.warning(f"Cancelled experiment '{self._experiment_id}'.") - - async def progress_experiment(self) -> bool: - """ - Try to progress the experiment by executing tasks. - - :return: True if the experiment has been completed, False otherwise. - """ - try: - if self._experiment_status != ExperimentStatus.RUNNING: - return self._experiment_status == ExperimentStatus.CANCELLED - - if self._scheduler.is_experiment_completed(self._experiment_id): - self._complete_experiment() - return True - - self._process_completed_tasks() - await self._execute_tasks() - - return False - except Exception as e: - self._fail_experiment() - raise EosExperimentExecutionError(f"Error executing experiment '{self._experiment_id}'") from e - - def _resume_experiment(self) -> None: - """ - Resume an existing experiment. - """ - self._experiment_manager.delete_non_completed_tasks(self._experiment_id) - log.info(f"Experiment '{self._experiment_id}' resumed.") - - def _create_new_experiment(self, dynamic_parameters: dict[str, dict[str, Any]], metadata: dict[str, Any]) -> None: - """ - Create a new experiment with the given parameters. - """ - dynamic_parameters = dynamic_parameters or {} - self._validate_dynamic_parameters(dynamic_parameters) - self._experiment_manager.create_experiment( - experiment_id=self._experiment_id, - experiment_type=self._experiment_type, - execution_parameters=self._execution_parameters, - dynamic_parameters=dynamic_parameters, - metadata=metadata, - ) - - async def _cancel_running_tasks(self) -> None: - """ - Cancel all running tasks in the experiment. - """ - cancellation_futures = [ - self._task_executor.request_task_cancellation(params.experiment_id, params.task_config.id) - for params in self._current_task_execution_parameters.values() - ] - try: - await asyncio.wait_for(asyncio.gather(*cancellation_futures), timeout=30) - except asyncio.TimeoutError as e: - raise EosExperimentExecutionError( - f"Timeout while cancelling experiment {self._experiment_id}. Some tasks may not have been cancelled." - ) from e - - def _complete_experiment(self) -> None: - """ - Complete the experiment and clean up. - """ - self._scheduler.unregister_experiment(self._experiment_id) - self._experiment_manager.complete_experiment(self._experiment_id) - self._experiment_status = ExperimentStatus.COMPLETED - - def _fail_experiment(self) -> None: - """ - Fail the experiment. - """ - self._scheduler.unregister_experiment(self._experiment_id) - self._experiment_manager.fail_experiment(self._experiment_id) - self._experiment_status = ExperimentStatus.FAILED - - def _process_completed_tasks(self) -> None: - """ - Process the output of completed tasks. - """ - completed_tasks = [task_id for task_id, future in self._task_output_futures.items() if future.done()] - for task_id in completed_tasks: - self._process_task_output(task_id) - - def _process_task_output(self, task_id: str) -> None: - """ - Process the output of a single completed task. 
- """ - try: - result = self._task_output_futures[task_id].result() - if result: - output_parameters, output_containers, output_files = result - self._update_containers(output_containers) - self._add_task_output(task_id, output_parameters, output_containers, output_files) - self._task_manager.complete_task(self._experiment_id, task_id) - log.info(f"EXP '{self._experiment_id}' - Completed task '{task_id}'.") - except EosTaskExecutionError as e: - raise EosExperimentTaskExecutionError( - f"Error executing task '{task_id}' of experiment '{self._experiment_id}'" - ) from e - finally: - del self._task_output_futures[task_id] - del self._current_task_execution_parameters[task_id] - - def _update_containers(self, output_containers: dict[str, Any]) -> None: - """ - Update containers with task output. - """ - for container in output_containers.values(): - self._container_manager.update_container(container) - - def _add_task_output( - self, - task_id: str, - output_parameters: dict[str, Any], - output_containers: dict[str, Any], - output_files: dict[str, Any], - ) -> None: - """ - Add task output to the task manager. - """ - task_output = TaskOutput( - experiment_id=self._experiment_id, - task_id=task_id, - parameters=output_parameters, - containers=output_containers, - file_names=list(output_files.keys()), - ) - for file_name, file_data in output_files.items(): - self._task_manager.add_task_output_file(self._experiment_id, task_id, file_name, file_data) - self._task_manager.add_task_output(self._experiment_id, task_id, task_output) - - async def _execute_tasks(self) -> None: - """ - Request and execute new tasks from the scheduler. - """ - new_scheduled_tasks = await self._scheduler.request_tasks(self._experiment_id) - for scheduled_task in new_scheduled_tasks: - if scheduled_task.id not in self._current_task_execution_parameters: - await self._execute_task(scheduled_task) - - async def _execute_task(self, scheduled_task: ScheduledTask) -> None: - """ - Execute a single task. - """ - task_config = self._experiment_graph.get_task_config(scheduled_task.id) - task_config = self._task_input_resolver.resolve_task_inputs(self._experiment_id, task_config) - task_execution_parameters = TaskExecutionParameters( - task_id=scheduled_task.id, - experiment_id=self._experiment_id, - devices=scheduled_task.devices, - task_config=task_config, - ) - self._task_output_futures[scheduled_task.id] = asyncio.create_task( - self._task_executor.request_task_execution(task_execution_parameters, scheduled_task) - ) - self._current_task_execution_parameters[scheduled_task.id] = task_execution_parameters - - def _validate_dynamic_parameters(self, dynamic_parameters: dict[str, dict[str, Any]]) -> None: - """ - Validate that all required dynamic parameters are provided and there are no surplus parameters. - """ - required_params = self._get_required_dynamic_parameters() - provided_params = { - f"{task_id}.{param_name}" for task_id, params in dynamic_parameters.items() for param_name in params - } - - missing_params = required_params - provided_params - unexpected_params = provided_params - required_params - - if missing_params: - raise EosExperimentExecutionError(f"Missing values for dynamic parameters: {missing_params}") - if unexpected_params: - raise EosExperimentExecutionError(f"Unexpected dynamic parameters provided: {unexpected_params}") - - def _get_required_dynamic_parameters(self) -> set[str]: - """ - Get a set of all required dynamic parameters in the experiment graph. 
- """ - return { - f"{task_id}.{param_name}" - for task_id in self._experiment_graph.get_tasks() - for param_name, param_value in self._experiment_graph.get_task_config(task_id).parameters.items() - if validation_utils.is_dynamic_parameter(param_value) - } + self._experiment_manager.start_experiment(self._experiment_id) + self._experiment_status = ExperimentStatus.RUNNING + + log.info(f"{'Resumed' if self._execution_parameters.resume else 'Started'} experiment '{self._experiment_id}'.") + + def _handle_existing_experiment(self, experiment: Experiment) -> None: + """ + Handle cases when the experiment already exists. + """ + self._experiment_status = experiment.status + + if not self._execution_parameters.resume: + + def _raise_error(status: str) -> None: + raise EosExperimentExecutionError( + f"Cannot start experiment '{self._experiment_id}' as it already exists and is '{status}'. " + f"Please create a new experiment or re-submit with 'resume=True'." + ) + + status_handlers = { + ExperimentStatus.COMPLETED: lambda: _raise_error("completed"), + ExperimentStatus.SUSPENDED: lambda: _raise_error("suspended"), + ExperimentStatus.CANCELLED: lambda: _raise_error("cancelled"), + ExperimentStatus.FAILED: lambda: _raise_error("failed"), + } + status_handlers.get(self._experiment_status, lambda: None)() + else: + self._resume_experiment() + + async def cancel_experiment(self) -> None: + """ + Cancel the experiment. + """ + experiment = self._experiment_manager.get_experiment(self._experiment_id) + if not experiment or experiment.status != ExperimentStatus.RUNNING: + raise EosExperimentCancellationError( + f"Cannot cancel experiment '{self._experiment_id}' with status '{experiment.status}'. " + f"It must be running." + ) + + log.warning(f"Cancelling experiment '{self._experiment_id}'...") + self._experiment_status = ExperimentStatus.CANCELLED + self._experiment_manager.cancel_experiment(self._experiment_id) + self._scheduler.unregister_experiment(self._experiment_id) + await self._cancel_running_tasks() + + log.warning(f"Cancelled experiment '{self._experiment_id}'.") + + async def progress_experiment(self) -> bool: + """ + Try to progress the experiment by executing tasks. + + :return: True if the experiment has been completed, False otherwise. + """ + try: + if self._experiment_status != ExperimentStatus.RUNNING: + return self._experiment_status == ExperimentStatus.CANCELLED + + if self._scheduler.is_experiment_completed(self._experiment_id): + self._complete_experiment() + return True + + self._process_completed_tasks() + await self._execute_tasks() + + return False + except Exception as e: + self._fail_experiment() + raise EosExperimentExecutionError(f"Error executing experiment '{self._experiment_id}'") from e + + def _resume_experiment(self) -> None: + """ + Resume an existing experiment. + """ + self._experiment_manager.delete_non_completed_tasks(self._experiment_id) + log.info(f"Experiment '{self._experiment_id}' resumed.") + + def _create_new_experiment(self, dynamic_parameters: dict[str, dict[str, Any]], metadata: dict[str, Any]) -> None: + """ + Create a new experiment with the given parameters. 
+ """ + dynamic_parameters = dynamic_parameters or {} + self._validate_dynamic_parameters(dynamic_parameters) + self._experiment_manager.create_experiment( + experiment_id=self._experiment_id, + experiment_type=self._experiment_type, + execution_parameters=self._execution_parameters, + dynamic_parameters=dynamic_parameters, + metadata=metadata, + ) + + async def _cancel_running_tasks(self) -> None: + """ + Cancel all running tasks in the experiment. + """ + cancellation_futures = [ + self._task_executor.request_task_cancellation(params.experiment_id, params.task_config.id) + for params in self._current_task_execution_parameters.values() + ] + try: + await asyncio.wait_for(asyncio.gather(*cancellation_futures), timeout=30) + except asyncio.TimeoutError as e: + raise EosExperimentExecutionError( + f"Timeout while cancelling experiment {self._experiment_id}. Some tasks may not have been cancelled." + ) from e + + def _complete_experiment(self) -> None: + """ + Complete the experiment and clean up. + """ + self._scheduler.unregister_experiment(self._experiment_id) + self._experiment_manager.complete_experiment(self._experiment_id) + self._experiment_status = ExperimentStatus.COMPLETED + + def _fail_experiment(self) -> None: + """ + Fail the experiment. + """ + self._scheduler.unregister_experiment(self._experiment_id) + self._experiment_manager.fail_experiment(self._experiment_id) + self._experiment_status = ExperimentStatus.FAILED + + def _process_completed_tasks(self) -> None: + """ + Process the output of completed tasks. + """ + completed_tasks = [task_id for task_id, future in self._task_output_futures.items() if future.done()] + for task_id in completed_tasks: + self._process_task_output(task_id) + + def _process_task_output(self, task_id: str) -> None: + """ + Process the output of a single completed task. + """ + try: + result = self._task_output_futures[task_id].result() + if result: + output_parameters, output_containers, output_files = result + self._update_containers(output_containers) + self._add_task_output(task_id, output_parameters, output_containers, output_files) + self._task_manager.complete_task(self._experiment_id, task_id) + log.info(f"EXP '{self._experiment_id}' - Completed task '{task_id}'.") + except EosTaskExecutionError as e: + raise EosExperimentTaskExecutionError( + f"Error executing task '{task_id}' of experiment '{self._experiment_id}'" + ) from e + finally: + del self._task_output_futures[task_id] + del self._current_task_execution_parameters[task_id] + + def _update_containers(self, output_containers: dict[str, Any]) -> None: + """ + Update containers with task output. + """ + for container in output_containers.values(): + self._container_manager.update_container(container) + + def _add_task_output( + self, + task_id: str, + output_parameters: dict[str, Any], + output_containers: dict[str, Any], + output_files: dict[str, Any], + ) -> None: + """ + Add task output to the task manager. + """ + task_output = TaskOutput( + experiment_id=self._experiment_id, + task_id=task_id, + parameters=output_parameters, + containers=output_containers, + file_names=list(output_files.keys()), + ) + for file_name, file_data in output_files.items(): + self._task_manager.add_task_output_file(self._experiment_id, task_id, file_name, file_data) + self._task_manager.add_task_output(self._experiment_id, task_id, task_output) + + async def _execute_tasks(self) -> None: + """ + Request and execute new tasks from the scheduler. 
+        """
+        new_scheduled_tasks = await self._scheduler.request_tasks(self._experiment_id)
+        for scheduled_task in new_scheduled_tasks:
+            if scheduled_task.id not in self._current_task_execution_parameters:
+                await self._execute_task(scheduled_task)
+
+    async def _execute_task(self, scheduled_task: ScheduledTask) -> None:
+        """
+        Execute a single task.
+        """
+        task_config = self._experiment_graph.get_task_config(scheduled_task.id)
+        task_config = self._task_input_resolver.resolve_task_inputs(self._experiment_id, task_config)
+        task_execution_parameters = TaskExecutionParameters(
+            task_id=scheduled_task.id,
+            experiment_id=self._experiment_id,
+            devices=scheduled_task.devices,
+            task_config=task_config,
+        )
+        self._task_output_futures[scheduled_task.id] = asyncio.create_task(
+            self._task_executor.request_task_execution(task_execution_parameters, scheduled_task)
+        )
+        self._current_task_execution_parameters[scheduled_task.id] = task_execution_parameters
+
+    def _validate_dynamic_parameters(self, dynamic_parameters: dict[str, dict[str, Any]]) -> None:
+        """
+        Validate that all required dynamic parameters are provided and there are no surplus parameters.
+        """
+        required_params = self._get_required_dynamic_parameters()
+        provided_params = {
+            f"{task_id}.{param_name}" for task_id, params in dynamic_parameters.items() for param_name in params
+        }
+
+        missing_params = required_params - provided_params
+        unexpected_params = provided_params - required_params
+
+        if missing_params:
+            raise EosExperimentExecutionError(f"Missing values for dynamic parameters: {missing_params}")
+        if unexpected_params:
+            raise EosExperimentExecutionError(f"Unexpected dynamic parameters provided: {unexpected_params}")
+
+    def _get_required_dynamic_parameters(self) -> set[str]:
+        """
+        Get a set of all required dynamic parameters in the experiment graph.
+        """
+        return {
+            f"{task_id}.{param_name}"
+            for task_id in self._experiment_graph.get_tasks()
+            for param_name, param_value in self._experiment_graph.get_task_config(task_id).parameters.items()
+            if validation_utils.is_dynamic_parameter(param_value)
+        }
diff --git a/eos/optimization/abstract_sequential_optimizer.py b/eos/optimization/abstract_sequential_optimizer.py
index d85870a..6c6ac58 100644
--- a/eos/optimization/abstract_sequential_optimizer.py
+++ b/eos/optimization/abstract_sequential_optimizer.py
@@ -52,3 +52,11 @@ def get_output_names(self) -> list[str]:
 
         :return: A list of the names of the output parameter values.
         """
+
+    @abstractmethod
+    def get_num_samples_reported(self) -> int:
+        """
+        Get the number of samples reported to the optimizer.
+
+        :return: The number of samples reported to the optimizer.
+ """ diff --git a/eos/optimization/sequential_bayesian_optimizer.py b/eos/optimization/sequential_bayesian_optimizer.py index 85afc20..4ce71eb 100644 --- a/eos/optimization/sequential_bayesian_optimizer.py +++ b/eos/optimization/sequential_bayesian_optimizer.py @@ -17,8 +17,8 @@ from bofire.data_models.strategies.predictives.sobo import SoboStrategy from pandas import Series -from eos.optimization.exceptions import EosCampaignOptimizerDomainError from eos.optimization.abstract_sequential_optimizer import AbstractSequentialOptimizer +from eos.optimization.exceptions import EosCampaignOptimizerDomainError class BayesianSequentialOptimizer(AbstractSequentialOptimizer): @@ -51,7 +51,7 @@ def __init__( self._generate_initial_samples: bool = self._num_initial_samples > 0 self._initial_samples_df: pd.DataFrame | None = None - self._results_reported: int = 0 + self._num_samples_reported: int = 0 self._optimizer_data_model = ( SoboStrategy(domain=self._domain, acquisition_function=acquisition_function) @@ -61,7 +61,7 @@ def __init__( self._optimizer = strategies.map(data_model=self._optimizer_data_model) def sample(self, num_experiments: int = 1) -> pd.DataFrame: - if self._generate_initial_samples and self._results_reported < self._num_initial_samples: + if self._generate_initial_samples and self._num_samples_reported < self._num_initial_samples: if self._initial_samples_df is None: self._generate_initial_samples_df() @@ -89,7 +89,7 @@ def report(self, inputs_df: pd.DataFrame, outputs_df: pd.DataFrame) -> None: self._validate_sample(inputs_df, outputs_df) results_df = pd.concat([inputs_df, outputs_df], axis=1) self._optimizer.tell(results_df) - self._results_reported += len(results_df) + self._num_samples_reported += len(results_df) def get_optimal_solutions(self) -> pd.DataFrame: experiments = self._optimizer.experiments @@ -137,6 +137,9 @@ def get_input_names(self) -> list[str]: def get_output_names(self) -> list[str]: return self._output_names + def get_num_samples_reported(self) -> int: + return self._num_samples_reported + def _get_output(self, output_name: str) -> OutputType: for output in self._domain.outputs.features: if output.key == output_name: diff --git a/eos/optimization/sequential_optimizer_actor.py b/eos/optimization/sequential_optimizer_actor.py index f97f964..a8b6dae 100644 --- a/eos/optimization/sequential_optimizer_actor.py +++ b/eos/optimization/sequential_optimizer_actor.py @@ -25,3 +25,6 @@ def get_input_names(self) -> list[str]: def get_output_names(self) -> list[str]: return self.optimizer.get_output_names() + + def get_num_samples_reported(self) -> int: + return self.optimizer.get_num_samples_reported() diff --git a/eos/tasks/base_task.py b/eos/tasks/base_task.py index fe8bec2..4682c5e 100644 --- a/eos/tasks/base_task.py +++ b/eos/tasks/base_task.py @@ -2,14 +2,14 @@ from typing import Any from eos.containers.entities.container import Container -from eos.devices.device_actor_references import DeviceRayActorWrapperReferences +from eos.devices.device_actor_wrapper_registry import DeviceActorWrapperRegistry from eos.tasks.exceptions import EosTaskExecutionError class BaseTask(ABC): """Base class for all tasks in EOS.""" - DevicesType = dict[str, DeviceRayActorWrapperReferences] + DevicesType = dict[str, DeviceActorWrapperRegistry] ParametersType = dict[str, Any] ContainersType = dict[str, Container] FilesType = dict[str, bytes] diff --git a/eos/tasks/task_executor.py b/eos/tasks/task_executor.py index 612a773..ba366fe 100644 --- a/eos/tasks/task_executor.py +++ 
b/eos/tasks/task_executor.py @@ -9,7 +9,7 @@ from eos.configuration.configuration_manager import ConfigurationManager from eos.containers.container_manager import ContainerManager from eos.containers.entities.container import Container -from eos.devices.device_actor_references import DeviceRayActorReference, DeviceRayActorWrapperReferences +from eos.devices.device_actor_wrapper_registry import DeviceActorReference, DeviceActorWrapperRegistry from eos.devices.device_manager import DeviceManager from eos.logging.logger import log from eos.resource_allocation.entities.resource_request import ( @@ -127,8 +127,8 @@ async def request_task_cancellation(self, experiment_id: str, task_id: str) -> N self._resource_allocation_manager.process_active_requests() self._task_manager.cancel_task(experiment_id, task_id) - log.warning(f"EXP '{experiment_id}' - Cancelled task '{task_id}'.") del self._active_tasks[task_id] + log.warning(f"EXP '{experiment_id}' - Cancelled task '{task_id}'.") def _prepare_containers(self, execution_parameters: TaskExecutionParameters) -> dict[str, Container]: return { @@ -163,9 +163,9 @@ async def _allocate_resources( resource_request = self._create_resource_request(execution_parameters) return await self._request_resources(resource_request, execution_parameters.resource_allocation_timeout) - def _get_device_actor_references(self, task_parameters: TaskExecutionParameters) -> list[DeviceRayActorReference]: + def _get_device_actor_references(self, task_parameters: TaskExecutionParameters) -> list[DeviceActorReference]: return [ - DeviceRayActorReference( + DeviceActorReference( id=device.id, lab_id=device.lab_id, type=self._configuration_manager.labs[device.lab_id].devices[device.id].type, @@ -194,12 +194,12 @@ def _execute_task( def _ray_execute_task( _experiment_id: str, _task_id: str, - _devices_actor_references: list[DeviceRayActorReference], + _devices_actor_references: list[DeviceActorReference], _parameters: dict[str, Any], _containers: dict[str, Container], ) -> tuple: task = task_class_type(_experiment_id, _task_id) - devices = DeviceRayActorWrapperReferences(_devices_actor_references) + devices = DeviceActorWrapperRegistry(_devices_actor_references) return task.execute(devices, _parameters, _containers) self._task_manager.start_task(experiment_id, task_id) diff --git a/eos/utils/ray_utils.py b/eos/utils/ray_utils.py index da2031e..d0f1269 100644 --- a/eos/utils/ray_utils.py +++ b/eos/utils/ray_utils.py @@ -5,16 +5,14 @@ class RayActorWrapper: - """ - Wrapper for Ray actors to allow for easy synchronous calls to actor methods. - """ + """Wrapper for Ray actors to allow for easy synchronous calls to actor methods.""" def __init__(self, actor: ActorHandle): - self.actor = actor + self._actor = actor def __getattr__(self, name: str) -> Any: if not name.startswith("__"): - async_func = getattr(self.actor, name) + async_func = getattr(self._actor, name) def wrapper(*args, **kwargs) -> Any: return ray.get(async_func.remote(*args, **kwargs)) @@ -23,18 +21,6 @@ def wrapper(*args, **kwargs) -> Any: return super().__getattr__(name) - -def ray_run(ray_remote_method: callable, *args, **kwargs) -> Any: - """ - A helper function to simplify calling Ray remote functions. - - Args: - ray_remote_method: The Ray remote method to be invoked. - *args: Arguments to be passed to the remote method. - **kwargs: Keyword arguments to be passed to the remote method. - - Returns: - The result of the Ray remote method call. 
- """ - # Invoke the remote method and get the result - return ray.get(ray_remote_method.remote(*args, **kwargs)) + @property + def actor(self) -> ActorHandle: + return self._actor diff --git a/pyproject.toml b/pyproject.toml index e342404..b3e13d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,6 +82,9 @@ line-length = 120 testpaths = [ "tests", ] +markers = [ + "slow: mark tests as slow (deselect with '-m \"not slow\"')", +] [tool.ruff] include = [ diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d8351ee --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,7 @@ +def pytest_collection_modifyitems(items): + """Sort tests by the slow marker. Tests with the slow marker will be executed last.""" + + def weight(item): + return 1 if item.get_closest_marker("slow") else 0 + + items.sort(key=weight) diff --git a/tests/fixtures.py b/tests/fixtures.py index 4ee6c83..b73452a 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -125,7 +125,6 @@ def experiment_graph(setup_lab_experiment): @pytest.fixture def clean_db(db_manager): - print("Cleaned up DB.") db_manager.clean_db() @@ -135,7 +134,7 @@ def container_manager(setup_lab_experiment, configuration_manager, db_manager, c @pytest.fixture -def device_manager(setup_lab_experiment, configuration_manager, db_manager, ray_cluster, clean_db): +def device_manager(setup_lab_experiment, configuration_manager, ray_cluster, db_manager, clean_db): device_manager = DeviceManager(configuration_manager, db_manager) device_manager.update_devices(loaded_labs=set(configuration_manager.labs.keys())) yield device_manager @@ -167,7 +166,7 @@ def task_manager(setup_lab_experiment, configuration_manager, db_manager, file_d return TaskManager(configuration_manager, db_manager, file_db_manager) -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def ray_cluster(): ray.init(namespace="test-eos", ignore_reinit_error=True, resources={"eos-core": 1}) yield diff --git a/tests/test_base_device.py b/tests/test_base_device.py new file mode 100644 index 0000000..e6934d9 --- /dev/null +++ b/tests/test_base_device.py @@ -0,0 +1,94 @@ +from typing import Any +from unittest.mock import Mock + +import pytest + +from eos.devices.base_device import BaseDevice, DeviceStatus +from eos.devices.exceptions import EosDeviceError, EosDeviceCleanupError, EosDeviceInitializationError + + +class MockDevice(BaseDevice): + def __init__(self, device_id: str, lab_id: str, device_type: str, initialization_parameters: dict[str, Any]): + self.mock_resource = None + super().__init__(device_id, lab_id, device_type, initialization_parameters) + + def _initialize(self, initialization_parameters: dict[str, Any]) -> None: + self.mock_resource = Mock() + + def _cleanup(self) -> None: + if self.mock_resource: + self.mock_resource.close() + self.mock_resource = None + + def _report(self) -> dict[str, Any]: + return {"mock_resource": str(self.mock_resource)} + + def raise_exception(self): + raise ValueError("Test exception") + + +class TestBaseDevice: + @pytest.fixture + def mock_device(self): + return MockDevice("test_device", "test_lab", "mock", {}) + + def test_initialize(self, mock_device): + assert mock_device.id == "test_device" + assert mock_device.type == "mock" + assert mock_device.status == DeviceStatus.IDLE + assert mock_device.mock_resource is not None + + def test_cleanup(self, mock_device): + mock_device.cleanup() + assert mock_device.status == DeviceStatus.DISABLED + assert mock_device.mock_resource is None + + def 
test_enable_disable(self, mock_device): + mock_device.disable() + assert mock_device.status == DeviceStatus.DISABLED + mock_device.enable() + assert mock_device.status == DeviceStatus.IDLE + + def test_report(self, mock_device): + report = mock_device.report() + assert "mock_resource" in report + + def test_report_status(self, mock_device): + status_report = mock_device.report_status() + assert status_report["id"] == "test_device" + assert status_report["status"] == DeviceStatus.IDLE + + def test_exception_handling(self, mock_device): + with pytest.raises(EosDeviceError): + mock_device.raise_exception() + assert mock_device.status == DeviceStatus.ERROR + + def test_initialization_error(self): + class FailingDevice(MockDevice): + def _initialize(self, initialization_parameters: dict[str, Any]) -> None: + raise ValueError("Initialization failed") + + with pytest.raises(EosDeviceInitializationError): + FailingDevice("fail_device", "test_lab", "failing", {}) + + def test_cleanup_error(self, mock_device): + mock_device.mock_resource.close.side_effect = Exception("Cleanup failed") + with pytest.raises(EosDeviceError): + mock_device.cleanup() + assert mock_device.status == DeviceStatus.ERROR + + def test_busy_status_cleanup(self, mock_device): + mock_device._status = DeviceStatus.BUSY + with pytest.raises(EosDeviceCleanupError): + mock_device.cleanup() + assert mock_device.status == DeviceStatus.BUSY + + def test_double_initialization(self, mock_device): + with pytest.raises(EosDeviceInitializationError): + mock_device.initialize({}) + assert mock_device.status == DeviceStatus.IDLE + + def test_del_method(self, mock_device): + mock_device.__del__() + assert mock_device.status == DeviceStatus.DISABLED + assert mock_device.mock_resource is None diff --git a/tests/test_base_task.py b/tests/test_base_task.py new file mode 100644 index 0000000..03f9cbe --- /dev/null +++ b/tests/test_base_task.py @@ -0,0 +1,109 @@ +from unittest.mock import Mock + +import pytest + +from eos.containers.entities.container import Container +from eos.devices.device_actor_wrapper_registry import DeviceActorWrapperRegistry +from eos.tasks.base_task import BaseTask +from eos.tasks.exceptions import EosTaskExecutionError + + +class ConcreteTask(BaseTask): + def _execute( + self, devices: BaseTask.DevicesType, parameters: BaseTask.ParametersType, containers: BaseTask.ContainersType + ) -> BaseTask.OutputType | None: + return {"out_param": parameters["param1"]}, {"container1": containers["container1"]}, {"file": b"content"} + + +class TestBaseTask: + @pytest.fixture + def concrete_task(self): + return ConcreteTask("exp_id", "task_id") + + @pytest.fixture + def container(self): + return Container(id="container_id", type="beaker", lab="lab", location="shelf") + + def test_init(self): + task = ConcreteTask("exp_id", "task_id") + assert task._experiment_id == "exp_id" + assert task._task_id == "task_id" + + def test_execute_success(self, concrete_task, container): + devices = {"device1": Mock(spec=DeviceActorWrapperRegistry)} + parameters = {"param1": "value1"} + containers = {"container1": container} + + result = concrete_task.execute(devices, parameters, containers) + + assert isinstance(result, tuple) + assert len(result) == 3 + assert isinstance(result[0], dict) + assert isinstance(result[1], dict) + assert isinstance(result[2], dict) + assert result[0] == {"out_param": "value1"} + assert result[1] == {"container1": container} + assert result[2] == {"file": b"content"} + + def test_execute_failure(self, container): + class 
FailingTask(BaseTask): + def _execute( + self, + devices: BaseTask.DevicesType, + parameters: BaseTask.ParametersType, + containers: BaseTask.ContainersType, + ) -> BaseTask.OutputType | None: + raise ValueError("Test error") + + devices = {"device1": Mock(spec=DeviceActorWrapperRegistry)} + parameters = {"param1": "value1"} + containers = {"container1": container} + + failing_task = FailingTask("exp_id", "task_id") + with pytest.raises(EosTaskExecutionError): + failing_task.execute(devices, parameters, containers) + + def test_execute_empty_output(self, concrete_task): + class EmptyOutputTask(BaseTask): + def _execute( + self, + devices: BaseTask.DevicesType, + parameters: BaseTask.ParametersType, + containers: BaseTask.ContainersType, + ) -> BaseTask.OutputType | None: + return None + + task = EmptyOutputTask("exp_id", "task_id") + result = task.execute({}, {}, {}) + + assert result == ({}, {}, {}) + + def test_execute_partial_output(self, concrete_task): + class PartialOutputTask(BaseTask): + def _execute( + self, + devices: BaseTask.DevicesType, + parameters: BaseTask.ParametersType, + containers: BaseTask.ContainersType, + ) -> BaseTask.OutputType | None: + return {"out_param": "value"}, None, None + + task = PartialOutputTask("exp_id", "task_id") + result = task.execute({}, {}, {}) + + assert result == ({"out_param": "value"}, {}, {}) + + def test_automatic_input_container_passthrough(self, concrete_task, container): + class InputContainerPassthroughTask(BaseTask): + def _execute( + self, + devices: BaseTask.DevicesType, + parameters: BaseTask.ParametersType, + containers: BaseTask.ContainersType, + ) -> BaseTask.OutputType | None: + return None + + task = InputContainerPassthroughTask("exp_id", "task_id") + result = task.execute({}, {}, {"container1": container}) + + assert result == ({}, {"container1": container}, {}) diff --git a/tests/test_bayesian_sequential_optimizer.py b/tests/test_bayesian_sequential_optimizer.py index 8c6a34e..55521f6 100644 --- a/tests/test_bayesian_sequential_optimizer.py +++ b/tests/test_bayesian_sequential_optimizer.py @@ -1,4 +1,5 @@ import pandas as pd +import pytest from bofire.data_models.acquisition_functions.acquisition_function import qLogNEI, qLogNEHVI from bofire.data_models.enum import SamplingMethodEnum from bofire.data_models.features.continuous import ContinuousInput, ContinuousOutput @@ -8,6 +9,7 @@ class TestCampaignBayesianOptimizer: + @pytest.mark.slow def test_single_objective_optimization(self): optimizer = BayesianSequentialOptimizer( inputs=[ @@ -30,6 +32,7 @@ def test_single_objective_optimization(self): assert len(optimal_solutions) == 1 assert abs(optimal_solutions["y"].to_numpy()[0] - 4) < 0.01 + @pytest.mark.slow def test_competing_multi_objective_optimization(self): optimizer = BayesianSequentialOptimizer( inputs=[ @@ -71,8 +74,8 @@ def test_competing_multi_objective_optimization(self): for true_solution in true_pareto_front: assert any( - abs(solution["x"] - true_solution["x"]) < 0.5 - and abs(solution["y1"] - true_solution["y1"]) < 0.5 - and abs(solution["y2"] - true_solution["y2"]) < 0.5 + abs(solution["x"] - true_solution["x"]) < 0.7 + and abs(solution["y1"] - true_solution["y1"]) < 0.7 + and abs(solution["y2"] - true_solution["y2"]) < 0.7 for _, solution in optimal_solutions.iterrows() ) diff --git a/tests/test_campaign_executor.py b/tests/test_campaign_executor.py index 2a2a2a4..4e6b2a3 100644 --- a/tests/test_campaign_executor.py +++ b/tests/test_campaign_executor.py @@ -1,6 +1,8 @@ import asyncio from 
eos.campaigns.entities.campaign import CampaignStatus +from eos.campaigns.exceptions import EosCampaignExecutionError +from eos.experiments.exceptions import EosExperimentExecutionError from tests.fixtures import * LAB_ID = "multiplication_lab" @@ -21,6 +23,7 @@ indirect=True, ) class TestCampaignExecutor: + @pytest.mark.slow @pytest.mark.asyncio async def test_start_campaign(self, campaign_executor, campaign_manager): await campaign_executor.start_campaign() @@ -30,6 +33,7 @@ async def test_start_campaign(self, campaign_executor, campaign_manager): assert campaign.id == CAMPAIGN_ID assert campaign.status == CampaignStatus.RUNNING + @pytest.mark.slow @pytest.mark.asyncio async def test_progress_campaign(self, campaign_executor, campaign_manager, campaign_optimizer_manager): await campaign_executor.start_campaign() @@ -43,3 +47,141 @@ async def test_progress_campaign(self, campaign_executor, campaign_manager, camp assert not solutions.empty assert len(solutions) == 1 assert solutions["compute_multiplication_objective.objective"].iloc[0] / 100 <= 80 + + @pytest.mark.slow + @pytest.mark.asyncio + async def test_progress_campaign_failure(self, campaign_executor, campaign_manager, monkeypatch): + await campaign_executor.start_campaign() + await campaign_executor.progress_campaign() + + # Mock the progress_experiment method to raise an EosExperimentExecutionError + async def mock_progress_experiment(*args, **kwargs): + raise EosExperimentExecutionError("Simulated experiment execution error") + + monkeypatch.setattr( + "eos.experiments.experiment_executor.ExperimentExecutor.progress_experiment", mock_progress_experiment + ) + + # Attempt to progress the campaign + with pytest.raises(EosCampaignExecutionError) as exc_info: + await campaign_executor.progress_campaign() + assert f"Error executing campaign '{CAMPAIGN_ID}'" in str(exc_info.value) + assert campaign_executor._campaign_status == CampaignStatus.FAILED + + # Verify that the campaign manager has marked the campaign as failed + campaign = campaign_manager.get_campaign(CAMPAIGN_ID) + assert campaign.status == CampaignStatus.FAILED + + @pytest.mark.slow + @pytest.mark.asyncio + async def test_campaign_cancellation(self, campaign_executor, campaign_manager): + await campaign_executor.start_campaign() + + # Run until two experiments are complete + completed_experiments = 0 + while completed_experiments < 2: + await campaign_executor.progress_campaign() + campaign = campaign_manager.get_campaign(CAMPAIGN_ID) + completed_experiments = campaign.experiments_completed + await asyncio.sleep(0.1) + + # Ensure we have at least one running experiment + assert len(campaign_executor._experiment_executors) > 0 + + await campaign_executor.cancel_campaign() + + campaign = campaign_manager.get_campaign(CAMPAIGN_ID) + assert campaign.status == CampaignStatus.CANCELLED + + # Try to progress the campaign after cancellation + campaign_finished = await campaign_executor.progress_campaign() + assert campaign_finished + assert len(campaign_executor._experiment_executors) == 0 + + with pytest.raises(EosCampaignExecutionError): + await campaign_executor.start_campaign() + + @pytest.mark.slow + @pytest.mark.asyncio + async def test_campaign_resuming( + self, campaign_executor, campaign_manager, campaign_optimizer_manager, task_manager, experiment_executor_factory + ): + await campaign_executor.start_campaign() + + # Run until three experiments are complete + completed_experiments = 0 + while completed_experiments < 3: + await campaign_executor.progress_campaign() + 
campaign = campaign_manager.get_campaign(CAMPAIGN_ID) + completed_experiments = campaign.experiments_completed + await asyncio.sleep(0.1) + + initial_campaign = campaign_manager.get_campaign(CAMPAIGN_ID) + num_initial_reported_samples = ray.get(campaign_executor.optimizer.get_num_samples_reported.remote()) + + await campaign_executor.cancel_campaign() + assert campaign_manager.get_campaign(CAMPAIGN_ID).status == CampaignStatus.CANCELLED + campaign_executor.cleanup() + + # Create a new campaign executor to resume the campaign + new_execution_parameters = CampaignExecutionParameters( + max_experiments=MAX_EXPERIMENTS, do_optimization=DO_OPTIMIZATION, resume=True + ) + new_campaign_executor = CampaignExecutor( + CAMPAIGN_ID, + EXPERIMENT_TYPE, + new_execution_parameters, + campaign_manager, + campaign_optimizer_manager, + task_manager, + experiment_executor_factory, + ) + await new_campaign_executor.start_campaign() + resumed_campaign = campaign_manager.get_campaign(CAMPAIGN_ID) + assert resumed_campaign.status == CampaignStatus.RUNNING + + # Verify that the number of completed experiments is preserved + assert resumed_campaign.experiments_completed == initial_campaign.experiments_completed + + # Check that the reported samples to the optimizer are preserved + num_restored_reported_samples = ray.get(new_campaign_executor.optimizer.get_num_samples_reported.remote()) + print(num_restored_reported_samples) + assert num_restored_reported_samples == num_initial_reported_samples + + # Run a few more iterations to ensure the campaign continues properly + for _ in range(5): + await new_campaign_executor.progress_campaign() + + await new_campaign_executor.cancel_campaign() + new_campaign_executor.cleanup() + + @pytest.mark.slow + @pytest.mark.asyncio + async def test_campaign_cancellation_timeout(self, campaign_executor, campaign_manager): + await campaign_executor.start_campaign() + + # Run until one experiment is complete + while ( + campaign_manager.get_campaign(CAMPAIGN_ID).experiments_completed < 1 + or len(campaign_executor._experiment_executors) < 1 + ): + await campaign_executor.progress_campaign() + await asyncio.sleep(0.1) + + class SlowCancelExperimentExecutor: + async def cancel_experiment(self): + await asyncio.sleep(16) # Sleep for longer than the timeout + + # Replace the experiment executors with the slow version + campaign_executor._experiment_executors = { + "exp1": SlowCancelExperimentExecutor(), + "exp2": SlowCancelExperimentExecutor(), + } + + # Try to cancel the campaign, expect a timeout + with pytest.raises(EosCampaignExecutionError) as exc_info: + await campaign_executor.cancel_campaign() + assert "Timed out while cancelling experiments" in str(exc_info.value) + + campaign = campaign_manager.get_campaign(CAMPAIGN_ID) + assert campaign.status == CampaignStatus.CANCELLED diff --git a/tests/test_device_actor_wrapper_registry.py b/tests/test_device_actor_wrapper_registry.py new file mode 100644 index 0000000..7bd7158 --- /dev/null +++ b/tests/test_device_actor_wrapper_registry.py @@ -0,0 +1,69 @@ +import pytest +import ray + +from eos.devices.device_actor_wrapper_registry import DeviceActorWrapperRegistry, DeviceActorReference +from eos.utils.ray_utils import RayActorWrapper + + +@ray.remote +class DummyDevice: + def __init__(self, device_id): + self.device_id = device_id + + def get_id(self): + return self.device_id + + +@pytest.fixture +def device_actor_references(): + return [ + DeviceActorReference("d1", "lab1", "type1", DummyDevice.remote("d1")), + DeviceActorReference("d2", 
"lab1", "type2", DummyDevice.remote("d2")), + DeviceActorReference("d3", "lab2", "type1", DummyDevice.remote("d3")), + DeviceActorReference("d4", "lab2", "type2", DummyDevice.remote("d4")), + ] + + +@pytest.fixture +def device_actor_wrapper_references(device_actor_references): + return DeviceActorWrapperRegistry(device_actor_references) + + +class TestDeviceActorWrapperRegistry: + def test_get_existing_device(self, device_actor_wrapper_references): + device = device_actor_wrapper_references.get("lab1", "d1") + assert isinstance(device, RayActorWrapper) + assert device.get_id() == "d1" + + def test_get_nonexistent_device(self, device_actor_wrapper_references): + device = device_actor_wrapper_references.get("lab1", "nonexistent") + assert device is None + + def test_get_all_by_lab_id(self, device_actor_wrapper_references): + devices = device_actor_wrapper_references.get_all_by_lab_id("lab1") + assert len(devices) == 2 + assert all(isinstance(device, RayActorWrapper) for device in devices) + device_ids = [device.get_id() for device in devices] + assert set(device_ids) == {"d1", "d2"} + + def test_get_all_by_nonexistent_lab_id(self, device_actor_wrapper_references): + devices = device_actor_wrapper_references.get_all_by_lab_id("nonexistent") + assert len(devices) == 0 + + def test_get_all_by_type(self, device_actor_wrapper_references): + devices = device_actor_wrapper_references.get_all_by_type("type1") + assert len(devices) == 2 + assert all(isinstance(device, RayActorWrapper) for device in devices) + device_ids = [device.get_id() for device in devices] + assert set(device_ids) == {"d1", "d3"} + + def test_get_all_by_nonexistent_type(self, device_actor_wrapper_references): + devices = device_actor_wrapper_references.get_all_by_type("nonexistent") + assert len(devices) == 0 + + def test_device_uniqueness(self, device_actor_wrapper_references): + lab1_devices = device_actor_wrapper_references.get_all_by_lab_id("lab1") + lab2_devices = device_actor_wrapper_references.get_all_by_lab_id("lab2") + all_devices = lab1_devices + lab2_devices + unique_devices = {device.actor for device in all_devices} + assert len(all_devices) == len(unique_devices) diff --git a/tests/test_experiment_executor.py b/tests/test_experiment_executor.py index 0e12f44..dd75983 100644 --- a/tests/test_experiment_executor.py +++ b/tests/test_experiment_executor.py @@ -1,6 +1,8 @@ import asyncio +from unittest.mock import patch from eos.experiments.entities.experiment import ExperimentStatus +from eos.experiments.exceptions import EosExperimentExecutionError from eos.tasks.entities.task import TaskStatus from tests.fixtures import * @@ -39,6 +41,7 @@ def test_start_experiment(self, experiment_executor, experiment_manager): assert experiment.id == EXPERIMENT_ID assert experiment.status == ExperimentStatus.RUNNING + @pytest.mark.slow @pytest.mark.asyncio async def test_progress_experiment(self, experiment_executor, experiment_manager, task_manager): experiment_executor.start_experiment(DYNAMIC_PARAMETERS) @@ -79,9 +82,7 @@ async def test_task_output_registration(self, experiment_executor, task_manager) assert mixing_output.parameters["mixing_time"] == DYNAMIC_PARAMETERS["mixing"]["time"] @pytest.mark.asyncio - async def test_resolve_input_parameter_references_and_dynamic_parameters( - self, experiment_executor, task_manager - ): + async def test_resolve_input_parameter_references_and_dynamic_parameters(self, experiment_executor, task_manager): experiment_executor.start_experiment(DYNAMIC_PARAMETERS) experiment_completed = False @@ 
-97,7 +98,43 @@ async def test_resolve_input_parameter_references_and_dynamic_parameters( assert mixing_task.input.parameters["time"] == DYNAMIC_PARAMETERS["mixing"]["time"] # Check that the output parameter mixing time was assigned to the input parameter evaporation time - assert ( - evaporation_task.input.parameters["evaporation_time"] - == mixing_result.parameters["mixing_time"] + assert evaporation_task.input.parameters["evaporation_time"] == mixing_result.parameters["mixing_time"] + + @pytest.mark.parametrize( + "experiment_status", + [ + ExperimentStatus.COMPLETED, + ExperimentStatus.SUSPENDED, + ExperimentStatus.CANCELLED, + ExperimentStatus.FAILED, + ExperimentStatus.RUNNING, + ], + ) + def test_handle_existing_experiment(self, experiment_executor, experiment_manager, experiment_status): + experiment_manager.create_experiment( + EXPERIMENT_ID, EXPERIMENT_TYPE, experiment_executor._execution_parameters, {}, {} ) + experiment_manager._set_experiment_status(EXPERIMENT_ID, experiment_status) + + experiment_executor._execution_parameters.resume = False + with patch.object(experiment_executor, "_resume_experiment") as mock_resume: + if experiment_status in [ + ExperimentStatus.COMPLETED, + ExperimentStatus.SUSPENDED, + ExperimentStatus.CANCELLED, + ExperimentStatus.FAILED, + ]: + with pytest.raises(EosExperimentExecutionError) as exc_info: + experiment_executor._handle_existing_experiment(experiment_manager.get_experiment(EXPERIMENT_ID)) + assert experiment_status.name.lower() in str(exc_info.value) + mock_resume.assert_not_called() + else: + experiment_executor._handle_existing_experiment(experiment_manager.get_experiment(EXPERIMENT_ID)) + mock_resume.assert_not_called() + + experiment_executor._execution_parameters.resume = True + with patch.object(experiment_executor, "_resume_experiment") as mock_resume: + experiment_executor._handle_existing_experiment(experiment_manager.get_experiment(EXPERIMENT_ID)) + mock_resume.assert_called_once() + + assert experiment_executor._experiment_status == experiment_status diff --git a/tests/test_ray_actor_wrapper.py b/tests/test_ray_actor_wrapper.py new file mode 100644 index 0000000..4cbd982 --- /dev/null +++ b/tests/test_ray_actor_wrapper.py @@ -0,0 +1,65 @@ +import pytest +import ray + +from eos.utils.ray_utils import RayActorWrapper + + +@ray.remote +class DummyActor: + def __init__(self): + self.value = 0 + + def reset_value(self): + self.value = 0 + + def increment(self, amount=1): + self.value += amount + return self.value + + def get_value(self): + return self.value + + +class TestRayActorWrapper: + @pytest.fixture(scope="class") + def dummy_actor(self): + actor = DummyActor.remote() + yield actor + ray.kill(actor) + + @pytest.fixture(scope="class") + def wrapped_actor(self, dummy_actor): + return RayActorWrapper(dummy_actor) + + def test_method_call(self, wrapped_actor): + wrapped_actor.reset_value() + result = wrapped_actor.increment() + assert result == 1 + + def test_method_call_with_args(self, wrapped_actor): + wrapped_actor.reset_value() + result = wrapped_actor.increment(5) + assert result == 5 + + def test_multiple_method_calls(self, wrapped_actor): + wrapped_actor.reset_value() + wrapped_actor.increment() + wrapped_actor.increment(4) + result = wrapped_actor.get_value() + assert result == 5 + + def test_nonexistent_method(self, wrapped_actor): + with pytest.raises(AttributeError): + wrapped_actor.nonexistent_method() + + def test_wrapped_actor_no_independence(self, dummy_actor): + wrapped_actor1 = RayActorWrapper(dummy_actor) + 
wrapped_actor2 = RayActorWrapper(dummy_actor) + + wrapped_actor1.reset_value() + wrapped_actor1.increment(2) + result = wrapped_actor2.get_value() + assert result == 2 + + def test_actor_property(self, wrapped_actor, dummy_actor): + assert wrapped_actor.actor == dummy_actor
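
For reference, a minimal usage sketch of the renamed RayActorWrapper after this change, assuming the patch is applied and a local Ray runtime is available; the DummyActor class below is illustrative, mirroring the one added in tests/test_ray_actor_wrapper.py, and is not part of EOS itself.

    import ray

    from eos.utils.ray_utils import RayActorWrapper


    @ray.remote
    class DummyActor:
        def __init__(self):
            self.value = 0

        def increment(self, amount: int = 1) -> int:
            self.value += amount
            return self.value


    ray.init(ignore_reinit_error=True)

    wrapped = RayActorWrapper(DummyActor.remote())

    # Each call blocks until the remote method returns; no explicit ray.get() is needed.
    assert wrapped.increment(5) == 5
    assert wrapped.increment() == 6

    # The underlying ActorHandle remains reachable through the new read-only property.
    handle = wrapped.actor

    ray.shutdown()

With the "slow" marker now registered in pyproject.toml, the slow optimizer and campaign tests can be deselected locally with pytest -m "not slow"; when the full suite runs, the new conftest.py hook sorts the slow-marked tests to the end of the collection.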