From 6b86808a040c88ad82a4101e921d9c31e26ed42b Mon Sep 17 00:00:00 2001
From: dengdifan
Date: Wed, 23 Oct 2024 15:50:32 +0200
Subject: [PATCH] add options for batch sampling

---
 smac/main/config_selector.py | 69 ++++++++++++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/smac/main/config_selector.py b/smac/main/config_selector.py
index 42a72f02b..9ec2d25f4 100644
--- a/smac/main/config_selector.py
+++ b/smac/main/config_selector.py
@@ -16,6 +16,7 @@
 from smac.callback.callback import Callback
 from smac.initial_design import AbstractInitialDesign
 from smac.model.abstract_model import AbstractModel
+from smac.model.gaussian_process import GaussianProcess
 from smac.random_design.abstract_random_design import AbstractRandomDesign
 from smac.runhistory.encoder.abstract_encoder import AbstractRunHistoryEncoder
 from smac.runhistory.runhistory import RunHistory
@@ -53,6 +54,7 @@ def __init__(
         retrain_after: int = 8,
         retries: int = 16,
         min_trials: int = 1,
+        batch_sampling_estimating_strategy: str = 'CL_mean',
     ) -> None:
         # Those are the configs sampled from the passed initial design
         # Selecting configurations from initial design
@@ -82,6 +84,9 @@ def __init__(
         # Processed configurations should be stored here; this is important to not return the same configuration twice
         self._processed_configs: list[Configuration] = []
 
+        # for batch sampling setting
+        self._batch_sampling_estimating_strategy = batch_sampling_estimating_strategy
+
     def _set_components(
         self,
         initial_design: AbstractInitialDesign,
@@ -284,6 +289,20 @@ def _collect_data(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
                 # Possible add running configs?
                 configs_array = self._runhistory_encoder.get_configurations(budget_subset=self._considered_budgets)
 
+                # add running configurations
+                # If our batch size is 1, then no running configuration should exist, we could then skip this part.
+                # Therefore, there is no need to check the number of workers in this case
+
+                X_running = self._runhistory_encoder.transform_running_configs(budget_subset=[b])
+                Y_estimated = self.estimate_running_config_costs(X_running, Y, self._batch_sampling_estimating_strategy)
+                if Y_estimated is not None:
+                    configs_array_running = self._runhistory_encoder.get_running_configurations(
+                        budget_subset=self._considered_budgets
+                    )
+                    X = np.concatenate([X, X_running], axis=0)
+                    Y = np.concatenate([Y, Y_estimated], axis=0)
+                    configs_array = np.concatenate([configs_array, configs_array_running], axis=0)
+
                 return X, Y, configs_array
 
         return (
@@ -300,6 +319,56 @@ def _get_evaluated_configs(self) -> list[Configuration]:
         assert self._runhistory is not None
         return self._runhistory.get_configs_per_budget(budget_subset=self._considered_budgets)
 
+    def estimate_running_config_costs(
+        self,
+        X_running: np.ndarray,
+        Y_evaluated: np.ndarray,
+        estimate_strategy: str = 'CL_max') -> np.ndarray | None:
+        """
+        This function is implemented to estimate the still pending/ running configurations
+        Parameters
+        ----------
+        X_running : np.ndarray
+            a np array with size (n_running_configs, D) that represents the array values of the running configurations
+        Y_evaluated : np.ndarray
+            a np array with size (n_evaluated_configs, n_obj) that records the costs of all the previous evaluated
+            configurations
+        estimate_strategy: str
+            how do we estimate the target y_running values
+
+        Returns
+        -------
+        Y_running_estimated : np.ndarray | None
+            the estimated running y values, or None if there is no running configuration
+        """
+        n_running_points = len(X_running)
+        if n_running_points == 0:
+            return None
+        if estimate_strategy == 'CL_max':
+            # constant liar max, we take the maximal values of all the evaluated Y and apply them to the running X
+            Y_estimated = np.max(Y_evaluated, axis=0, keepdims=True)
+            return np.repeat(Y_estimated, n_running_points, 0)
+        elif estimate_strategy == 'CL_min':
+            # constant liar min, we take the minimal values of all the evaluated Y and apply them to the running X
+            Y_estimated = np.min(Y_evaluated, axis=0, keepdims=True)
+            return np.repeat(Y_estimated, n_running_points, 0)
+        elif estimate_strategy == 'CL_mean':
+            # constant liar mean, we take the mean values of all the evaluated Y and apply them to the running X
+            Y_estimated = np.mean(Y_evaluated, axis=0, keepdims=True)
+            return np.repeat(Y_estimated, n_running_points, 0)
+        elif estimate_strategy == 'kriging_believer':
+            # in kriging believer, we apply the predicted means of the surrogate model to estimate the running X
+            return self._model.predict_marginalized(X_running)[0]
+        elif estimate_strategy == 'sample':
+            # https://papers.nips.cc/paper_files/paper/2012/file/05311655a15b75fab86956663e1819cd-Paper.pdf
+            # since this requires a multi-variant gaussian distribution, we need to restrict the model needs to be a
+            # gaussian process
+            assert isinstance(self._model, GaussianProcess), 'Sample based estimate strategy only allows ' \
+                                                             'GP as surrogate model!'
+            return self._model.sample_functions(X_test=X_running, n_funcs=1)
+        else:
+            raise ValueError(f'Unknown estimating strategy: {estimate_strategy}')
+
     def _get_x_best(self, X: np.ndarray) -> tuple[np.ndarray, float]:
         """Get value, configuration, and array representation of the *best* configuration.