From f43bd4317bf7945801f5f36340bf82c718a5669f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Irene=20L=C3=B3pez?=
Date: Mon, 11 Dec 2023 18:04:14 +0000
Subject: [PATCH] feat: track training data and feature importance

---
 src/otg/method/l2g/model.py   | 88 ++++++++++++++++++++++++++---------
 src/otg/method/l2g/trainer.py |  1 +
 2 files changed, 68 insertions(+), 21 deletions(-)

diff --git a/src/otg/method/l2g/model.py b/src/otg/method/l2g/model.py
index ad87cf1d2..bf3ffe67f 100644
--- a/src/otg/method/l2g/model.py
+++ b/src/otg/method/l2g/model.py
@@ -14,6 +14,7 @@
 from pyspark.ml.feature import StringIndexer, VectorAssembler
 from pyspark.ml.tuning import ParamGridBuilder
 from wandb.wandb_run import Run
+from xgboost.spark.core import SparkXGBClassifierModel
 
 from otg.dataset.l2g_feature_matrix import L2GFeatureMatrix
 from otg.method.l2g.evaluator import WandbEvaluator
@@ -31,6 +32,7 @@ class LocusToGeneModel:
     estimator: Any = None
     pipeline: Pipeline = Pipeline(stages=[])
     model: PipelineModel | None = None
+    wandb_l2g_project_name: str = "otg_l2g"
 
     def __post_init__(self: LocusToGeneModel) -> None:
         """Post init that adds the model to the ML pipeline."""
@@ -98,29 +100,39 @@ def features_vector_assembler(features_cols: list[str]) -> VectorAssembler:
             .setOutputCol("features")
         )
 
-    @staticmethod
     def log_to_wandb(
+        self: LocusToGeneModel,
         results: DataFrame,
-        binary_evaluator: BinaryClassificationEvaluator,
-        multi_evaluator: MulticlassClassificationEvaluator,
+        training_data: L2GFeatureMatrix,
+        evaluators: list[
+            BinaryClassificationEvaluator | MulticlassClassificationEvaluator
+        ],
         wandb_run: Run,
     ) -> None:
-        """Perform evaluation of the model by applying it to a test set and tracking the results with W&B.
+        """Log evaluation results and feature importance to W&B.
 
         Args:
             results (DataFrame): Dataframe containing the predictions
-            binary_evaluator (BinaryClassificationEvaluator): Binary evaluator
-            multi_evaluator (MulticlassClassificationEvaluator): Multiclass evaluator
+            training_data (L2GFeatureMatrix): Training data used for the model.
+                If provided, the table and the number of positive and negative labels will be logged to W&B
+            evaluators (list[BinaryClassificationEvaluator | MulticlassClassificationEvaluator]): List of Spark ML evaluators to use for evaluation
             wandb_run (Run): W&B run to log the results to
         """
-        binary_wandb_evaluator = WandbEvaluator(
-            spark_ml_evaluator=binary_evaluator, wandb_run=wandb_run
-        )
-        binary_wandb_evaluator.evaluate(results)
-        multi_wandb_evaluator = WandbEvaluator(
-            spark_ml_evaluator=multi_evaluator, wandb_run=wandb_run
-        )
-        multi_wandb_evaluator.evaluate(results)
+        ## Track evaluation metrics
+        for evaluator in evaluators:
+            wandb_evaluator = WandbEvaluator(
+                spark_ml_evaluator=evaluator, wandb_run=wandb_run
+            )
+            wandb_evaluator.evaluate(results)
+        ## Track feature importance
+        wandb_run.log({"importances": self.get_feature_importance()})
+        ## Track training set metadata
+        gs_counts_dict = {
+            "goldStandard" + row["goldStandardSet"].capitalize(): row["count"]
+            for row in training_data.df.groupBy("goldStandardSet").count().collect()
+        }
+        wandb_run.log(gs_counts_dict)
+        training_table = wandb.Table(dataframe=training_data.df.toPandas())
+        wandb_run.log({"trainingSet": training_table})
 
     @classmethod
     def load_from_disk(
@@ -189,6 +201,7 @@ def evaluate(
         results: DataFrame,
         hyperparameters: dict[str, Any],
         wandb_run_name: str | None,
+        training_data: L2GFeatureMatrix | None = None,
     ) -> None:
         """Perform evaluation of the model by applying it to a test set and tracking the results with W&B.
 
@@ -196,6 +209,7 @@ def evaluate(
             results (DataFrame): Dataframe containing the predictions
             hyperparameters (dict[str, Any]): Hyperparameters used for the model
             wandb_run_name (str | None): Descriptive name for the run to be tracked with W&B
+            training_data (L2GFeatureMatrix | None): Training data used for the model. If provided, the training table and the number of positive and negative labels will be logged to W&B
         """
         binary_evaluator = BinaryClassificationEvaluator(
             rawPredictionCol="rawPrediction", labelCol="label"
         )
@@ -226,20 +240,52 @@ def evaluate(
             multi_evaluator.evaluate(results, {multi_evaluator.metricName: "f1"}),
         )
 
-        if wandb_run_name:
+        if wandb_run_name and training_data:
             print("Logging to W&B...")
             run = wandb.init(
-                project="otg_l2g", config=hyperparameters, name=wandb_run_name
+                project=self.wandb_l2g_project_name,
+                config=hyperparameters,
+                name=wandb_run_name,
             )
             if isinstance(run, Run):
-                LocusToGeneModel.log_to_wandb(
-                    results, binary_evaluator, multi_evaluator, run
+                self.log_to_wandb(
+                    results, training_data, [binary_evaluator, multi_evaluator], run
                 )
                 run.finish()
 
-    def plot_importance(self: LocusToGeneModel) -> None:
-        """Plot the feature importance of the model."""
-        # xgb_plot_importance(self)  # FIXME: What is the attribute that stores the model?
+    @property
+    def feature_name_map(self: LocusToGeneModel) -> dict[str, str]:
+        """Return a dictionary mapping encoded feature names to the original names.
+
+        Returns:
+            dict[str, str]: Feature name map of the model
+
+        Raises:
+            ValueError: If the model has not been fitted yet or the pipeline does not contain a VectorAssembler
+        """
+        if not self.model:
+            raise ValueError("Model not fitted yet. `fit()` has to be called first.")
+        if not isinstance(self.model.stages[1], VectorAssembler):
+            raise ValueError("Stage 1 of the fitted pipeline is not a VectorAssembler.")
+        feature_names = self.model.stages[1].getInputCols()
+        return {f"f{i}": feature_name for i, feature_name in enumerate(feature_names)}
+
+    def get_feature_importance(self: LocusToGeneModel) -> dict[str, float]:
+        """Return a dictionary with the relative importance of every feature in the model.
+
+        Feature names are encoded and have to be mapped back to their original names.
+
+        Returns:
+            dict[str, float]: Dictionary mapping feature names to their importance
+
+        Raises:
+            ValueError: If the model has not been fitted yet or is not an XGBoost model
+        """
+        if not self.model or not isinstance(
+            self.model.stages[-1], SparkXGBClassifierModel
+        ):
+            raise ValueError(
+                f"Model type {type(self.model)} not supported for feature importance."
+            )
+        importance_map = self.model.stages[-1].get_feature_importances()
+        return {self.feature_name_map[k]: v for k, v in importance_map.items()}
 
     def fit(
         self: LocusToGeneModel,
diff --git a/src/otg/method/l2g/trainer.py b/src/otg/method/l2g/trainer.py
index 8c1ba7a36..95ac506f0 100644
--- a/src/otg/method/l2g/trainer.py
+++ b/src/otg/method/l2g/trainer.py
@@ -53,6 +53,7 @@ def train(
         results=model.predict(test),
         hyperparameters=hyperparams,
         wandb_run_name=wandb_run_name,
+        training_data=train,
    )
    if model_path:
        l2g_model.save(model_path)
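
Usage sketch (illustrative, not part of the patch): how the tracking flow
added here is exercised end to end, mirroring the call site in
src/otg/method/l2g/trainer.py. The `train`/`test` L2GFeatureMatrix splits,
the estimator settings, the `fit`/`predict` signatures and the run name are
assumptions; only `evaluate(..., training_data=...)` and
`get_feature_importance()` come from this patch.

from xgboost.spark import SparkXGBClassifier

from otg.method.l2g.model import LocusToGeneModel

# Assumed setup: `train` and `test` are L2GFeatureMatrix splits built upstream.
l2g_model = LocusToGeneModel(estimator=SparkXGBClassifier())
model = l2g_model.fit(train)  # assumed to return the fitted LocusToGeneModel

# W&B logging only fires when both a run name and training data are passed
# (the `if wandb_run_name and training_data` guard in evaluate()).
model.evaluate(
    results=model.predict(test),       # predictions on the held-out split
    hyperparameters={"max_depth": 5},  # illustrative; logged as the W&B run config
    wandb_run_name="l2g-xgboost",      # hypothetical run name
    training_data=train,               # new: logs training table, label counts, importances
)

# Maps the original feature column names to their relative importance.
print(model.get_feature_importance())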