diff --git a/amlb/results.py b/amlb/results.py index b3991fac5..1bc3666b9 100644 --- a/amlb/results.py +++ b/amlb/results.py @@ -2,9 +2,9 @@ **results** module provides the logic to format, save and read predictions generated by the *automl frameworks* (cf. ``TaskResult``), as well as logic to compute, format, save, read and merge scores obtained from those predictions (cf. ``Result`` and ``Scoreboard``). """ + from __future__ import annotations -from functools import partial import collections import io import logging @@ -12,7 +12,7 @@ import os import re import statistics -from typing import Union, cast +from typing import Union, Any import numpy as np from numpy import nan, sort @@ -21,11 +21,35 @@ import scipy.sparse from .data import Dataset, DatasetType, Feature -from .datautils import accuracy_score, auc, average_precision_score, balanced_accuracy_score, confusion_matrix, fbeta_score, log_loss, \ - mean_absolute_error, mean_squared_error, mean_squared_log_error, precision_recall_curve, r2_score, roc_auc_score, \ - read_csv, write_csv, is_data_frame, to_data_frame +from .datautils import ( + accuracy_score, + average_precision_score, + balanced_accuracy_score, + confusion_matrix, + fbeta_score, + log_loss, + mean_absolute_error, + mean_squared_error, + mean_squared_log_error, + r2_score, + roc_auc_score, + read_csv, + write_csv, + is_data_frame, + to_data_frame, +) from .resources import get as rget, config as rconfig, output_dirs -from .utils import Namespace, backup_file, cached, datetime_iso, get_metadata, json_load, memoize, profile, set_metadata +from .utils import ( + Namespace, + backup_file, + cached, + datetime_iso, + get_metadata, + json_load, + memoize, + profile, + set_metadata, +) log = logging.getLogger(__name__) @@ -42,6 +66,7 @@ def decorator(fn): set_metadata(fn, higher_is_better=higher_is_better) _supported_metrics_[fn.__name__] = fn return fn + return decorator @@ -54,8 +79,7 @@ class ResultError(Exception): class Scoreboard: - - results_file = 'results.csv' + results_file = "results.csv" @classmethod def all(cls, scores_dir=None, autoload=True): @@ -82,16 +106,21 @@ def from_file(cls, path): if m: found = True d = m.groupdict() - benchmark_name = 'benchmark' in d and d['benchmark'] - task_name = 'task' in d and d['task'] - framework_name = 'framework' in d and d['framework'] + benchmark_name = "benchmark" in d and d["benchmark"] + task_name = "task" in d and d["task"] + framework_name = "framework" in d and d["framework"] break if not found: return None scores_dir = None if path == basename else folder - return cls(framework_name=framework_name, benchmark_name=benchmark_name, task_name=task_name, scores_dir=scores_dir) + return cls( + framework_name=framework_name, + benchmark_name=benchmark_name, + task_name=task_name, + scores_dir=scores_dir, + ) @staticmethod # @profile(logger=log) @@ -117,80 +146,158 @@ def save_df(data_frame, path, append=False): if new_format and append: df = read_csv(path).append(data_frame) new_file = not exists or not append or new_format - is_default_index = data_frame.index.name is None and not any(data_frame.index.names) + is_default_index = data_frame.index.name is None and not any( + data_frame.index.names + ) log.debug("Saving scores to `%s`.", path) - write_csv(df, - path=path, - header=new_file, - index=not is_default_index, - append=not new_file) + write_csv( + df, + path=path, + header=new_file, + index=not is_default_index, + append=not new_file, + ) log.info("Scores saved to `%s`.", path) - def __init__(self, scores=None, 
framework_name=None, benchmark_name=None, task_name=None, - scores_dir=None, autoload=True): + def __init__( + self, + scores=None, + framework_name=None, + benchmark_name=None, + task_name=None, + scores_dir=None, + autoload=True, + ): self.framework_name = framework_name self.benchmark_name = benchmark_name self.task_name = task_name - self.scores_dir = (scores_dir if scores_dir - else output_dirs(rconfig().output_dir, rconfig().sid, ['scores']).scores) - self.scores = (scores if scores is not None - else self.load_df(self.path) if autoload - else None) + self.scores_dir = ( + scores_dir + if scores_dir + else output_dirs(rconfig().output_dir, rconfig().sid, ["scores"]).scores + ) + self.scores = ( + scores + if scores is not None + else self.load_df(self.path) + if autoload + else None + ) @cached def as_data_frame(self): # index = ['task', 'framework', 'fold'] index = [] - df = (self.scores if is_data_frame(self.scores) - else to_data_frame([dict(sc) for sc in self.scores])) + df = ( + self.scores + if is_data_frame(self.scores) + else to_data_frame([dict(sc) for sc in self.scores]) + ) if df.empty: # avoid dtype conversions during reindexing on empty frame return df - fixed_cols = ['id', 'task', 'framework', 'constraint', 'fold', 'type', 'result', 'metric', 'mode', 'version', - 'params', 'app_version', 'utc', 'duration', 'training_duration', 'predict_duration', 'models_count', 'seed', 'info'] + fixed_cols = [ + "id", + "task", + "framework", + "constraint", + "fold", + "type", + "result", + "metric", + "mode", + "version", + "params", + "app_version", + "utc", + "duration", + "training_duration", + "predict_duration", + "models_count", + "seed", + "info", + ] fixed_cols = [col for col in fixed_cols if col not in index] - metrics_cols = [col for col in df.columns - if (col in dir(ClassificationResult) or col in dir(RegressionResult)) - and not col.startswith('_')] + metrics_cols = [ + col + for col in df.columns + if (col in dir(ClassificationResult) or col in dir(RegressionResult)) + and not col.startswith("_") + ] metrics_cols.sort() - dynamic_cols = [col for col in df.columns - if col not in index - and col not in fixed_cols - and col not in metrics_cols] + dynamic_cols = [ + col + for col in df.columns + if col not in index and col not in fixed_cols and col not in metrics_cols + ] dynamic_cols.sort() - df = df.reindex(columns=[]+fixed_cols+metrics_cols+dynamic_cols) + df = df.reindex(columns=[] + fixed_cols + metrics_cols + dynamic_cols) log.debug("Scores columns: %s.", df.columns) return df @memoize def as_printable_data_frame(self, verbosity=3): - str_print = lambda val: '' if val in [None, '', 'None'] or (isinstance(val, float) and np.isnan(val)) else str(val) - int_print = lambda val: int(val) if isinstance(val, (float, int)) and not np.isnan(val) else str_print(val) + def none_like_as_empty(val: Any) -> str: + return ( + "" + if val in [None, "", "None"] + or (isinstance(val, float) and np.isnan(val)) + else str(val) + ) + + def as_int_or_str(val: Any) -> str | int: + return ( + int(val) + if isinstance(val, (float, int)) and not np.isnan(val) + else none_like_as_empty(val) + ) df = self.as_data_frame() if df.empty: return df - force_str_cols = ['id'] - nanable_int_cols = ['fold', 'models_count', 'seed'] - low_precision_float_cols = ['duration', 'training_duration', 'predict_duration'] - high_precision_float_cols = [col for col in df.select_dtypes(include=[float]).columns if col not in ([] + nanable_int_cols + low_precision_float_cols)] + force_str_cols = ["id"] + 
nanable_int_cols = ["fold", "models_count", "seed"] + low_precision_float_cols = ["duration", "training_duration", "predict_duration"] + high_precision_float_cols = [ + col + for col in df.select_dtypes(include=[float]).columns + if col not in ([] + nanable_int_cols + low_precision_float_cols) + ] for col in force_str_cols: - df[col] = df[col].map(str_print) + df[col] = df[col].map(none_like_as_empty) for col in nanable_int_cols: - df[col] = df[col].map(int_print) + df[col] = df[col].map(as_int_or_str) for col in low_precision_float_cols: - float_format = lambda f: ("{:.1g}" if f < 1 else "{:.1f}").format(f) + + def format_low_precision_float(val: float) -> str: + return ("{:.1g}" if val < 1 else "{:.1f}").format(val) + # The .astype(float) is required to maintain NaN as 'NaN' instead of 'nan' - df[col] = df[col].map(float_format).astype(float) + df[col] = df[col].map(format_low_precision_float).astype(float) for col in high_precision_float_cols: df[col] = df[col].map("{:.6g}".format).astype(float) - cols = ([] if verbosity == 0 - else ['task', 'fold', 'framework', 'constraint', 'result', 'metric', 'info'] if verbosity == 1 - else ['id', 'task', 'fold', 'framework', 'constraint', 'result', 'metric', - 'duration', 'seed', 'info'] if verbosity == 2 - else slice(None)) + cols = ( + [] + if verbosity == 0 + else ["task", "fold", "framework", "constraint", "result", "metric", "info"] + if verbosity == 1 + else [ + "id", + "task", + "fold", + "framework", + "constraint", + "result", + "metric", + "duration", + "seed", + "info", + ] + if verbosity == 2 + else slice(None) + ) return df.loc[:, cols] def load(self): @@ -202,15 +309,21 @@ def save(self, append=False): return self def append(self, board_or_df, no_duplicates=True): - to_append = board_or_df.as_data_frame() if isinstance(board_or_df, Scoreboard) else board_or_df + to_append = ( + board_or_df.as_data_frame() + if isinstance(board_or_df, Scoreboard) + else board_or_df + ) scores = self.as_data_frame().append(to_append) if no_duplicates: scores = scores.drop_duplicates() - return Scoreboard(scores=scores, - framework_name=self.framework_name, - benchmark_name=self.benchmark_name, - task_name=self.task_name, - scores_dir=self.scores_dir) + return Scoreboard( + scores=scores, + framework_name=self.framework_name, + benchmark_name=self.benchmark_name, + task_name=self.task_name, + scores_dir=self.scores_dir, + ) @property def path(self): @@ -219,7 +332,9 @@ def path(self): if self.task_name: file_name = f"{self.framework_name}{sep}task_{self.task_name}.csv" elif self.benchmark_name: - file_name = f"{self.framework_name}{sep}benchmark_{self.benchmark_name}.csv" + file_name = ( + f"{self.framework_name}{sep}benchmark_{self.benchmark_name}.csv" + ) else: file_name = f"{self.framework_name}.csv" else: @@ -234,7 +349,6 @@ def path(self): class TaskResult: - @staticmethod # @profile(logger=log) def load_predictions(predictions_file): @@ -247,7 +361,7 @@ def load_predictions(predictions_file): if rconfig().test_mode: TaskResult.validate_predictions(df) - if 'repeated_item_id' in df.columns: + if "repeated_item_id" in df.columns: return TimeSeriesResult(df) else: if df.shape[1] > 2: @@ -257,7 +371,10 @@ def load_predictions(predictions_file): except Exception as e: return ErrorResult(ResultError(e)) else: - log.warning("Predictions file `%s` is missing: framework either failed or could not produce any prediction.", predictions_file) + log.warning( + "Predictions file `%s` is missing: framework either failed or could not produce any prediction.", + 
predictions_file, + ) return NoResult("Missing predictions.") @staticmethod @@ -266,18 +383,26 @@ def load_metadata(metadata_file): if os.path.isfile(metadata_file): return json_load(metadata_file, as_namespace=True) else: - log.warning("Metadata file `%s` is missing: framework either couldn't start or implementation doesn't save metadata.", metadata_file) + log.warning( + "Metadata file `%s` is missing: framework either couldn't start or implementation doesn't save metadata.", + metadata_file, + ) return Namespace(lambda: None) @staticmethod # @profile(logger=log) - def save_predictions(dataset: Dataset, output_file: str, - predictions: Union[A, DF, S] = None, truth: Union[A, DF, S] = None, - probabilities: Union[A, DF] = None, probabilities_labels: Union[list, A] = None, - optional_columns: Union[A, DF] = None, - target_is_encoded: bool = False, - preview: bool = True): - """ Save class probabilities and predicted labels to file in csv format. + def save_predictions( + dataset: Dataset, + output_file: str, + predictions: Union[A, DF, S] = None, + truth: Union[A, DF, S] = None, + probabilities: Union[A, DF] = None, + probabilities_labels: Union[list, A] = None, + optional_columns: Union[A, DF] = None, + target_is_encoded: bool = False, + preview: bool = True, + ): + """Save class probabilities and predicted labels to file in csv format. :param dataset: :param output_file: @@ -296,8 +421,12 @@ def save_predictions(dataset: Dataset, output_file: str, predictions = predictions.squeeze() if isinstance(predictions, S): predictions = predictions.values - if scipy.sparse.issparse(truth) and truth.shape[1] == 1: - truth = pd.DataFrame(cast(scipy.sparse.sparray, truth).todense()) + if ( + scipy.sparse.issparse(truth) + and isinstance(truth, scipy.sparse.sparray) + and truth.shape[1] == 1 + ): + truth = pd.DataFrame(truth.todense()) if isinstance(truth, DF): truth = truth.squeeze() if isinstance(truth, S): @@ -308,12 +437,21 @@ def save_predictions(dataset: Dataset, output_file: str, probabilities_labels = [str(label) for label in probabilities_labels] if probabilities is not None: - prob_cols = probabilities_labels if probabilities_labels else dataset.target.label_encoder.classes + prob_cols = ( + probabilities_labels + if probabilities_labels + else dataset.target.label_encoder.classes + ) df = to_data_frame(probabilities, column_names=prob_cols) if probabilities_labels is not None: - df = df[sort(prob_cols)] # reorder columns alphabetically: necessary to match label encoding + df = df[ + sort(prob_cols) + ] # reorder columns alphabetically: necessary to match label encoding if any(prob_cols != df.columns.values): - encoding_map = {prob_cols.index(col): i for i, col in enumerate(df.columns.values)} + encoding_map = { + prob_cols.index(col): i + for i, col in enumerate(df.columns.values) + } remap = np.vectorize(lambda v: encoding_map[v]) else: df = to_data_frame(None) @@ -345,37 +483,71 @@ def save_predictions(dataset: Dataset, output_file: str, @staticmethod def validate_predictions(predictions: pd.DataFrame): names = predictions.columns.values - assert len(names) >= 2, "predictions frame should have 2 columns (regression) or more (classification)" - assert names[-1] == "truth", "last column of predictions frame must be named `truth`" - assert names[-2] == "predictions", "last column of predictions frame must be named `predictions`" + assert ( + len(names) >= 2 + ), "predictions frame should have 2 columns (regression) or more (classification)" + assert ( + names[-1] == "truth" + ), "last column 
of predictions frame must be named `truth`"
+        assert (
+            names[-2] == "predictions"
+        ), "second to last column of predictions frame must be named `predictions`"
 
         if len(names) == 2:  # regression
             for name, col in predictions.items():
                 pd.to_numeric(col)  # pandas will raise if we have non-numerical values
         else:  # classification
             predictors = names[:-2]
-            probabilities, preds, truth = predictions.iloc[:,:-2], predictions.iloc[:,-2], predictions.iloc[:,-1]
-            assert np.array_equal(predictors, np.sort(predictors)), "Predictors columns are not sorted in lexicographic order."
-            assert set(np.unique(predictors)) == set(predictors), "Predictions contain multiple columns with the same label."
+            probabilities, preds, truth = (
+                predictions.iloc[:, :-2],
+                predictions.iloc[:, -2],
+                predictions.iloc[:, -1],
+            )
+            assert np.array_equal(
+                predictors, np.sort(predictors)
+            ), "Predictors columns are not sorted in lexicographic order."
+            assert set(np.unique(predictors)) == set(
+                predictors
+            ), "Predictions contain multiple columns with the same label."
 
             for name, col in probabilities.items():
                 pd.to_numeric(col)  # pandas will raise if we have non-numerical values
 
             if _encode_predictions_and_truth_:
-                assert np.array_equal(truth, truth.astype(int)), "Values in truth column are not encoded."
-                assert np.array_equal(preds, preds.astype(int)), "Values in predictions column are not encoded."
+                assert np.array_equal(
+                    truth, truth.astype(int)
+                ), "Values in truth column are not encoded."
+                assert np.array_equal(
+                    preds, preds.astype(int)
+                ), "Values in predictions column are not encoded."
                 predictors_set = set(range(len(predictors)))
-                validate_row = lambda r: r[:-2].astype(float).values.argmax() == r[-2]
+
+                def validate_row(row) -> bool:
+                    return row[:-2].astype(float).values.argmax() == row[-2]
             else:
                 predictors_set = set(predictors)
-                validate_row = lambda r: r[:-2].astype(float).idxmax() == r[-2]
+
+                def validate_row(row) -> bool:
+                    return row[:-2].astype(float).idxmax() == row[-2]
 
             truth_set = set(truth.unique())
             if predictors_set < truth_set:
-                log.warning("Truth column contains values unseen during training: no matching probability column.")
+                log.warning(
+                    "Truth column contains values unseen during training: no matching probability column."
+                )
             if predictors_set > truth_set:
-                log.warning("Truth column doesn't contain all the possible target values: the test dataset may be too small.")
+                log.warning(
+                    "Truth column doesn't contain all the possible target values: the test dataset may be too small."
+                )
 
             predictions_set = set(preds.unique())
-            assert predictions_set <= predictors_set, "Predictions column contains unexpected values: {}.".format(predictions_set - predictors_set)
-            assert predictions.apply(validate_row, axis=1).all(), "Predictions don't always match the predictor with the highest probability."
+            assert (
+                predictions_set <= predictors_set
+            ), "Predictions column contains unexpected values: {}.".format(
+                predictions_set - predictors_set
+            )
+            assert predictions.apply(
+                validate_row, axis=1
+            ).all(), "Predictions don't always match the predictor with the highest probability."
 
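The row-wise `validate_row` check above asserts that each predicted label is the class whose probability column is largest. A minimal vectorized sketch of the same check, assuming the non-encoded layout written by `save_predictions` (lexicographically sorted probability columns, then `predictions`, then `truth`); the helper name and sample frame are hypothetical, not part of this patch:

import numpy as np
import pandas as pd

def predictions_match_probabilities(predictions: pd.DataFrame) -> bool:
    # illustrative sketch; assumes label (non-encoded) predictions
    # probability columns come first, followed by the `predictions` and `truth` columns
    probabilities = predictions.iloc[:, :-2].astype(float).to_numpy()
    predicted_labels = predictions.iloc[:, -2].astype(str).to_numpy()
    class_labels = predictions.columns[:-2].to_numpy().astype(str)
    # label of the highest-probability class on each row
    argmax_labels = class_labels[probabilities.argmax(axis=1)]
    return bool((argmax_labels == predicted_labels).all())

frame = pd.DataFrame(
    {"no": [0.8, 0.3], "yes": [0.2, 0.7], "predictions": ["no", "yes"], "truth": ["no", "no"]}
)
assert predictions_match_probabilities(frame)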
@classmethod def score_from_predictions_file(cls, path): @@ -388,34 +560,51 @@ def score_from_predictions_file(cls, path): if folder_m: folder_g = folder_m.groupdict() - file_pat = rf"(?P[\w\-]+?){sep}(?P[\w\-]+){sep}(?P\d+)\.csv" + file_pat = ( + rf"(?P[\w\-]+?){sep}(?P[\w\-]+){sep}(?P\d+)\.csv" + ) file_m = re.fullmatch(file_pat, basename) if not file_m: log.error("Predictions file `%s` has wrong naming format.", path) return None file_g = file_m.groupdict() - task_name = file_g['task'] - fold = int(file_g['fold']) - constraint = folder_g['constraint'] - benchmark = folder_g['benchmark'] + task_name = file_g["task"] + fold = int(file_g["fold"]) + constraint = folder_g["constraint"] + benchmark = folder_g["benchmark"] task = Namespace(name=task_name, id=task_name) if benchmark: try: tasks, _, _ = rget().benchmark_definition(benchmark) task = next(t for t in tasks if t.name == task_name) - except: - pass + except Exception as e: + log.warning( + "Failed to fetch task from benchmark definition to load predictions: %s", + e, + ) task_result = cls(task, fold, constraint, predictions_dir=path) return task_result.compute_score() - def __init__(self, task_def, fold: int, constraint: str, predictions_dir: str | None = None, metadata: Namespace = None): + def __init__( + self, + task_def, + fold: int, + constraint: str, + predictions_dir: str | None = None, + metadata: Namespace = None, + ): self.task = task_def self.fold = fold self.constraint = constraint - self.predictions_dir = (predictions_dir if predictions_dir - else output_dirs(rconfig().output_dir, rconfig().sid, ['predictions']).predictions) + self.predictions_dir = ( + predictions_dir + if predictions_dir + else output_dirs( + rconfig().output_dir, rconfig().sid, ["predictions"] + ).predictions + ) self._metadata = metadata @cached @@ -437,39 +626,43 @@ def compute_score(self, result=None, meta_result=None): constraint=self.constraint, framework=metadata.framework, version=metadata.framework_version, - params=repr(metadata.framework_params) if metadata.framework_params else '', + params=repr(metadata.framework_params) if metadata.framework_params else "", fold=self.fold, mode=rconfig().run_mode, seed=metadata.seed, app_version=rget().app_version, utc=datetime_iso(), metric=metadata.metric, - duration=nan + duration=nan, ) - required_meta_res = ['training_duration', 'predict_duration', 'models_count'] + required_meta_res = ["training_duration", "predict_duration", "models_count"] for m in required_meta_res: entry[m] = meta_result[m] if m in meta_result else nan if inference_times := Namespace.get(meta_result, "inference_times"): for data_type, measurements in Namespace.dict(inference_times).items(): for n_samples, measured_times in Namespace.dict(measurements).items(): - entry[f"infer_batch_size_{data_type}_{n_samples}"] = np.median(measured_times) + entry[f"infer_batch_size_{data_type}_{n_samples}"] = np.median( + measured_times + ) result = self.get_result() if result is None else result scoring_errors = [] def do_score(m): score = result.evaluate(m) - if 'message' in score: + if "message" in score: scoring_errors.append(score.message) return score def set_score(score): entry.metric = score.metric entry.result = score.value - if score.higher_is_better is False: # if unknown metric, and higher_is_better is None, then no change + if ( + score.higher_is_better is False + ): # if unknown metric, and higher_is_better is None, then no change entry.metric = f"neg_{entry.metric}" - entry.result = - entry.result + entry.result = -entry.result 
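A small worked example of the sign convention in `set_score` above (values hypothetical): a lower-is-better metric is stored negated so that a larger `result` always means better, while an unknown metric (higher_is_better is None) is left unchanged.

score_metric, score_value, higher_is_better = "rmse", 0.42, False  # hypothetical score
entry_metric = f"neg_{score_metric}" if higher_is_better is False else score_metric
entry_result = -score_value if higher_is_better is False else score_value
assert (entry_metric, entry_result) == ("neg_rmse", -0.42)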
for metric in metadata.metrics or []: sc = do_score(metric) @@ -477,27 +670,36 @@ def set_score(score): if metric == entry.metric: set_score(sc) - if 'result' not in entry: + if "result" not in entry: set_score(do_score(entry.metric)) entry.info = result.info if scoring_errors: entry.info = "; ".join(filter(lambda it: it, [entry.info, *scoring_errors])) - entry |= Namespace({k: v for k, v in meta_result if k not in required_meta_res and k != "inference_times"}) + entry |= Namespace( + { + k: v + for k, v in meta_result + if k not in required_meta_res and k != "inference_times" + } + ) log.info("Metric scores: %s", entry) return entry @property def _predictions_file(self): - return os.path.join(self.predictions_dir, self.task.name, str(self.fold), "predictions.csv") + return os.path.join( + self.predictions_dir, self.task.name, str(self.fold), "predictions.csv" + ) @property def _metadata_file(self): - return os.path.join(self.predictions_dir, self.task.name, str(self.fold), "metadata.json") + return os.path.join( + self.predictions_dir, self.task.name, str(self.fold), "metadata.json" + ) class Result: - def __init__(self, predictions_df, info=None): self.df = predictions_df self.info = info @@ -510,22 +712,25 @@ def evaluate(self, metric): eval_res = Namespace(metric=metric) if hasattr(self, metric): metric_fn = getattr(self, metric) - eval_res.higher_is_better = get_metadata(metric_fn, 'higher_is_better') + eval_res.higher_is_better = get_metadata(metric_fn, "higher_is_better") try: eval_res.value = metric_fn() except Exception as e: log.exception("Failed to compute metric %s: ", metric, e) eval_res += Namespace(value=nan, message=f"Scoring {metric}: {str(e)}") else: - pb_type = self.type.name if self.type is not None else 'unknown' + pb_type = self.type.name if self.type is not None else "unknown" # raise ValueError(f"Metric {metric} is not supported for {pb_type}.") log.warning("Metric %s is not supported for %s problems!", metric, pb_type) - eval_res += Namespace(value=nan, higher_is_better=None, message=f"Unsupported metric `{metric}` for {pb_type} problems") + eval_res += Namespace( + value=nan, + higher_is_better=None, + message=f"Unsupported metric `{metric}` for {pb_type} problems", + ) return eval_res class NoResult(Result): - def __init__(self, info=None): super().__init__(None, info) self.missing_result = np.nan @@ -535,31 +740,39 @@ def evaluate(self, metric): if metric is None: eval_res += Namespace(higher_is_better=None) elif metric not in _supported_metrics_: - eval_res += Namespace(higher_is_better=None, message=f"Unsupported metric `{metric}`") + eval_res += Namespace( + higher_is_better=None, message=f"Unsupported metric `{metric}`" + ) else: - eval_res.higher_is_better = get_metadata(_supported_metrics_.get(metric), 'higher_is_better') + eval_res.higher_is_better = get_metadata( + _supported_metrics_.get(metric), "higher_is_better" + ) return eval_res class ErrorResult(NoResult): - def __init__(self, error): - msg = "{}: {}".format(type(error).__qualname__ if error is not None else "Error", error) + msg = "{}: {}".format( + type(error).__qualname__ if error is not None else "Error", error + ) max_len = rconfig().results.error_max_length - msg = msg if len(msg) <= max_len else (msg[:max_len - 1] + '…') + msg = msg if len(msg) <= max_len else (msg[: max_len - 1] + "…") super().__init__(msg) class ClassificationResult(Result): - - multi_class_average = 'weighted' # used by metrics like fbeta or auc + multi_class_average = "weighted" # used by metrics like fbeta or auc def 
__init__(self, predictions_df, info=None): super().__init__(predictions_df, info) self.classes = self.df.columns[:-2].values.astype(str, copy=False) self.probabilities = self.df.iloc[:, :-2].values.astype(float, copy=False) - self.target = Feature(0, 'target', 'category', values=self.classes, is_target=True) - self.type = DatasetType.binary if len(self.classes) == 2 else DatasetType.multiclass + self.target = Feature( + 0, "target", "category", values=self.classes, is_target=True + ) + self.type = ( + DatasetType.binary if len(self.classes) == 2 else DatasetType.multiclass + ) self.truth = self._autoencode(self.truth.astype(str, copy=False)) self.predictions = self._autoencode(self.predictions.astype(str, copy=False)) self.labels = self._autoencode(self.classes) @@ -573,19 +786,21 @@ def acc(self): def auc(self): """Area Under (ROC) Curve, computed on probabilities, not on predictions""" if self.type == DatasetType.multiclass: - raise ResultError("For multiclass problems, use `auc_ovr` or `auc_ovo` metrics instead of `auc`.") + raise ResultError( + "For multiclass problems, use `auc_ovr` or `auc_ovo` metrics instead of `auc`." + ) else: return float(roc_auc_score(self.truth, self.probabilities[:, 1])) @metric(higher_is_better=True) def auc_ovo(self): """AUC One-vs-One""" - return self._auc_multi(mc='ovo') + return self._auc_multi(mc="ovo") @metric(higher_is_better=True) def auc_ovr(self): """AUC One-vs-Rest""" - return self._auc_multi(mc='ovr') + return self._auc_multi(mc="ovr") @metric(higher_is_better=True) def balacc(self): @@ -633,30 +848,53 @@ def pr_auc(self): return float(average_precision_score(self.truth, self.probabilities[:, 1])) def _autoencode(self, vec): - needs_encoding = not _encode_predictions_and_truth_ or (isinstance(vec[0], str) and not vec[0].isdigit()) + needs_encoding = not _encode_predictions_and_truth_ or ( + isinstance(vec[0], str) and not vec[0].isdigit() + ) return self.target.label_encoder.transform(vec) if needs_encoding else vec - def _auc_multi(self, mc='raise'): + def _auc_multi(self, mc="raise"): average = ClassificationResult.multi_class_average - return float(roc_auc_score(self.truth, self.probabilities, average=average, labels=self.labels, multi_class=mc)) + return float( + roc_auc_score( + self.truth, + self.probabilities, + average=average, + labels=self.labels, + multi_class=mc, + ) + ) def _cm(self): return confusion_matrix(self.truth, self.predictions, labels=self.labels) def _fbeta(self, beta): - average = ClassificationResult.multi_class_average if self.type == DatasetType.multiclass else 'binary' - return float(fbeta_score(self.truth, self.predictions, beta=beta, average=average, labels=self.labels)) + average = ( + ClassificationResult.multi_class_average + if self.type == DatasetType.multiclass + else "binary" + ) + return float( + fbeta_score( + self.truth, + self.predictions, + beta=beta, + average=average, + labels=self.labels, + ) + ) def _per_class_errors(self): - return [(s-d)/s for s, d in ((sum(r), r[i]) for i, r in enumerate(self._cm()))] + return [ + (s - d) / s for s, d in ((sum(r), r[i]) for i, r in enumerate(self._cm())) + ] class RegressionResult(Result): - def __init__(self, predictions_df, info=None): super().__init__(predictions_df, info) self.truth = self.truth.astype(float, copy=False) - self.target = Feature(0, 'target', 'real', is_target=True) + self.target = Feature(0, "target", "real", is_target=True) self.type = DatasetType.regression @metric(higher_is_better=False) @@ -693,30 +931,51 @@ def r2(self): class 
TimeSeriesResult(RegressionResult): def __init__(self, predictions_df, info=None): super().__init__(predictions_df, info) - required_columns = {'truth', 'predictions', 'repeated_item_id', 'repeated_abs_seasonal_error'} + required_columns = { + "truth", + "predictions", + "repeated_item_id", + "repeated_abs_seasonal_error", + } if required_columns - set(self.df.columns): - raise ValueError(f'Missing columns for calculating time series metrics: {required_columns - set(self.df.columns)}.') + raise ValueError( + f"Missing columns for calculating time series metrics: {required_columns - set(self.df.columns)}." + ) - quantile_columns = [column for column in self.df.columns if column.startswith('0.')] - unrecognized_columns = [column for column in self.df.columns if column not in required_columns and column not in quantile_columns] + quantile_columns = [ + column for column in self.df.columns if column.startswith("0.") + ] + unrecognized_columns = [ + column + for column in self.df.columns + if column not in required_columns and column not in quantile_columns + ] if len(unrecognized_columns) > 0: - raise ValueError(f'Predictions contain unrecognized columns: {unrecognized_columns}.') + raise ValueError( + f"Predictions contain unrecognized columns: {unrecognized_columns}." + ) self.type = DatasetType.timeseries - self.truth = self.df['truth'].values.astype(float) - self.item_ids = self.df['repeated_item_id'].values - self.abs_seasonal_error = self.df['repeated_abs_seasonal_error'].values.astype(float) + self.truth = self.df["truth"].values.astype(float) + self.item_ids = self.df["repeated_item_id"].values + self.abs_seasonal_error = self.df["repeated_abs_seasonal_error"].values.astype( + float + ) # predictions = point forecast, quantile_predictions = quantile forecast - self.predictions = self.df['predictions'].values.astype(float) + self.predictions = self.df["predictions"].values.astype(float) self.quantile_predictions = self.df[quantile_columns].values.astype(float) self.quantile_levels = np.array(quantile_columns, dtype=float) - if (~np.isfinite(self.predictions)).any() or (~np.isfinite(self.quantile_predictions)).any(): - raise ValueError('Predictions contain NaN or Inf values') + if (~np.isfinite(self.predictions)).any() or ( + ~np.isfinite(self.quantile_predictions) + ).any(): + raise ValueError("Predictions contain NaN or Inf values") _, unique_item_ids_counts = np.unique(self.item_ids, return_counts=True) if len(set(unique_item_ids_counts)) != 1: - raise ValueError(f'Error: Predicted sequences have different lengths {unique_item_ids_counts}.') + raise ValueError( + f"Error: Predicted sequences have different lengths {unique_item_ids_counts}." 
+ ) def _itemwise_mean(self, values): """Compute mean for each time series.""" @@ -743,7 +1002,9 @@ def mape(self): @metric(higher_is_better=False) def wape(self): """Weighted Average Percentage Error""" - return np.sum(np.abs(self.truth - self.predictions)) / np.sum(np.abs(self.truth)) + return np.sum(np.abs(self.truth - self.predictions)) / np.sum( + np.abs(self.truth) + ) @metric(higher_is_better=False) def mase(self): @@ -760,7 +1021,10 @@ def _quantile_loss_per_step(self): # Array of shape [len(self.predictions), len(self.quantile_levels)] return 2 * np.abs( (self.quantile_predictions - self.truth[:, None]) - * ((self.quantile_predictions >= self.truth[:, None]) - self.quantile_levels) + * ( + (self.quantile_predictions >= self.truth[:, None]) + - self.quantile_levels + ) ) @metric(higher_is_better=False) @@ -779,7 +1043,9 @@ def wql(self): Defined as total quantile loss normalized by the total abs value of target time series. """ - return self._quantile_loss_per_step().mean(axis=1).sum() / np.sum(np.abs(self.truth)) + return self._quantile_loss_per_step().mean(axis=1).sum() / np.sum( + np.abs(self.truth) + ) @metric(higher_is_better=False) def sql(self): @@ -788,7 +1054,9 @@ def sql(self): Similar to MASE, the quantile loss for each item is normalized by the in-sample error of the naive forecaster. This makes scores comparable across different items. """ - pl_per_item = self._itemwise_mean(self._quantile_loss_per_step().mean(axis=1) / self.abs_seasonal_error) + pl_per_item = self._itemwise_mean( + self._quantile_loss_per_step().mean(axis=1) / self.abs_seasonal_error + ) return self._safemean(pl_per_item) diff --git a/amlb/runners/aws.py b/amlb/runners/aws.py index c8b5dfa22..ca7655186 100644 --- a/amlb/runners/aws.py +++ b/amlb/runners/aws.py @@ -13,6 +13,7 @@ - merge downloaded results with existing/local results. - properly cleans up AWS resources (S3, EC2). 
""" + from __future__ import annotations import datetime @@ -30,7 +31,7 @@ import re import time import threading -from typing import List, Union, cast +from typing import cast from urllib.parse import quote_plus as uenc import boto3 @@ -38,12 +39,31 @@ from ..benchmark import Benchmark, SetupMode from ..datautils import read_csv, write_csv -from ..job import Job, JobError, MultiThreadingJobRunner, SimpleJobRunner, State as JobState +from ..job import ( + Job, + JobError, + MultiThreadingJobRunner, + SimpleJobRunner, + State as JobState, +) from ..resources import config as rconfig, get as rget from ..results import ErrorResult, NoResultError, Scoreboard, TaskResult -from ..utils import Namespace as ns, countdown, datetime_iso, file_filter, flatten, \ - list_all_files, normalize_path, \ - retry_after, retry_policy, str_def, str_iter, tail, touch, Namespace +from ..utils import ( + Namespace as ns, + countdown, + datetime_iso, + file_filter, + flatten, + list_all_files, + normalize_path, + retry_after, + retry_policy, + str_def, + str_iter, + tail, + touch, + Namespace, +) from .docker import DockerBenchmark @@ -57,7 +77,6 @@ class InstanceType(Enum): class AWSError(Exception): - def __init__(self, message=None, retry=False): self.retry = retry super().__init__(message) @@ -76,10 +95,12 @@ def fetch_results(cls, instances_file, instance_selector=None): bench._load_instances(normalize_path(instances_file)) inst = next(inst for inst in bench.instances.values()) bench.sid = inst.session - bucket_name = re.match(r's3://([\w\-.]+)/.*', inst.s3_dir).group(1) - bench.s3 = boto3.resource('s3', region_name=bench.region) + bucket_name = re.match(r"s3://([\w\-.]+)/.*", inst.s3_dir).group(1) + bench.s3 = boto3.resource("s3", region_name=bench.region) bench.bucket = bench._create_s3_bucket(bucket_name, auto_create=False) - instance_selector = (lambda *_: True) if instance_selector is None else instance_selector + instance_selector = ( + (lambda *_: True) if instance_selector is None else instance_selector + ) for iid, _ in filter(instance_selector, bench.instances.items()): bench._download_results(iid) @@ -95,7 +116,9 @@ def reconnect(cls, instances_file): def to_job(iid, inst): inst.instance = bench.ec2.Instance(iid) - job = Job(inst.key, raise_on_failure=rconfig().job_scheduler.exit_on_job_failure) + job = Job( + inst.key, raise_on_failure=rconfig().job_scheduler.exit_on_job_failure + ) job.instance_id = iid def _run(job_self): @@ -103,10 +126,12 @@ def _run(job_self): def _on_done(job_self): terminate = bench._download_results(job_self.ext.instance_id) - if not terminate and rconfig().aws.ec2.terminate_instances == 'success': - log.warning("[WARNING]: EC2 Instance %s won't be terminated as we couldn't download the results: " + if not terminate and rconfig().aws.ec2.terminate_instances == "success": + log.warning( + "[WARNING]: EC2 Instance %s won't be terminated as we couldn't download the results: " "please terminate it manually or restart it (after clearing its UserData) if you want to inspect the instance.", - job_self.ext.instance_id) + job_self.ext.instance_id, + ) bench._stop_instance(job_self.ext.instance_id, terminate=terminate) job._run = _run.__get__(job) @@ -119,7 +144,14 @@ def _on_done(job_self): finally: bench.cleanup() - def __init__(self, framework_name, benchmark_name, constraint_name, region=None, job_history: str = None): + def __init__( + self, + framework_name, + benchmark_name, + constraint_name, + region=None, + job_history: str | None = None, + ): """ :param framework_name: @@ 
-127,11 +159,19 @@ def __init__(self, framework_name, benchmark_name, constraint_name, region=None, :param constraint_name: :param region: """ - super().__init__(framework_name, benchmark_name, constraint_name, job_history=job_history) - self.suid = datetime_iso(micros=True, no_sep=True) # short sid for AWS entities whose name length is limited - self.region = (region if region - else rconfig().aws.region if rconfig().aws['region'] - else boto3.session.Session().region_name) + super().__init__( + framework_name, benchmark_name, constraint_name, job_history=job_history + ) + self.suid = datetime_iso( + micros=True, no_sep=True + ) # short sid for AWS entities whose name length is limited + self.region = ( + region + if region + else rconfig().aws.region + if rconfig().aws["region"] + else boto3.session.Session().region_name + ) self.ami = rconfig().aws.ec2.regions[self.region].ami self.cloudwatch = None self.ec2 = None @@ -140,19 +180,29 @@ def __init__(self, framework_name, benchmark_name, constraint_name, region=None, self.bucket = None self.uploaded_resources = None self.instance_profile = None - self.instances = {} - self.jobs = [] + self.instances: dict[str, ns] = {} + self.jobs: list[Job] = [] self.exec = None self.monitoring = None self._validate2() def _validate(self): - if rconfig().aws.ec2.terminate_instances not in ['always', 'success', 'never', True, False]: - raise ValueError("`terminate_instances` setting should be one among ['always', 'success', 'never']") + if rconfig().aws.ec2.terminate_instances not in [ + "always", + "success", + "never", + True, + False, + ]: + raise ValueError( + "`terminate_instances` setting should be one among ['always', 'success', 'never']" + ) max_parallel_jobs = rconfig().job_scheduler.max_parallel_jobs if self.parallel_jobs == 0 or self.parallel_jobs > max_parallel_jobs: - log.warning("Forcing parallelization to its upper limit: %s.", max_parallel_jobs) + log.warning( + "Forcing parallelization to its upper limit: %s.", max_parallel_jobs + ) self.parallel_jobs = max_parallel_jobs def _validate2(self): @@ -161,23 +211,27 @@ def _validate2(self): def setup(self, mode): if mode == SetupMode.skip: - log.warning("AWS setup mode set to unsupported {mode}, ignoring.".format(mode=mode)) + log.warning( + "AWS setup mode set to unsupported {mode}, ignoring.".format(mode=mode) + ) # S3 setup to exchange files between local and ec2 instances - self.s3 = boto3.resource('s3', region_name=self.region) + self.s3 = boto3.resource("s3", region_name=self.region) self.bucket = self._create_s3_bucket() - self.uploaded_resources = self._upload_resources() if mode != SetupMode.script else [] + self.uploaded_resources = ( + self._upload_resources() if mode != SetupMode.script else [] + ) # IAM setup to secure exchanges between s3 and ec2 instances - self.iam = boto3.resource('iam', region_name=self.region) + self.iam = boto3.resource("iam", region_name=self.region) if mode == SetupMode.force: log.warning("Cleaning up previously created IAM entities if any.") self._delete_iam_entities() self.instance_profile = self._create_instance_profile() # EC2 setup to prepare creation of ec2 instances - self.ec2 = boto3.resource('ec2', region_name=self.region) - self.cloudwatch = boto3.resource('cloudwatch', region_name=self.region) + self.ec2 = boto3.resource("ec2", region_name=self.region) + self.cloudwatch = boto3.resource("cloudwatch", region_name=self.region) def cleanup(self): self._stop_all_instances() @@ -190,7 +244,9 @@ def cleanup(self): if rconfig().aws.s3.temporary is 
True: self._delete_s3_bucket() - def run(self, tasks: str | list[str] | None = None, folds: int | list[int] | None = None): + def run( + self, tasks: str | list[str] | None = None, folds: int | list[int] | None = None + ): task_defs = self._get_task_defs(tasks) # validates tasks self._exec_start() self._monitoring_start() @@ -198,7 +254,12 @@ def run(self, tasks: str | list[str] | None = None, folds: int | list[int] | Non if rconfig().aws.minimize_instances: # use one instance per task: all folds executed on same instance try: - jobs = flatten([self._make_aws_job([task_def.name], folds) for task_def in task_defs]) + jobs = flatten( + [ + self._make_aws_job([task_def.name], folds) + for task_def in task_defs + ] + ) results = self._run_jobs(jobs) return self._results_summary(self._process_results(results)) finally: @@ -209,7 +270,9 @@ def run(self, tasks: str | list[str] | None = None, folds: int | list[int] | Non else: # use one instance for all try: - task_names = None if tasks is None else [task_def.name for task_def in task_defs] + task_names = ( + None if tasks is None else [task_def.name for task_def in task_defs] + ) job = self._make_aws_job(task_names, folds) results = self._run_jobs([job]) scoreboard = self._process_results(results) @@ -221,15 +284,23 @@ def _create_job_runner(self, jobs): if self.parallel_jobs == 1: return SimpleJobRunner(jobs) else: - queueing_strategy = MultiThreadingJobRunner.QueueingStrategy.enforce_job_priority - return MultiThreadingJobRunner(jobs, - parallel_jobs=self.parallel_jobs, - delay_secs=rconfig().job_scheduler.delay_between_jobs, - done_async=True, - queueing_strategy=queueing_strategy) + queueing_strategy = ( + MultiThreadingJobRunner.QueueingStrategy.enforce_job_priority + ) + return MultiThreadingJobRunner( + jobs, + parallel_jobs=self.parallel_jobs, + delay_secs=rconfig().job_scheduler.delay_between_jobs, + done_async=True, + queueing_strategy=queueing_strategy, + ) def _make_job(self, task_def, fold=int): - return self._make_aws_job([task_def.name], [fold]) if not self._skip_job(task_def, fold) else None + return ( + self._make_aws_job([task_def.name], [fold]) + if not self._skip_job(task_def, fold) + else None + ) def _exec_start(self): if self.exec is not None: @@ -241,8 +312,8 @@ def _exec_stop(self): return try: self.exec.shutdown(wait=True) - except: - pass + except Exception as e: + log.warning("Error while shutting down thread pool executor: %s", e) finally: self.exec = None @@ -250,31 +321,40 @@ def _exec_send(self, fn, *args, **kwargs): if self.exec is not None: self.exec.submit(fn, *args, **kwargs) else: - log.warning("Application is submitting a function while the thread executor is not running: executing the function in the calling thread.") + log.warning( + "Application is submitting a function while the thread executor is not running: executing the function in the calling thread." 
+ ) try: fn(*args, **kwargs) - except: - pass + except Exception as e: + logging.warning(f"Failed to execute function {fn.__name__}: %s", e) def _job_reschedule(self, job, reason=None, fallback=None): js = rconfig().aws.job_scheduler if not job.ext.retry: start_delay, delay_fn = retry_policy(js.retry_policy) - job.ext.retry = retry_after(start_delay, delay_fn, max_retries=js.max_attempts - 1) + job.ext.retry = retry_after( + start_delay, delay_fn, max_retries=js.max_attempts - 1 + ) wait = next(job.ext.retry, None) if wait is None: if fallback and fallback(job, reason): job.ext.wait_min_secs = 0 else: - log.error("Aborting job %s after %s attempts: %s.", job.name, js.max_attempts, reason) + log.error( + "Aborting job %s after %s attempts: %s.", + job.name, + js.max_attempts, + reason, + ) raise JobError(reason) else: job.ext.wait_min_secs = wait job.reschedule() def _spot_fallback(self, job, reason): - if 'Spot' in reason and rconfig().aws.ec2.spot.fallback_to_on_demand: + if "Spot" in reason and rconfig().aws.ec2.spot.fallback_to_on_demand: job.ext.instance_type = InstanceType.On_Demand return True return False @@ -286,36 +366,65 @@ def _reset_retry(self): def _make_aws_job(self, task_names=None, folds=None): task_names = [] if task_names is None else task_names folds = [] if folds is None else [str(f) for f in folds] - task_def = (self._get_task_def(task_names[0]) if len(task_names) >= 1 - else self._get_task_def('__defaults__', include_disabled=True, fail_on_missing=False) or ns(name='all')) + task_def = ( + self._get_task_def(task_names[0]) + if len(task_names) >= 1 + else self._get_task_def( + "__defaults__", include_disabled=True, fail_on_missing=False + ) + or ns(name="all") + ) task_def = cp.copy(task_def) - tconfig = rconfig()['t'] or ns() # handle task params from cli (-Xt.foo=bar) + tconfig = rconfig()["t"] or ns() # handle task params from cli (-Xt.foo=bar) for k, v in tconfig: setattr(task_def, k, v) instance_def = ns() - instance_def.type = (task_def.ec2_instance_type if 'ec2_instance_type' in task_def - else '.'.join([rconfig().aws.ec2.instance_type.series, rconfig().aws.ec2.instance_type.map.default])) - instance_def.volume_type = (task_def.ec2_volume_type if 'ec2_volume_type' in task_def - else rconfig().aws.ec2.volume_type) - instance_def.volume_size = (math.ceil((task_def.min_vol_size_mb + rconfig().benchmarks.os_vol_size_mb) / 1024.) 
if task_def.min_vol_size_mb > 0 - else None) - - timeout_secs = (task_def.max_runtime_seconds if 'max_runtime_seconds' in task_def - else sum([task.max_runtime_seconds for task in self.benchmark_def])) + instance_def.type = ( + task_def.ec2_instance_type + if "ec2_instance_type" in task_def + else ".".join( + [ + rconfig().aws.ec2.instance_type.series, + rconfig().aws.ec2.instance_type.map.default, + ] + ) + ) + instance_def.volume_type = ( + task_def.ec2_volume_type + if "ec2_volume_type" in task_def + else rconfig().aws.ec2.volume_type + ) + instance_def.volume_size = ( + math.ceil( + (task_def.min_vol_size_mb + rconfig().benchmarks.os_vol_size_mb) + / 1024.0 + ) + if task_def.min_vol_size_mb > 0 + else None + ) + + timeout_secs = ( + task_def.max_runtime_seconds + if "max_runtime_seconds" in task_def + else sum([task.max_runtime_seconds for task in self.benchmark_def]) + ) timeout_secs += rconfig().benchmarks.overhead_time_seconds timeout_secs += rconfig().aws.overhead_time_seconds seed = rget().seed(int(folds[0])) if len(folds) == 1 else rconfig().seed - job = Job(rconfig().token_separator.join([ - 'aws', - self.benchmark_name, - self.constraint_name, - ','.join(task_names) if len(task_names) > 0 else 'all_tasks', - ','.join(folds) if len(folds) > 0 else 'all_folds', - self.framework_name - ]), + job = Job( + rconfig().token_separator.join( + [ + "aws", + self.benchmark_name, + self.constraint_name, + ",".join(task_names) if len(task_names) > 0 else "all_tasks", + ",".join(folds) if len(folds) > 0 else "all_folds", + self.framework_name, + ] + ), raise_on_failure=rconfig().job_scheduler.exit_on_job_failure, ) job.ext = ns( @@ -327,39 +436,55 @@ def _make_aws_job(self, task_names=None, folds=None): retry=None, instance_type=None, interrupt=None, - terminate=None + terminate=None, ) def _setup(_self): spot_config = rconfig().aws.ec2.spot if _self.ext.instance_type is None and spot_config.enabled: - _self.ext.instance_type = InstanceType.Spot_Block if spot_config.block_enabled else InstanceType.Spot + _self.ext.instance_type = ( + InstanceType.Spot_Block + if spot_config.block_enabled + else InstanceType.Spot + ) if _self.ext.wait_min_secs: _self.ext.interrupt = interrupt = threading.Event() - countdown(_self.ext.wait_min_secs, - message=f"starting job {_self.name}", - interval=rconfig().aws.query_interval_seconds, - interrupt_event=interrupt, - interrupt_cond=lambda: _self.state != JobState.starting) + countdown( + _self.ext.wait_min_secs, + message=f"starting job {_self.name}", + interval=rconfig().aws.query_interval_seconds, + interrupt_event=interrupt, + interrupt_cond=lambda: _self.state != JobState.starting, + ) def _run(_self): try: - resources_root = "/custom" if rconfig().aws.use_docker else "/s3bucket/user" + resources_root = ( + "/custom" if rconfig().aws.use_docker else "/s3bucket/user" + ) _self.ext.instance_id = self._start_instance( instance_def, script_params="{framework} {benchmark} {constraint} {task_param} {folds_param} -Xseed={seed}".format( - framework=self._forward_params['framework_name'], - benchmark=(self._forward_params['benchmark_name']if self.benchmark_path is None or self.benchmark_path.startswith(rconfig().root_dir) - else "{}/{}".format(resources_root, self._rel_path(self.benchmark_path))), - constraint=self._forward_params['constraint_name'], - task_param='' if len(task_names) == 0 else ' '.join(['-t']+task_names), - folds_param='' if len(folds) == 0 else ' '.join(['-f']+folds), + framework=self._forward_params["framework_name"], + benchmark=( + 
self._forward_params["benchmark_name"] + if self.benchmark_path is None + or self.benchmark_path.startswith(rconfig().root_dir) + else "{}/{}".format( + resources_root, self._rel_path(self.benchmark_path) + ) + ), + constraint=self._forward_params["constraint_name"], + task_param="" + if len(task_names) == 0 + else " ".join(["-t"] + task_names), + folds_param="" if len(folds) == 0 else " ".join(["-f"] + folds), seed=seed, ), # instance_key='_'.join([job.name, datetime_iso(micros=True, time_sep='.')]), instance_key=_self.name, timeout_secs=timeout_secs, - instance_type=_self.ext.instance_type + instance_type=_self.ext.instance_type, ) self._reset_retry() return self._wait_for_results(_self) @@ -367,18 +492,26 @@ def _run(_self): log.error("Job %s failed with: %s", _self.name, e) try: if isinstance(e, AWSError) and e.retry: - log.info("Job %s couldn't start (%s), rescheduling it.", _self.name, e) - self._job_reschedule(_self, reason=str(e), fallback=self._spot_fallback) + log.info( + "Job %s couldn't start (%s), rescheduling it.", + _self.name, + e, + ) + self._job_reschedule( + _self, reason=str(e), fallback=self._spot_fallback + ) return except JobError as je: e = je - self._exec_send((lambda reason, **kwargs: self._save_failures(reason, **kwargs)), - e, - tasks=_self.ext.tasks, - folds=_self.ext.folds, - seed=_self.ext.seed) + self._exec_send( + (lambda reason, **kwargs: self._save_failures(reason, **kwargs)), + e, + tasks=_self.ext.tasks, + folds=_self.ext.folds, + seed=_self.ext.seed, + ) if isinstance(e, JobError): # don't write a result entry for JobErrors @@ -386,46 +519,62 @@ def _run(_self): else: fold = int(folds[0]) if len(folds) > 0 else -1 metadata = ns(lambda: None, framework=self.framework_name) - results = TaskResult(task_def=task_def, fold=fold, constraint=self.constraint_name, metadata=metadata) + results = TaskResult( + task_def=task_def, + fold=fold, + constraint=self.constraint_name, + metadata=metadata, + ) return results.compute_score(result=ErrorResult(e)) def _on_state(_self, state): if state == JobState.completing: terminate, failure = self._download_results(_self.ext.instance_id) - if not terminate and rconfig().aws.ec2.terminate_instances == 'success': - log.warning("[WARNING]: EC2 Instance %s won't be terminated as we couldn't download the results: " - "please terminate it manually or restart it (after clearing its UserData) if you want to inspect the instance.", - _self.ext.instance_id) + if not terminate and rconfig().aws.ec2.terminate_instances == "success": + log.warning( + "[WARNING]: EC2 Instance %s won't be terminated as we couldn't download the results: " + "please terminate it manually or restart it (after clearing its UserData) if you want to inspect the instance.", + _self.ext.instance_id, + ) _self.ext.terminate = terminate instance = self.instances.get(_self.ext.instance_id, {}) - start_time = Namespace.get(instance, 'start_time', '') - stop_time = Namespace.get(instance, 'stop_time', '') - log_time = datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y-%m-%dT%H:%M:%S") + start_time = Namespace.get(instance, "start_time", "") + stop_time = Namespace.get(instance, "stop_time", "") + log_time = datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%S" + ) if failure: - self._exec_send((lambda reason, **kwargs: self._save_failures(reason, **kwargs)), - failure, - tasks=_self.ext.tasks, - folds=_self.ext.folds, - seed=_self.ext.seed, - start_time=start_time, - stop_time=stop_time, - log_time=log_time, - ) + self._exec_send( 
+ ( + lambda reason, **kwargs: self._save_failures( + reason, **kwargs + ) + ), + failure, + tasks=_self.ext.tasks, + folds=_self.ext.folds, + seed=_self.ext.seed, + start_time=start_time, + stop_time=stop_time, + log_time=log_time, + ) elif state == JobState.rescheduling: self._stop_instance(_self.ext.instance_id, terminate=True, wait=False) elif state == JobState.cancelling: - self._stop_instance(_self.ext.instance_id, terminate=_self.ext.terminate, wait=False) + self._stop_instance( + _self.ext.instance_id, terminate=_self.ext.terminate, wait=False + ) if _self.ext.interrupt is not None: _self.ext.interrupt.set() log.warning("Job `%s` was cancelled.", _self.name) return True # job is running remotely: no need to try to cancel what is running here, we just need to stop the instance elif state == JobState.stopping: - self._stop_instance(_self.ext.instance_id, terminate=_self.ext.terminate) + self._stop_instance( + _self.ext.instance_id, terminate=_self.ext.terminate + ) try: self.jobs.remove(_self) except ValueError: @@ -445,13 +594,22 @@ def log_console(): nonlocal last_console_line try: output = instance.console_output(Latest=True) - if 'Output' in output: - output = output['Output'] # note that console_output only returns the last 64kB of console - new_lines, last_line = tail(output, from_line=last_console_line, include_line=False, splitlines=True) + if "Output" in output: + output = output[ + "Output" + ] # note that console_output only returns the last 64kB of console + new_lines, last_line = tail( + output, + from_line=last_console_line, + include_line=False, + splitlines=True, + ) if last_line is not None: - last_console_line = last_line['line'] + last_console_line = last_line["line"] if new_lines: - new_log = '\n'.join([f"[{job.ext.instance_id}]>{line}" for line in new_lines]) + new_log = "\n".join( + [f"[{job.ext.instance_id}]>{line}" for line in new_lines] + ) around = f"[{job.ext.instance_id}:{job.name}]" log.info(f"{around}>>\n{new_log}\n<<{around}") except Exception as e: @@ -459,19 +617,34 @@ def log_console(): job.ext.interrupt = interrupt = threading.Event() while not interrupt.is_set(): - inst_desc = self.instances[job.ext.instance_id] if job.ext.instance_id in self.instances else ns() - if inst_desc['abort']: - self._update_instance(job.ext.instance_id, status='aborted') - raise AWSError("Aborting instance {} for job {}.".format(job.ext.instance_id, job.name)) + inst_desc = ( + self.instances[job.ext.instance_id] + if job.ext.instance_id in self.instances + else ns() + ) + if inst_desc["abort"]: + self._update_instance(job.ext.instance_id, status="aborted") + raise AWSError( + "Aborting instance {} for job {}.".format( + job.ext.instance_id, job.name + ) + ) try: - state = instance.state['Name'] - state_code = instance.state['Code'] - log.info("[%s] checking job %s on instance %s: %s [%s].", datetime_iso(), job.name, job.ext.instance_id, state, state_code) + state = instance.state["Name"] + state_code = instance.state["Code"] + log.info( + "[%s] checking job %s on instance %s: %s [%s].", + datetime_iso(), + job.name, + job.ext.instance_id, + state, + state_code, + ) log_console() self._update_instance(job.ext.instance_id, status=state) if state_code == 16: - if inst_desc['meta_info'] is None: + if inst_desc["meta_info"] is None: volume_info = [ dict(type=v.volume_type, size_gb=v.size, id=v.id) for v in instance.volumes.all() @@ -483,20 +656,36 @@ def log_console(): public_ip=instance.public_ip_address, private_dns_name=instance.private_dns_name, 
private_ip=instance.private_ip_address, - availability_zone=instance.placement['AvailabilityZone'], + availability_zone=instance.placement["AvailabilityZone"], subnet_id=instance.subnet_id, volumes=volume_info, ) self._update_instance(job.ext.instance_id, meta_info=meta_info) log.info("Running EC2 instance %s: %s", instance.id, meta_info) - elif state_code > 16: # ended instance - state_reason_msg = instance.state_reason['Message'] - log.info("EC2 instance %s is %s: %s", job.ext.instance_id, state, state_reason_msg) + elif state_code > 16: # ended instance + state_reason_msg = instance.state_reason["Message"] + log.info( + "EC2 instance %s is %s: %s", + job.ext.instance_id, + state, + state_reason_msg, + ) # self._update_instance(job.ext.instance_id, stop_reason=state_reason_msg) try: - if any(state in state_reason_msg for state in rconfig().aws.job_scheduler.retry_on_states): - log.warning("Job %s was aborted due to '%s', rescheduling it.", job.name, state_reason_msg) - self._job_reschedule(job, reason=state_reason_msg, fallback=self._spot_fallback) + if any( + state in state_reason_msg + for state in rconfig().aws.job_scheduler.retry_on_states + ): + log.warning( + "Job %s was aborted due to '%s', rescheduling it.", + job.name, + state_reason_msg, + ) + self._job_reschedule( + job, + reason=state_reason_msg, + fallback=self._spot_fallback, + ) finally: interrupt.set() except JobError as je: @@ -510,25 +699,34 @@ def log_console(): def _get_cpu_activity(self, iid, delta_minutes=60, period_minutes=5): now = dt.datetime.utcnow() resp = self.cloudwatch.meta.client.get_metric_statistics( - Namespace='AWS/EC2', - MetricName='CPUUtilization', - Dimensions=[dict(Name='InstanceId', Value=iid)], + Namespace="AWS/EC2", + MetricName="CPUUtilization", + Dimensions=[dict(Name="InstanceId", Value=iid)], StartTime=now - dt.timedelta(minutes=delta_minutes), EndTime=now, - Period=60*period_minutes, - Statistics=['Average'], - Unit='Percent' + Period=60 * period_minutes, + Statistics=["Average"], + Unit="Percent", ) - return [activity['Average'] for activity in sorted(resp['Datapoints'], key=op.itemgetter('Timestamp'), reverse=True)] + return [ + activity["Average"] + for activity in sorted( + resp["Datapoints"], key=op.itemgetter("Timestamp"), reverse=True + ) + ] def _is_hanging(self, iid): cpu_config = rconfig().aws.ec2.monitoring.cpu - activity = self._get_cpu_activity(iid, - delta_minutes=cpu_config.delta_minutes, - period_minutes=cpu_config.period_minutes) + activity = self._get_cpu_activity( + iid, + delta_minutes=cpu_config.delta_minutes, + period_minutes=cpu_config.period_minutes, + ) threshold = cpu_config.threshold min_activity_len = int(cpu_config.delta_minutes / cpu_config.period_minutes) - return len(activity) >= min_activity_len and all([a < threshold for a in activity]) + return len(activity) >= min_activity_len and all( + [a < threshold for a in activity] + ) def _monitoring_start(self): if self.monitoring is not None: @@ -542,12 +740,21 @@ def cpu_monitor(): return while not interrupt.is_set(): try: - active_instances = [iid for iid, info in self.instances.items() if info.instance is not None] + active_instances = [ + iid + for iid, info in self.instances.items() + if info.instance is not None + ] hanging_instances = list(filter(self._is_hanging, active_instances)) for inst in hanging_instances: if inst in self.instances: inst_desc = self.instances[inst] - log.warning("WARN: Instance %s (%s) has no CPU activity in the last %s minutes.", inst, inst_desc.key, cpu_config.delta_minutes) + 
log.warning( + "WARN: Instance %s (%s) has no CPU activity in the last %s minutes.", + inst, + inst_desc.key, + cpu_config.delta_minutes, + ) if cpu_config.abort_inactive_instances: inst_desc.abort = True except Exception as e: @@ -555,8 +762,12 @@ def cpu_monitor(): finally: interrupt.wait(cpu_config.query_interval_seconds) - self.monitoring = ns(executor=ThreadPoolExecutor(max_workers=1, thread_name_prefix="aws_monitoring_"), - interrupt=interrupt) + self.monitoring = ns( + executor=ThreadPoolExecutor( + max_workers=1, thread_name_prefix="aws_monitoring_" + ), + interrupt=interrupt, + ) self.monitoring.executor.submit(cpu_monitor) def _monitoring_stop(self): @@ -565,17 +776,29 @@ def _monitoring_stop(self): try: self.monitoring.interrupt.set() self.monitoring.executor.shutdown(wait=False) - except: - pass + except Exception as e: + log.warning("Exception while stopping monitoring %s", e) finally: self.monitoring = None - def _start_instance(self, instance_def, script_params="", instance_key=None, timeout_secs=-1, instance_type=None): + def _start_instance( + self, + instance_def, + script_params="", + instance_key=None, + timeout_secs=-1, + instance_type=None, + ): log.info("Starting new EC2 instance with params: %s", script_params) - inst_key = (instance_key.lower() if instance_key - else "{}_p{}_i{}".format(self.sid, - re.sub(r"[\s-]", '', script_params), - datetime_iso(micros=True, time_sep='.')).lower()) + inst_key = ( + instance_key.lower() + if instance_key + else "{}_p{}_i{}".format( + self.sid, + re.sub(r"[\s-]", "", script_params), + datetime_iso(micros=True, time_sep="."), + ).lower() + ) # TODO: don't know if it would be considerably faster to reuse previously stopped instances sometimes # instead of always creating a new one: # would still need to set a new UserData though before restarting the instance. 
@@ -584,19 +807,24 @@ def _start_instance(self, instance_def, script_params="", instance_key=None, tim if ec2_config.subnet_id: subnet = self.ec2.Subnet(ec2_config.subnet_id) if subnet.available_ip_address_count == 0: - log.warning("No IP available on subnet %s, parallelism (%s) may be too high for this subnet.", subnet.id, self.parallel_jobs) + log.warning( + "No IP available on subnet %s, parallelism (%s) may be too high for this subnet.", + subnet.id, + self.parallel_jobs, + ) raise AWSError("InsufficientFreeAddressesInSubnet", retry=True) ebs = dict(VolumeType=instance_def.volume_type) if instance_def.volume_size: - ebs['VolumeSize'] = instance_def.volume_size + ebs["VolumeSize"] = instance_def.volume_size instance_tags = ec2_config.instance_tags | ns(Name=f"amlb_{inst_key}") - volume_tags = (ec2_config.volume_tags or instance_tags) | ns(Name=f"amlb_{inst_key}") + volume_tags = (ec2_config.volume_tags or instance_tags) | ns( + Name=f"amlb_{inst_key}" + ) instance_params = dict( - BlockDeviceMappings=[dict( - DeviceName=ec2_config.root_device_name, - Ebs=ebs - )], + BlockDeviceMappings=[ + dict(DeviceName=ec2_config.root_device_name, Ebs=ebs) + ], IamInstanceProfile=dict(Name=self.instance_profile.name), ImageId=self.ami, InstanceType=instance_def.type, @@ -605,53 +833,72 @@ def _start_instance(self, instance_def, script_params="", instance_key=None, tim SubnetId=ec2_config.subnet_id, TagSpecifications=[ dict( - ResourceType='instance', - Tags=[dict(Key=k, Value=v) for k, v in instance_tags] + ResourceType="instance", + Tags=[dict(Key=k, Value=v) for k, v in instance_tags], ), dict( - ResourceType='volume', - Tags=[dict(Key=k, Value=v) for k, v in volume_tags] + ResourceType="volume", + Tags=[dict(Key=k, Value=v) for k, v in volume_tags], ), ], - UserData=self._ec2_startup_script(inst_key, script_params=script_params, timeout_secs=timeout_secs) + UserData=self._ec2_startup_script( + inst_key, script_params=script_params, timeout_secs=timeout_secs + ), ) if ec2_config.availability_zone: - instance_params.update(Placement=dict( - AvailabilityZone=ec2_config.availability_zone - )) + instance_params.update( + Placement=dict(AvailabilityZone=ec2_config.availability_zone) + ) if ec2_config.key_name is not None: instance_params.update(KeyName=ec2_config.key_name) if ec2_config.security_groups: instance_params.update(SecurityGroups=ec2_config.security_groups) if instance_type in [InstanceType.Spot, InstanceType.Spot_Block]: spot_options = dict( - SpotInstanceType='one-time', - InstanceInterruptionBehavior='terminate' + SpotInstanceType="one-time", + InstanceInterruptionBehavior="terminate", ) if ec2_config.spot.max_hourly_price: spot_options.update(MaxPrice=str(ec2_config.spot.max_hourly_price)) if instance_type is InstanceType.Spot_Block: - duration_min = math.ceil(timeout_secs/3600) * 60 # duration_min must be a multiple of 60 + duration_min = ( + math.ceil(timeout_secs / 3600) * 60 + ) # duration_min must be a multiple of 60 if duration_min <= 360: # blocks are only allowed until 6h spot_options.update(BlockDurationMinutes=duration_min) - instance_params.update(InstanceMarketOptions=dict( - MarketType='spot', - SpotOptions=spot_options - )) + instance_params.update( + InstanceMarketOptions=dict( + MarketType="spot", SpotOptions=spot_options + ) + ) instance = self.ec2.create_instances(**instance_params)[0] log.info("Started EC2 instance %s", instance.id) - self.instances[instance.id] = ns(instance=instance, key=inst_key, status='started', success='', - start_time=datetime_iso(), 
stop_time='', stop_reason='', - meta_info=None) + self.instances[instance.id] = ns( + instance=instance, + key=inst_key, + status="started", + success="", + start_time=datetime_iso(), + stop_time="", + stop_reason="", + meta_info=None, + ) except Exception as e: - fake_iid = "no_instance_{}".format(len(self.instances)+1) - self.instances[fake_iid] = ns(instance=None, key=inst_key, status='failed', success=False, - start_time=datetime_iso(), stop_time=datetime_iso(), stop_reason=str(e), - meta_info=None) + fake_iid = "no_instance_{}".format(len(self.instances) + 1) + self.instances[fake_iid] = ns( + instance=None, + key=inst_key, + status="failed", + success=False, + start_time=datetime_iso(), + stop_time=datetime_iso(), + stop_reason=str(e), + meta_info=None, + ) if isinstance(e, botocore.exceptions.ClientError): - error_code = e.response.get('Error', {}).get('Code', '') + error_code = e.response.get("Error", {}).get("Code", "") retry = error_code in rconfig().aws.job_scheduler.retry_on_errors log.error(e) raise AWSError(error_code, retry=retry) from e @@ -670,18 +917,28 @@ def _stop_instance(self, instance_id, terminate=None, wait=True): return terminate_config = rconfig().aws.ec2.terminate_instances - if terminate_config in ['always', True]: + if terminate_config in ["always", True]: terminate = True - elif terminate_config in ['never', False]: + elif terminate_config in ["never", False]: terminate = False else: terminate = False if terminate is None else terminate try: - log.info("%s EC2 instances %s.", "Terminating" if terminate else "Stopping", instance_id) + log.info( + "%s EC2 instances %s.", + "Terminating" if terminate else "Stopping", + instance_id, + ) wait_config = rconfig().aws.ec2.terminate_waiter wait = wait and wait_config is not None and wait_config.max_attempts > 0 - waiter = self.ec2.meta.client.get_waiter('instance_terminated' if terminate else 'instance_stopped') if wait else None + waiter = ( + self.ec2.meta.client.get_waiter( + "instance_terminated" if terminate else "instance_stopped" + ) + if wait + else None + ) if terminate: response = instance.terminate() else: @@ -691,24 +948,38 @@ def _stop_instance(self, instance_id, terminate=None, wait=True): InstanceIds=[instance.id], WaiterConfig=dict( Delay=wait_config.delay or rconfig().aws.query_interval_seconds, - MaxAttempts=wait_config.max_attempts - ) + MaxAttempts=wait_config.max_attempts, + ), ) - log.info("%s EC2 instances %s with response %s.", "Terminated" if terminate else "Stopped", instance_id, response) + log.info( + "%s EC2 instances %s with response %s.", + "Terminated" if terminate else "Stopped", + instance_id, + response, + ) except Exception as e: - log.error("ERROR: EC2 instance %s could not be %s!\n" - "Even if the instance should stop by itself after a certain timeout, " - "you may want to stop/terminate it manually:\n%s", - instance_id, "terminated" if terminate else "stopped", str(e)) + log.error( + "ERROR: EC2 instance %s could not be %s!\n" + "Even if the instance should stop by itself after a certain timeout, " + "you may want to stop/terminate it manually:\n%s", + instance_id, + "terminated" if terminate else "stopped", + str(e), + ) finally: try: - state = response['TerminatingInstances'][0]['CurrentState']['Name'] + state = response["TerminatingInstances"][0]["CurrentState"]["Name"] log.info("Instance %s state: %s.", instance_id, state) - self._update_instance(instance_id, status=state, - stop_time=datetime_iso(), - stop_reason=instance.state_reason['Message']) - except: - pass + 
self._update_instance( + instance_id, + status=state, + stop_time=datetime_iso(), + stop_reason=instance.state_reason["Message"], + ) + except Exception as e: + log.warning( + f"Ignoring exception raised while updating instance {instance_id}: {str(e)}" + ) def _update_instance(self, instance_id, **kwargs): do_save = False @@ -725,7 +996,10 @@ def _stop_all_instances(self): self._stop_instance(iid, wait=False) def _save_instances(self): - write_csv([(iid, + write_csv( + [ + ( + iid, self.instances[iid].status, self.instances[iid].success, self.instances[iid].start_time, @@ -734,45 +1008,81 @@ def _save_instances(self): self.sid, self.instances[iid].key, self._s3_key(self.sid, instance_key_or_id=iid, absolute=True), - self.instances[iid].meta_info - ) for iid in self.instances.keys()], - columns=['ec2', 'status', 'success', 'start_time', 'stop_time', 'stop_reason', 'session', 'instance_key', 's3_dir', 'meta_info'], - path=os.path.join(self.output_dirs.session, 'instances.csv')) + self.instances[iid].meta_info, + ) + for iid in self.instances.keys() + ], + columns=[ + "ec2", + "status", + "success", + "start_time", + "stop_time", + "stop_reason", + "session", + "instance_key", + "s3_dir", + "meta_info", + ], + path=os.path.join(self.output_dirs.session, "instances.csv"), + ) def _load_instances(self, instances_file): df = read_csv(instances_file) - self.instances = {row['ec2']: ns( - status=row['status'], - success=row['success'], - session=row['session'], - key=row['instance_key'], - s3_dir=row['s3_dir'], - ) for idx, row in df.iterrows()} + self.instances = { + row["ec2"]: ns( + status=row["status"], + success=row["success"], + session=row["session"], + key=row["instance_key"], + s3_dir=row["s3_dir"], + ) + for idx, row in df.iterrows() + } def _save_failures(self, reason, **kwargs): try: - file = os.path.join(self.output_dirs.session, 'failures.csv') - write_csv([(self._forward_params['framework_name'], - self._forward_params['benchmark_name'], - self._forward_params['constraint_name'], - str_iter(kwargs.get('tasks', [])), - str_iter(kwargs.get('folds', [])), - str_def(kwargs.get('seed', None)), - kwargs.get('start_time', "unknown"), - kwargs.get('stop_time', "unknown"), - kwargs.get('log_time', "unknown"), - str_def(reason, if_none="unknown"))], - columns=['framework', 'benchmark', 'constraint', 'tasks', 'folds', 'seed', 'start_time', 'stop_time', 'log_time', 'error'], - header=not os.path.exists(file), - path=file, - append=True) + file = os.path.join(self.output_dirs.session, "failures.csv") + write_csv( + [ + ( + self._forward_params["framework_name"], + self._forward_params["benchmark_name"], + self._forward_params["constraint_name"], + str_iter(kwargs.get("tasks", [])), + str_iter(kwargs.get("folds", [])), + str_def(kwargs.get("seed", None)), + kwargs.get("start_time", "unknown"), + kwargs.get("stop_time", "unknown"), + kwargs.get("log_time", "unknown"), + str_def(reason, if_none="unknown"), + ) + ], + columns=[ + "framework", + "benchmark", + "constraint", + "tasks", + "folds", + "seed", + "start_time", + "stop_time", + "log_time", + "error", + ], + header=not os.path.exists(file), + path=file, + append=True, + ) except Exception as e: log.exception(e) - def _s3_key(self, main_dir, *subdirs, instance_key_or_id=None, absolute=False, encode=False): + def _s3_key( + self, main_dir, *subdirs, instance_key_or_id=None, absolute=False, encode=False + ): root_key = str_def(rconfig().aws.s3.root_key) if instance_key_or_id is None: - ikey = '' + ikey = "" elif instance_key_or_id in 
self.instances.keys(): ikey = self.instances[instance_key_or_id].key else: @@ -781,48 +1091,62 @@ def _s3_key(self, main_dir, *subdirs, instance_key_or_id=None, absolute=False, e if encode: tokens = map(uenc, tokens) rel_key = url_join(root_key, *tokens) - return url_join('s3://', self.bucket.name, rel_key) if absolute else rel_key + return url_join("s3://", self.bucket.name, rel_key) if absolute else rel_key def _s3_session(self, *subdirs, **kwargs): return self._s3_key(self.sid, *subdirs, **kwargs) def _s3_user(self, *subdirs, **kwargs): - return self._s3_key(self.sid, 'user', *subdirs, **kwargs) + return self._s3_key(self.sid, "user", *subdirs, **kwargs) def _s3_input(self, *subdirs, **kwargs): - return self._s3_key(self.sid, 'input', *subdirs, **kwargs) + return self._s3_key(self.sid, "input", *subdirs, **kwargs) def _s3_output(self, instance_key_or_id, *subdirs, **kwargs): - return self._s3_key(self.sid, 'output', *subdirs, instance_key_or_id=instance_key_or_id, **kwargs) + return self._s3_key( + self.sid, + "output", + *subdirs, + instance_key_or_id=instance_key_or_id, + **kwargs, + ) def _create_s3_bucket(self, bucket_name=None, auto_create=True): # cf. s3 restrictions: https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html if bucket_name is None: bucket_name = rconfig().aws.s3.bucket if rconfig().aws.s3.temporary: - bucket_name += ('-' + self.suid) + bucket_name += "-" + self.suid try: self.s3.meta.client.head_bucket(Bucket=bucket_name) bucket = self.s3.Bucket(bucket_name) except botocore.exceptions.ClientError as e: - error_code = int(e.response['Error']['Code']) + error_code = int(e.response["Error"]["Code"]) if error_code == 404 and auto_create: - log.info("%s bucket doesn't exist, creating it in region %s.", bucket_name, self.region) + log.info( + "%s bucket doesn't exist, creating it in region %s.", + bucket_name, + self.region, + ) bucket = self.s3.create_bucket( Bucket=bucket_name, - CreateBucketConfiguration=dict( - LocationConstraint=self.region - ) + CreateBucketConfiguration=dict(LocationConstraint=self.region), ) log.info("S3 bucket %s was successfully created.", bucket_name) else: if error_code == 403: - log.error("You don't have access rights to S3 bucket %s.\n" - "Please ensure that you specified a unique `aws.s3.bucket` in your config file" - " or verify that your AWS account is correctly configured" - " (cf. docs/README.md for more details).", bucket_name) + log.error( + "You don't have access rights to S3 bucket %s.\n" + "Please ensure that you specified a unique `aws.s3.bucket` in your config file" + " or verify that your AWS account is correctly configured" + " (cf. 
docs/README.md for more details).", + bucket_name, + ) elif error_code == 404: - log.error("S3 bucket %s does not exist and auto-creation is disabled.", bucket_name) + log.error( + "S3 bucket %s does not exist and auto-creation is disabled.", + bucket_name, + ) raise e return bucket @@ -832,11 +1156,12 @@ def _delete_s3_bucket(self): # but this is intended only for temporary buckets, so no need for pagination to_delete = [dict(Key=o.key) for o in self.bucket.objects.all()] if len(to_delete) > 0: - log.info("Deleting objects from S3 bucket %s: %s", self.bucket.name, to_delete) - self.bucket.delete_objects(Delete=dict( - Objects=to_delete, - Quiet=True - )) + log.info( + "Deleting objects from S3 bucket %s: %s", + self.bucket.name, + to_delete, + ) + self.bucket.delete_objects(Delete=dict(Objects=to_delete, Quiet=True)) log.info("Deleting s3 bucket %s.", self.bucket.name) self.bucket.delete() log.info("S3 bucket %s was successfully deleted.", self.bucket.name) @@ -847,9 +1172,13 @@ def _rel_path(self, res_path): return None in_input_dir = res_path.startswith(rconfig().input_dir) in_user_dir = res_path.startswith(rconfig().user_dir) - return (os.path.relpath(res_path, start=rconfig().input_dir) if in_input_dir - else os.path.relpath(res_path, start=rconfig().user_dir) if in_user_dir - else os.path.basename(res_path)) + return ( + os.path.relpath(res_path, start=rconfig().input_dir) + if in_input_dir + else os.path.relpath(res_path, start=rconfig().user_dir) + if in_user_dir + else os.path.basename(res_path) + ) def _dest_path(self, res_path): name = self._rel_path(res_path) @@ -861,7 +1190,9 @@ def _dest_path(self, res_path): def _upload_resources(self): default_paths = [self.benchmark_path] if self.benchmark_path is not None else [] upload_paths = default_paths + rconfig().aws.resource_files - upload_files = list_all_files(upload_paths, file_filter(exclude=rconfig().aws.resource_ignore)) + upload_files = list_all_files( + upload_paths, file_filter(exclude=rconfig().aws.resource_ignore) + ) log.debug("Uploading files to S3: %s", upload_files) uploaded_resources = [] for res in upload_files: @@ -869,7 +1200,12 @@ def _upload_resources(self): if upload_path is None: log.debug("Skipping upload of `%s` to s3 bucket.", res) continue - log.info("Uploading `%s` to `%s` on s3 bucket %s.", res, upload_path, self.bucket.name) + log.info( + "Uploading `%s` to `%s` on s3 bucket %s.", + res, + upload_path, + self.bucket.name, + ) self.bucket.upload_file(res, upload_path) uploaded_resources.append(upload_path) return uploaded_resources @@ -877,11 +1213,13 @@ def _upload_resources(self): def _delete_resources(self): if self.uploaded_resources is None: return - log.info("Deleting uploaded resources `%s` from s3 bucket %s.", self.uploaded_resources, self.bucket.name) + log.info( + "Deleting uploaded resources `%s` from s3 bucket %s.", + self.uploaded_resources, + self.bucket.name, + ) self.bucket.delete_objects( - Delete=dict( - Objects=[dict(Key=res) for res in self.uploaded_resources] - ) + Delete=dict(Objects=[dict(Key=res) for res in self.uploaded_resources]) ) def _download_results(self, instance_id): @@ -895,14 +1233,24 @@ def _download_results(self, instance_id): def download_file(obj, dest, dest_display_path=None): dest_display_path = dest if dest_display_path is None else dest_display_path try: - log.info("Downloading `%s` from s3 bucket %s to `%s`.", obj.key, self.bucket.name, dest_display_path) + log.info( + "Downloading `%s` from s3 bucket %s to `%s`.", + obj.key, + self.bucket.name, + 
dest_display_path, + ) if isinstance(dest, str): touch(dest) obj.download_file(dest) else: obj.download_fileobj(dest) except Exception as e: - log.exception("Failed downloading `%s` from s3 bucket %s: %s", obj.key, self.bucket.name, str(e)) + log.exception( + "Failed downloading `%s` from s3 bucket %s: %s", + obj.key, + self.bucket.name, + str(e), + ) raise e success = self.instances[instance_id].success is True @@ -910,7 +1258,10 @@ def download_file(obj, dest, dest_display_path=None): objs = [] try: instance_output_key = self._s3_output(instance_id, encode=True) - objs = [o.Object() for o in self.bucket.objects.filter(Prefix=instance_output_key)] + objs = [ + o.Object() + for o in self.bucket.objects.filter(Prefix=instance_output_key) + ] session_key = self._s3_session(encode=True) # result_key = self._s3_output(instance_id, Scoreboard.results_file, encode=True) for obj in objs: @@ -920,31 +1271,40 @@ def download_file(obj, dest, dest_display_path=None): try: download_file(obj, dest_path) if is_result and not success: - self._exec_send(lambda path: self._save_global(Scoreboard.from_file(path)), dest_path) + self._exec_send( + lambda path: self._save_global(Scoreboard.from_file(path)), + dest_path, + ) success = True except Exception as e: if is_result: error = e except Exception as e: - log.exception("Failed downloading benchmark results from s3 bucket %s: %s", self.bucket.name, str(e)) + log.exception( + "Failed downloading benchmark results from s3 bucket %s: %s", + self.bucket.name, + str(e), + ) error = e if not success and error is None: if len(objs) > 0: - error = NoResultError(f"No {Scoreboard.results_file} file found among the result artifacts: " - f"check the remote logs if available or the local logs to understand what happened on the instance.") + error = NoResultError( + f"No {Scoreboard.results_file} file found among the result artifacts: " + f"check the remote logs if available or the local logs to understand what happened on the instance." + ) else: - error = NoResultError(f"No result artifacts, either the benchmark failed to start, or the instance got killed: " - f"check the local logs to understand what happened on the instance.") + error = NoResultError( + "No result artifacts, either the benchmark failed to start, or the instance got killed: " + "check the local logs to understand what happened on the instance." + ) log.info("Instance `%s` success=%s", instance_id, success) self._update_instance(instance_id, success=success) return success, error def _results_summary(self, scoreboard=None): - log.info( - "Result summary not available for AWS mode (reference files instead)." 
- ) + log.info("Result summary not available for AWS mode (reference files instead).") def _create_instance_profile(self): """ @@ -955,12 +1315,16 @@ def _create_instance_profile(self): """ s3c = rconfig().aws.s3 iamc = rconfig().aws.iam - bucket_prefix = (s3c.bucket+'-') if (s3c.temporary and not iamc.temporary) else self.bucket.name + bucket_prefix = ( + (s3c.bucket + "-") + if (s3c.temporary and not iamc.temporary) + else self.bucket.name + ) role_name = iamc.role_name profile_name = iamc.instance_profile_name if iamc.temporary: - role_name += ('-' + self.suid) - profile_name += ('-' + self.suid) + role_name += "-" + self.suid + profile_name += "-" + self.suid irole = None try: @@ -970,49 +1334,56 @@ def _create_instance_profile(self): log.info("Role %s doesn't exist, creating it: [%s].", role_name, str(e)) if not irole: - ec2_role_trust_policy_json = json.dumps({ # trust role - 'Version': '2012-10-17', # version of the policy language, cf. https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html - 'Statement': [ - { - 'Effect': 'Allow', - 'Principal': {'Service': 'ec2.amazonaws.com'}, - 'Action': 'sts:AssumeRole' - } - ] - }) + ec2_role_trust_policy_json = json.dumps( + { # trust role + "Version": "2012-10-17", # version of the policy language, cf. https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "ec2.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + ) irole = self.iam.create_role( RoleName=role_name, AssumeRolePolicyDocument=ec2_role_trust_policy_json, - MaxSessionDuration=iamc.max_role_session_duration_secs + MaxSessionDuration=iamc.max_role_session_duration_secs, ) log.info("Role %s successfully created.", role_name) if iamc.s3_policy_name not in [p.name for p in irole.policies.all()]: - resource_prefix="arn:aws:s3:::{bucket}*/{root_key}".format(bucket=bucket_prefix, root_key=str_def(s3c.root_key)) # ARN format for s3, cf. https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html - s3_policy_json = json.dumps({ - 'Version': '2012-10-17', - 'Statement': [ - { - 'Effect': 'Allow', - 'Action': 's3:List*', - 'Resource': 'arn:aws:s3:::{}*'.format(bucket_prefix) - }, - { - 'Effect': 'Allow', - 'Action': 's3:GetObject', # S3 actions, cf. https://docs.aws.amazon.com/AmazonS3/latest/dev/using-with-s3-actions.html - 'Resource': resource_prefix+'*' - }, - { - 'Effect': 'Allow', - 'Action': 's3:PutObject', - 'Resource': resource_prefix+'*' # technically, we could grant write access for each instance only to its own 'directory', but this is not necessary - } - ] - }) + resource_prefix = "arn:aws:s3:::{bucket}*/{root_key}".format( + bucket=bucket_prefix, root_key=str_def(s3c.root_key) + ) # ARN format for s3, cf. https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html + s3_policy_json = json.dumps( + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "s3:List*", + "Resource": "arn:aws:s3:::{}*".format(bucket_prefix), + }, + { + "Effect": "Allow", + "Action": "s3:GetObject", # S3 actions, cf. 
https://docs.aws.amazon.com/AmazonS3/latest/dev/using-with-s3-actions.html + "Resource": resource_prefix + "*", + }, + { + "Effect": "Allow", + "Action": "s3:PutObject", + "Resource": resource_prefix + + "*", # technically, we could grant write access for each instance only to its own 'directory', but this is not necessary + }, + ], + } + ) self.iam.meta.client.put_role_policy( RoleName=irole.name, PolicyName=iamc.s3_policy_name, - PolicyDocument=s3_policy_json + PolicyDocument=s3_policy_json, ) iprofile = None @@ -1020,14 +1391,23 @@ def _create_instance_profile(self): self.iam.meta.client.get_instance_profile(InstanceProfileName=profile_name) iprofile = self.iam.InstanceProfile(profile_name) except botocore.exceptions.ClientError as e: - log.info("Instance profile %s doesn't exist, creating it: [%s].", profile_name, str(e)) + log.info( + "Instance profile %s doesn't exist, creating it: [%s].", + profile_name, + str(e), + ) if not iprofile: - iprofile = self.iam.create_instance_profile(InstanceProfileName=profile_name) + iprofile = self.iam.create_instance_profile( + InstanceProfileName=profile_name + ) log.info("Instance profile %s successfully created.", profile_name) waiting_time = iamc.credentials_propagation_waiting_time_secs steps = math.ceil(waiting_time / 10) for i in range(steps): - log.info("Waiting for new credentials propagation, time left = %ss.", round(waiting_time * (1 - i/steps))) + log.info( + "Waiting for new credentials propagation, time left = %ss.", + round(waiting_time * (1 - i / steps)), + ) time.sleep(waiting_time / steps) if irole.name not in [r.name for r in iprofile.roles]: @@ -1042,16 +1422,26 @@ def _delete_iam_entities(self): if iprofile is None: profile_name = iamc.instance_profile_name if iamc.temporary: - profile_name += ('-' + self.suid) + profile_name += "-" + self.suid try: - self.iam.meta.client.get_instance_profile(InstanceProfileName=profile_name) + self.iam.meta.client.get_instance_profile( + InstanceProfileName=profile_name + ) iprofile = self.iam.InstanceProfile(profile_name) except botocore.exceptions.ClientError as e: - log.info("Instance profile %s doesn't exist, nothing to delete: [%s]", profile_name, str(e)) + log.info( + "Instance profile %s doesn't exist, nothing to delete: [%s]", + profile_name, + str(e), + ) if iprofile is not None: for role in iprofile.roles: - log.info("Removing role %s from instance profile %s.", role.name, iprofile.name) + log.info( + "Removing role %s from instance profile %s.", + role.name, + iprofile.name, + ) iprofile.remove_role(RoleName=role.name) self._delete_iam_entities_from_role(role.name) log.info("Deleting instance profile %s.", iprofile.name) @@ -1060,7 +1450,7 @@ def _delete_iam_entities(self): else: role_name = iamc.role_name if iamc.temporary: - role_name += ('-' + self.suid) + role_name += "-" + self.suid self._delete_iam_entities_from_role(role_name) def _delete_iam_entities_from_role(self, role_name): @@ -1069,11 +1459,19 @@ def _delete_iam_entities_from_role(self, role_name): self.iam.meta.client.get_role(RoleName=role_name) irole = self.iam.Role(role_name) for policy in irole.policies.all(): - log.info("Deleting role policy %s from role %s.", policy.name, policy.role_name) + log.info( + "Deleting role policy %s from role %s.", + policy.name, + policy.role_name, + ) policy.delete() log.info("Policy %s was successfully deleted.", policy.name) for profile in irole.instance_profiles.all(): - log.info("Removing instance profile %s from role %s.", profile.name, irole.name) + log.info( + "Removing 
instance profile %s from role %s.", + profile.name, + irole.name, + ) profile.remove_role(RoleName=irole.name) log.info("Deleting instance profile %s.", profile.name) profile.delete() @@ -1082,7 +1480,11 @@ def _delete_iam_entities_from_role(self, role_name): irole.delete() log.info("Role %s was successfully deleted.", irole.name) except botocore.exceptions.ClientError as e: - log.info("Role %s doesn't exist, skipping its deletion: [%s]", iamc.role_name, str(e)) + log.info( + "Role %s doesn't exist, skipping its deletion: [%s]", + iamc.role_name, + str(e), + ) def _ec2_startup_script(self, instance_key, script_params="", timeout_secs=-1): """ @@ -1105,7 +1507,8 @@ def _ec2_startup_script(self, instance_key, script_params="", timeout_secs=-1): :return: the UserData for the new ec2 instance """ script_extra_params = "--session=" - cloud_config = """ + cloud_config = ( + """ #cloud-config package_update: true @@ -1139,7 +1542,9 @@ def _ec2_startup_script(self, instance_key, script_params="", timeout_secs=-1): message: "I'm losing power" timeout: {timeout} condition: True -""" if rconfig().aws.use_docker else """ +""" + if rconfig().aws.use_docker + else """ #cloud-config package_update: true @@ -1195,10 +1600,12 @@ def _ec2_startup_script(self, instance_key, script_params="", timeout_secs=-1): timeout: {timeout} condition: True """ + ) return cloud_config.format( repo=rget().project_info.repo, branch=rget().project_info.branch, - image=rconfig().docker.image or DockerBenchmark.image_name(self.framework_def), + image=rconfig().docker.image + or DockerBenchmark.image_name(self.framework_def), pyv=rconfig().versions.python, pipv=rconfig().versions.pip, s3_base_url=self._s3_session(absolute=True, encode=True), @@ -1210,21 +1617,31 @@ def _ec2_startup_script(self, instance_key, script_params="", timeout_secs=-1): params=script_params, extra_params=script_extra_params, docker_options=rconfig().docker.run_extra_options, - timeout=timeout_secs if timeout_secs > 0 else rconfig().aws.max_timeout_seconds, + timeout=timeout_secs + if timeout_secs > 0 + else rconfig().aws.max_timeout_seconds, ) class AWSRemoteBenchmark(Benchmark): - # TODO: idea is to handle results progressively on the remote side and push results as soon as they're generated # this would allow to safely run multiple tasks on single AWS instance - def __init__(self, framework_name, benchmark_name, constraint_name, region=None, job_history: str = None): + def __init__( + self, + framework_name, + benchmark_name, + constraint_name, + region=None, + job_history: str | None = None, + ): self.region = region - self.s3 = boto3.resource('s3', region_name=self.region) + self.s3 = boto3.resource("s3", region_name=self.region) self.bucket = self._init_bucket() self._download_resources() - super().__init__(framework_name, benchmark_name, constraint_name, job_history=job_history) + super().__init__( + framework_name, benchmark_name, constraint_name, job_history=job_history + ) def run(self, save_scores=False): super().run(save_scores) @@ -1233,6 +1650,7 @@ def run(self, save_scores=False): def _make_job(self, task_name=None, folds=None): job = super()._make_job(task_name, folds) super_run = job._run + def new_run(): super_run() # self._upload_result() @@ -1246,7 +1664,9 @@ def _init_bucket(self): def _download_resources(self): root_key = str_def(rconfig().aws.s3.root_key) benchmark_basename = os.path.basename(self.benchmark_path) - self.bucket.upload_file(self.benchmark_path, root_key+('/'.join(['input', benchmark_basename]))) + 
self.bucket.upload_file( + self.benchmark_path, root_key + ("/".join(["input", benchmark_basename])) + ) def _upload_results(self): pass diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/conftest.py b/tests/conftest.py index 184da47f5..2e2935717 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,16 @@ +from pathlib import Path +from typing import Generator + import pytest from amlb import Resources from amlb.utils import Namespace +@pytest.fixture(autouse=True) +def tmp_output_directory(tmp_path: Path) -> Generator[Path, None, None]: + yield tmp_path + + @pytest.fixture def simple_resource(): return Resources( @@ -22,7 +30,7 @@ def simple_resource(): root_module="frameworks", definition_file=[], allow_duplicates=False, - tags=[] - ) + tags=[], + ), ) ) diff --git a/tests/unit/amlb/benchmarks/test_benchmark.py b/tests/unit/amlb/benchmarks/test_benchmark.py new file mode 100644 index 000000000..50d39246f --- /dev/null +++ b/tests/unit/amlb/benchmarks/test_benchmark.py @@ -0,0 +1,44 @@ +from amlb import Benchmark, SetupMode, resources +import os + +from amlb.defaults import default_dirs +from amlb.utils import config_load +from amlb.utils import Namespace as ns +from tests.conftest import tmp_output_directory + + +def test_benchmark(tmp_path) -> None: + config_default = config_load( + os.path.join(default_dirs.root_dir, "resources", "config.yaml") + ) + config_default_dirs = default_dirs + # allowing config override from user_dir: useful to define custom benchmarks and frameworks for example. + config_user = ns() + # config listing properties set by command line + config_args = ns.parse( + {"results.global_save": False}, + output_dir=str(tmp_output_directory), + script=os.path.basename(__file__), + run_mode="local", + parallel_jobs=1, + sid="pytest.session", + exit_on_error=True, + test_server=False, + tag=None, + command="pytest invocation", + ) + config_args = ns({k: v for k, v in config_args if v is not None}) + # merging all configuration files and saving to the global variable + resources.from_configs( + config_default, config_default_dirs, config_user, config_args + ) + benchmark = Benchmark( + framework_name="constantpredictor", + benchmark_name="test", + constraint_name="test", + job_history=None, + ) + benchmark.setup(SetupMode.force) + results = benchmark.run() + assert len(results) == 6 + assert not results["result"].isna().any()
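
Note (not part of the diff above): a minimal sketch of how the new autouse fixture added in tests/conftest.py is expected to behave under pytest. The fixture simply yields pytest's built-in per-test tmp_path, so any test in the suite runs against an isolated output directory without requesting the fixture by name; the test function below is hypothetical and only illustrates that pattern.

    from pathlib import Path

    def test_uses_isolated_output_dir(tmp_path: Path) -> None:
        # tmp_path is the same per-test temporary directory that the autouse
        # tmp_output_directory fixture yields, so anything written under it
        # never leaks into the repository tree between test runs.
        scores = tmp_path / "scores" / "results.csv"
        scores.parent.mkdir(parents=True, exist_ok=True)
        scores.write_text("task,framework,fold,result\n")
        assert scores.exists()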