diff --git a/molpipeline/explainability/__init__.py b/molpipeline/explainability/__init__.py new file mode 100644 index 00000000..0d934450 --- /dev/null +++ b/molpipeline/explainability/__init__.py @@ -0,0 +1,19 @@ +"""Explainability module for the molpipeline package.""" + +from molpipeline.explainability.explainer import SHAPTreeExplainer +from molpipeline.explainability.explanation import ( + SHAPFeatureAndAtomExplanation, + SHAPFeatureExplanation, +) +from molpipeline.explainability.visualization.visualization import ( + structure_heatmap, + structure_heatmap_shap, +) + +__all__ = [ + "SHAPFeatureExplanation", + "SHAPFeatureAndAtomExplanation", + "SHAPTreeExplainer", + "structure_heatmap", + "structure_heatmap_shap", +] diff --git a/molpipeline/explainability/explainer.py b/molpipeline/explainability/explainer.py new file mode 100644 index 00000000..0323c42b --- /dev/null +++ b/molpipeline/explainability/explainer.py @@ -0,0 +1,467 @@ +"""Explainer classes for explaining predictions.""" + +from __future__ import annotations + +import abc +from typing import Any, TypeAlias + +import numpy as np +import numpy.typing as npt +import pandas as pd +import shap +from scipy.sparse import issparse, spmatrix +from sklearn.base import BaseEstimator +from typing_extensions import override + +from molpipeline import Pipeline +from molpipeline.abstract_pipeline_elements.core import OptionalMol +from molpipeline.explainability.explanation import ( + AtomExplanationMixin, + BondExplanationMixin, + FeatureExplanationMixin, + FeatureInfoMixin, + SHAPExplanationMixin, + SHAPFeatureAndAtomExplanation, + SHAPFeatureExplanation, +) +from molpipeline.explainability.fingerprint_utils import fingerprint_shap_to_atomweights +from molpipeline.mol2any import MolToMorganFP +from molpipeline.utils.subpipeline import SubpipelineExtractor, get_model_from_pipeline + + +# pylint: disable=C0103,W0613 +def _to_dense( + feature_matrix: npt.NDArray[Any] | spmatrix, +) -> npt.NDArray[Any]: + """Mitigate feature incompatibility with SHAP objects. + + Parameters + ---------- + feature_matrix : npt.NDArray[Any] | spmatrix + The input features. + + Returns + ------- + Any + The input features in a compatible format. + """ + if issparse(feature_matrix): + return feature_matrix.todense() # type: ignore[union-attr] + return feature_matrix + + +def _convert_to_array(value: Any) -> npt.NDArray[np.float64]: + """Convert a value to a numpy array. + + Parameters + ---------- + value : Any + The value to convert. + + Returns + ------- + npt.NDArray[np.float64] + The value as a numpy array. + """ + if isinstance(value, np.ndarray): + return value + if np.isscalar(value): + return np.array([value]) + raise ValueError("Value is not a scalar or numpy array.") + + +def _get_prediction_function(pipeline: Pipeline | BaseEstimator) -> Any: + """Get the prediction function of a model. + + Parameters + ---------- + pipeline : Pipeline + The pipeline containing the model. + + Returns + ------- + Any + The prediction function. + """ + if hasattr(pipeline, "predict_proba"): + return pipeline.predict_proba + if hasattr(pipeline, "decision_function"): + return pipeline.decision_function + if hasattr(pipeline, "predict"): + return pipeline.predict + raise ValueError("Could not determine the model output predictions") + + +# This function might also be put at a more central position in the lib. +def _get_predictions( + pipeline: Pipeline, feature_matrix: npt.NDArray[Any] | spmatrix +) -> npt.NDArray[np.float64]: + """Get the predictions of a model. + + Raises if no adequate method is found. + + Parameters + ---------- + pipeline : Pipeline + The pipeline containing the model. + feature_matrix : Any + The input data. + + Returns + ------- + npt.NDArray[np.float64] + The predictions. + """ + prediction_function = _get_prediction_function(pipeline) + prediction = prediction_function(feature_matrix) + return np.array(prediction) + + +def _convert_shap_feature_weights_to_atom_weights( + feature_weights: npt.NDArray[np.float64], + molecule: OptionalMol, + featurization_element: MolToMorganFP, + feature_vector: npt.NDArray[np.float64], +) -> npt.NDArray[np.float64]: + """Convert SHAP feature weights to atom weights. + + Parameters + ---------- + feature_weights : npt.NDArray[np.float64] + The feature weights. + molecule : OptionalMol + The molecule. + featurization_element : MolToMorganFP + The featurization element. + feature_vector : npt.NDArray[np.float64] + The feature vector. + + Returns + ------- + npt.NDArray[np.float64] + The atom weights. + """ + if feature_weights.ndim == 1: + # regression case + feature_weights_present_bits_only = feature_weights.copy() + elif feature_weights.ndim == 2: + # binary classification case. Take the weights for the positive class. + feature_weights_present_bits_only = feature_weights[:, 1].copy() + else: + raise ValueError( + "Unsupported number of dimensions for feature weights. Expected 1 or 2." + ) + + # reset shap values for bits that are not present in the molecule + feature_weights_present_bits_only[feature_vector == 0] = 0 + + atom_weights = np.array( + fingerprint_shap_to_atomweights( + molecule, + featurization_element, + feature_weights_present_bits_only, + ) + ) + return atom_weights + + +_SHAPExplainer_return_type_: TypeAlias = list[ + SHAPFeatureExplanation | SHAPFeatureAndAtomExplanation +] + + +# pylint: disable=R0903 +class AbstractSHAPExplainer(abc.ABC): + """Abstract class for SHAP explainer objects.""" + + # pylint: disable=C0103,W0613 + @abc.abstractmethod + def explain(self, X: Any, **kwargs: Any) -> _SHAPExplainer_return_type_: + """Explain the predictions for the input data. + + Parameters + ---------- + X : Any + The input data to explain. + kwargs : Any + Additional keyword arguments. + + Returns + ------- + list[SHAPFeatureExplanation | SHAPFeatureAndAtomExplanation] + List of explanations corresponding to the input samples. + """ + + +# pylint: disable=R0903 +class SHAPExplainerAdapter(AbstractSHAPExplainer, abc.ABC): + """Adapter for SHAP explainer wrappers for handling molecules and pipelines.""" + + def __init__( + self, + pipeline: Pipeline, + explainer: shap.TreeExplainer | shap.KernelExplainer, + **kwargs: Any, + ) -> None: + """Initialize the SHAPTreeExplainer. + + Parameters + ---------- + pipeline : Pipeline + The pipeline containing the model to explain. + explainer : shap.TreeExplainer | shap.KernelExplainer + The shap explainer object. + kwargs : Any + Additional keyword arguments for SHAP's TreeExplainer. + """ + self.pipeline = pipeline + self.explainer = explainer + + pipeline_extractor = SubpipelineExtractor(self.pipeline) + + # extract the molecule reader subpipeline + self.molecule_reader_subpipeline = ( + pipeline_extractor.get_molecule_reader_subpipeline() + ) + if self.molecule_reader_subpipeline is None: + raise ValueError("Could not determine the molecule reader subpipeline.") + + # extract the featurization subpipeline + self.featurization_subpipeline = ( + pipeline_extractor.get_featurization_subpipeline() + ) + if self.featurization_subpipeline is None: + raise ValueError("Could not determine the featurization subpipeline.") + + # determine type of returned explanation + featurization_element = self.featurization_subpipeline.steps[-1][1] # type: ignore[union-attr] + self.return_element_type_: type[ + SHAPFeatureExplanation | SHAPFeatureAndAtomExplanation + ] + if isinstance(featurization_element, MolToMorganFP): + self.return_element_type_ = SHAPFeatureAndAtomExplanation + else: + self.return_element_type_ = SHAPFeatureExplanation + + @staticmethod + def _prediction_is_valid(prediction: Any) -> bool: + """Check if the prediction is valid using some heuristics. + + Can be used to catch inputs that failed the pipeline for some reason. + + Parameters + ---------- + prediction : Any + The prediction. + Returns + ------- + bool + Whether the prediction is valid. + """ + # if no prediction could be obtained (length is 0); the prediction guaranteed failed. + if len(prediction) == 0: + return False + + # use pandas.isna function to check for invalid predictions, e.g. None, np.nan, + # pd.NA. Note that fill values like 0 will be considered as valid predictions. + if pd.isna(prediction).any(): + return False + + return True + + # pylint: disable=C0103,W0613 + @override + def explain(self, X: Any, **kwargs: Any) -> _SHAPExplainer_return_type_: + """Explain the predictions for the input data. + + If the calculation of the SHAP values for an input sample fails, the explanation will be invalid. + This can be checked with the Explanation.is_valid() method. + + Parameters + ---------- + X : Any + The input data to explain. + kwargs : Any + Additional keyword arguments for SHAP's TreeExplainer.shap_values. + + Returns + ------- + list[SHAPFeatureExplanation | SHAPFeatureAndAtomExplanation] + List of explanations corresponding to the input data. + """ + featurization_element = self.featurization_subpipeline.steps[-1][1] # type: ignore[union-attr] + + explanation_results: _SHAPExplainer_return_type_ = [] + for input_sample in X: + + input_sample = [input_sample] + + # get predictions + prediction = _get_predictions(self.pipeline, input_sample) + if not self._prediction_is_valid(prediction): + # we use the prediction to check if the input is valid. If not, we cannot explain it. + explanation_results.append(self.return_element_type_()) + continue + + if prediction.ndim > 1: + prediction = prediction.squeeze() + + # get the molecule + molecule = self.molecule_reader_subpipeline.transform(input_sample)[0] # type: ignore[union-attr] + + # get feature vectors + feature_vector = self.featurization_subpipeline.transform(input_sample) # type: ignore[union-attr] + feature_vector = _to_dense(feature_vector) + feature_vector = np.asarray(feature_vector).squeeze() + + if feature_vector.size == 0: + # if the feature vector is empty, we cannot explain the prediction. + # This happens for failed instances in pipeline with fill values + # that could be valid predictions, like 0. + explanation_results.append(self.return_element_type_()) + continue + + # compute the shap values for the features + feature_weights = self.explainer.shap_values(feature_vector, **kwargs) + feature_weights = np.asarray(feature_weights).squeeze() + + atom_weights = None + bond_weights = None + + if issubclass( + self.return_element_type_, AtomExplanationMixin + ) and isinstance(featurization_element, MolToMorganFP): + # for Morgan fingerprint, we can map the shap values to atom weights + atom_weights = _convert_shap_feature_weights_to_atom_weights( + feature_weights, + molecule, + featurization_element, + feature_vector, + ) + + # gather all input data for the explanation type to be returned + explanation_data = { + "molecule": molecule, + "prediction": prediction, + } + if issubclass(self.return_element_type_, FeatureInfoMixin): + explanation_data["feature_vector"] = feature_vector + if not hasattr(featurization_element, "feature_names"): + raise ValueError( + "Featurization element does not have a get_feature_names method." + ) + explanation_data["feature_names"] = featurization_element.feature_names # type: ignore[union-attr] + + if issubclass(self.return_element_type_, FeatureExplanationMixin): + explanation_data["feature_weights"] = feature_weights + if issubclass(self.return_element_type_, AtomExplanationMixin): + explanation_data["atom_weights"] = atom_weights + if issubclass(self.return_element_type_, BondExplanationMixin): + explanation_data["bond_weights"] = bond_weights + if issubclass(self.return_element_type_, SHAPExplanationMixin): + explanation_data["expected_value"] = _convert_to_array( + self.explainer.expected_value + ) + + explanation_results.append(self.return_element_type_(**explanation_data)) + + return explanation_results + + +class SHAPTreeExplainer(SHAPExplainerAdapter): + """Wrapper for SHAP's TreeExplainer that can handle pipelines and molecules. + + Wraps SHAP's TreeExplainer to explain predictions of a pipeline containing a + tree-based model. + + Note on failed instances: + SHAPTreeExplainer will automatically handle fill values for failed instances and + returns an invalid explanation for them. However, fill values that could be valid + predictions, e.g. 0, are not necessarily detected. Set the fill value to np.nan or + None if these failed instances should not be explained. + """ + + def __init__( + self, + pipeline: Pipeline, + **kwargs: Any, + ) -> None: + """Initialize the SHAPKernelExplainer. + + Parameters + ---------- + pipeline : Pipeline + The pipeline containing the model to explain. + kwargs : Any + Additional keyword arguments for SHAP's Explainer. + """ + explainer = self._create_explainer(pipeline, **kwargs) + super().__init__(pipeline, explainer, **kwargs) + + @staticmethod + def _create_explainer(pipeline: Pipeline, **kwargs: Any) -> shap.TreeExplainer: + """Create the TreeExplainer object from shap. + + Parameters + ---------- + pipeline : Pipeline + The pipeline containing the model to explain. + kwargs : Any + Additional keyword arguments for the explainer. + + Returns + ------- + shap.TreeExplainer + The explainer object. + """ + model = get_model_from_pipeline(pipeline, raise_not_found=True) + explainer = shap.TreeExplainer( + model, + **kwargs, + ) + return explainer + + +class SHAPKernelExplainer(SHAPExplainerAdapter): + """Wrapper for SHAP's KernelExplainer that can handle pipelines and molecules.""" + + def __init__( + self, + pipeline: Pipeline, + **kwargs: Any, + ) -> None: + """Initialize the SHAPKernelExplainer. + + Parameters + ---------- + pipeline : Pipeline + The pipeline containing the model to explain. + kwargs : Any + Additional keyword arguments for SHAP's Explainer. + """ + explainer = self._create_explainer(pipeline, **kwargs) + super().__init__(pipeline, explainer, **kwargs) + + @staticmethod + def _create_explainer(pipeline: Pipeline, **kwargs: Any) -> shap.KernelExplainer: + """Create the explainer object. + + Parameters + ---------- + pipeline : Pipeline + The pipeline containing the model to explain. + kwargs : Any + Additional keyword arguments for the explainer. + + Returns + ------- + shap.KernelExplainer + The explainer object. + """ + model = get_model_from_pipeline(pipeline, raise_not_found=True) + prediction_function = _get_prediction_function(model) + explainer = shap.KernelExplainer( + prediction_function, + **kwargs, + ) + return explainer diff --git a/molpipeline/explainability/explanation.py b/molpipeline/explainability/explanation.py new file mode 100644 index 00000000..a945d2bf --- /dev/null +++ b/molpipeline/explainability/explanation.py @@ -0,0 +1,116 @@ +"""Module for explanation class.""" + +from __future__ import annotations + +import abc +import dataclasses + +import numpy as np +import numpy.typing as npt + +from molpipeline.abstract_pipeline_elements.core import RDKitMol + + +@dataclasses.dataclass(kw_only=True) +class _AbstractMoleculeExplanation(abc.ABC): + """Abstract class representing an explanation for a prediction for a molecule.""" + + molecule: RDKitMol | None = None + prediction: npt.NDArray[np.float64] | None = None + + +@dataclasses.dataclass(kw_only=True) +class FeatureInfoMixin: + """Mixin providing additional information about the features used in the explanation.""" + + feature_vector: npt.NDArray[np.float64] | None = None + feature_names: list[str] | None = None + + +@dataclasses.dataclass(kw_only=True) +class FeatureExplanationMixin: + """Explanation based on feature importance scores, e.g. Shapley Values.""" + + # explanation scores for individual features + feature_weights: npt.NDArray[np.float64] | None = None + + +@dataclasses.dataclass(kw_only=True) +class AtomExplanationMixin: + """Atom score based explanation.""" + + # explanation scores for individual atoms + atom_weights: npt.NDArray[np.float64] | None = None + + +@dataclasses.dataclass(kw_only=True) +class BondExplanationMixin: + """Bond score based explanation.""" + + # explanation scores for individual bonds + bond_weights: npt.NDArray[np.float64] | None = None + + +@dataclasses.dataclass(kw_only=True) +class SHAPExplanationMixin: + """Mixin providing additional information only present in SHAP explanations.""" + + expected_value: npt.NDArray[np.float64] | None = None + + +@dataclasses.dataclass(kw_only=True) +class SHAPFeatureExplanation( + FeatureInfoMixin, + FeatureExplanationMixin, + SHAPExplanationMixin, + _AbstractMoleculeExplanation, # base-class should be the last element https://www.ianlewis.org/en/mixins-and-python +): + """Explanation using feature importance scores from SHAP.""" + + def is_valid(self) -> bool: + """Check if the explanation is valid. + + Returns + ------- + bool + True if the explanation is valid, False otherwise. + """ + return all( + [ + self.feature_vector is not None, + self.feature_names is not None, + self.molecule is not None, + self.prediction is not None, + self.feature_weights is not None, + ] + ) + + +@dataclasses.dataclass(kw_only=True) +class SHAPFeatureAndAtomExplanation( + FeatureInfoMixin, + FeatureExplanationMixin, + SHAPExplanationMixin, + AtomExplanationMixin, + _AbstractMoleculeExplanation, +): + """Explanation using feature and atom importance scores from SHAP.""" + + def is_valid(self) -> bool: + """Check if the explanation is valid. + + Returns + ------- + bool + True if the explanation is valid, False otherwise. + """ + return all( + [ + self.feature_vector is not None, + self.feature_names is not None, + self.molecule is not None, + self.prediction is not None, + self.feature_weights is not None, + self.atom_weights is not None, + ] + ) diff --git a/molpipeline/explainability/fingerprint_utils.py b/molpipeline/explainability/fingerprint_utils.py new file mode 100644 index 00000000..b10dd141 --- /dev/null +++ b/molpipeline/explainability/fingerprint_utils.py @@ -0,0 +1,83 @@ +"""Utility functions for explainability.""" + +from __future__ import annotations + +from collections import defaultdict +from typing import Sequence + +import numpy as np +import numpy.typing as npt + +from molpipeline.abstract_pipeline_elements.core import RDKitMol +from molpipeline.mol2any import MolToMorganFP +from molpipeline.utils.substructure_handling import AtomEnvironment + + +def assign_prediction_importance( + bit_dict: dict[int, Sequence[AtomEnvironment]], weights: npt.NDArray[np.float64] +) -> dict[int, float]: + """Assign the prediction importance. + + Originally from Christian W. Feldmann + https://github.com/c-feldmann/compchemkit/blob/64e5543e2b8f72e93711186b2e0b42366820fb52/compchemkit/molecular_heatmaps.py#L28 + + Parameters + ---------- + bit_dict : dict[int, Sequence[AtomEnvironment]] + The bit dictionary. + weights : npt.NDArray[np.float64] + The weights. + + Returns + ------- + dict[int, float] + The atom contribution. + """ + atom_contribution: dict[int, float] = defaultdict(lambda: 0) + for bit, atom_env_list in bit_dict.items(): # type: int, Sequence[AtomEnvironment] + n_machtes = len(atom_env_list) + for atom_set in atom_env_list: + for atom in atom_set.environment_atoms: + atom_contribution[atom] += weights[bit] / ( + len(atom_set.environment_atoms) * n_machtes + ) + if not np.isclose(sum(weights), sum(atom_contribution.values())).all(): + raise AssertionError( + f"Weights and atom contributions don't sum to the same value:" + f" {weights.sum()} != {sum(atom_contribution.values())}" + ) + return atom_contribution + + +def fingerprint_shap_to_atomweights( + mol: RDKitMol, fingerprint_element: MolToMorganFP, shap_mat: npt.NDArray[np.float64] +) -> list[float]: + """Convert SHAP values to atom weights. + + Originally from Christian W. Feldmann + https://github.com/c-feldmann/compchemkit/blob/64e5543e2b8f72e93711186b2e0b42366820fb52/compchemkit/molecular_heatmaps.py#L15 + + Parameters + ---------- + mol : RDKitMol + The molecule. + fingerprint_element : MolToMorganFP + The fingerprint element. + shap_mat : npt.NDArray[np.float64] + The SHAP values. + + Returns + ------- + list[float] + The atom weights. + """ + bit_atom_env_dict: dict[int, Sequence[AtomEnvironment]] + bit_atom_env_dict = dict( + fingerprint_element.bit2atom_mapping(mol) + ) # MyPy invariants make me do this. + atom_weight_dict = assign_prediction_importance(bit_atom_env_dict, shap_mat) + atom_weight_list = [ + atom_weight_dict[a_idx] if a_idx in atom_weight_dict else 0 + for a_idx in range(mol.GetNumAtoms()) + ] + return atom_weight_list diff --git a/molpipeline/explainability/visualization/__init__.py b/molpipeline/explainability/visualization/__init__.py new file mode 100644 index 00000000..7fbc38ac --- /dev/null +++ b/molpipeline/explainability/visualization/__init__.py @@ -0,0 +1 @@ +"""Visualization module for explainability.""" diff --git a/molpipeline/explainability/visualization/gauss.py b/molpipeline/explainability/visualization/gauss.py new file mode 100644 index 00000000..4652f273 --- /dev/null +++ b/molpipeline/explainability/visualization/gauss.py @@ -0,0 +1,72 @@ +"""Gaussian functions for visualization. + +Much of the visualization code in this file originates from projects of Christian W. Feldmann: + https://github.com/c-feldmann/rdkit_heatmaps + https://github.com/c-feldmann/compchemkit +""" + +import numpy as np +import numpy.typing as npt + + +# pylint: disable=too-few-public-methods +class GaussFunctor2D: + """2D Gaussian functor.""" + + def __init__( + self, + center: npt.NDArray[np.float64], + std1: float = 1, + std2: float = 1, + scale: float = 1, + rotation: float = 0, + ) -> None: + """Initialize 2D Gaussian functor. + + Parameters + ---------- + center: npt.NDArray[np.float64] + Center of the Gaussian function. + std1: float + Standard deviation along the first axis. + std2: float + Standard deviation along the second axis. + scale: float + Scaling factor. + rotation: float + Rotation angle in radians. + """ + self.center = center + self.std = np.array([std1, std2]) ** 2 # scale stds to variance + self.scale = scale + self.rotation = rotation + + self._a = np.cos(self.rotation) ** 2 / (2 * self.std[0]) + np.sin( + self.rotation + ) ** 2 / (2 * self.std[1]) + self._b = -np.sin(2 * self.rotation) / (4 * self.std[0]) + np.sin( + 2 * self.rotation + ) / (4 * self.std[1]) + self._c = np.sin(self.rotation) ** 2 / (2 * self.std[0]) + np.cos( + self.rotation + ) ** 2 / (2 * self.std[1]) + + def __call__(self, pos: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + """Evaluate the Gaussian function at the given positions. + + Parameters + ---------- + pos: npt.NDArray[np.float64] + Array of positions to evaluate the Gaussian function at. + + Returns + ------- + npt.NDArray[np.float64] + Array of function values at the given positions. + """ + exponent = self._a * (pos[:, 0] - self.center[0]) ** 2 + exponent += ( + 2 * self._b * (pos[:, 0] - self.center[0]) * (pos[:, 1] - self.center[1]) + ) + exponent += self._c * (pos[:, 1] - self.center[1]) ** 2 + return self.scale * np.exp(-exponent) diff --git a/molpipeline/explainability/visualization/heatmaps.py b/molpipeline/explainability/visualization/heatmaps.py new file mode 100644 index 00000000..4fdae123 --- /dev/null +++ b/molpipeline/explainability/visualization/heatmaps.py @@ -0,0 +1,276 @@ +"""Module for generating heatmaps from 2D-grids. + +Much of the visualization code in this file originates from projects of Christian W. Feldmann: + https://github.com/c-feldmann/rdkit_heatmaps + https://github.com/c-feldmann/compchemkit +""" + +import abc +from typing import Callable, Sequence + +import numpy as np +import numpy.typing as npt +from matplotlib import colors +from rdkit.Chem import Draw +from rdkit.Geometry.rdGeometry import Point2D + + +class Grid2D(abc.ABC): + """Metaclass for discrete 2-dimensional grids. + + This class holds a matrix of values accessed by index, where each cell is associated with a specific location. + """ + + def __init__( + self, + x_lim: Sequence[float], + y_lim: Sequence[float], + x_res: int, + y_res: int, + ) -> None: + """Initialize the Grid2D with limits and resolution of the axes. + + Parameters + ---------- + x_lim: Sequence[float] + Extend of the grid along the x-axis (xmin, xmax). + y_lim: Sequence[float] + Extend of the grid along the y-axis (ymin, ymax). + x_res: int + Resolution (number of cells) along x-axis. + y_res: int + Resolution (number of cells) along y-axis. + """ + if len(x_lim) != 2: + raise ValueError("x_lim must be of length 2.") + if len(y_lim) != 2: + raise ValueError("y_lim must be of length 2.") + + self.x_lim = x_lim + self.y_lim = y_lim + self.x_res = x_res + self.y_res = y_res + self.values = np.zeros((self.x_res, self.y_res)) + + @property + def dx(self) -> float: + """Length of cell in x-direction.""" + return (max(self.x_lim) - min(self.x_lim)) / self.x_res + + @property + def dy(self) -> float: + """Length of cell in y-direction.""" + return (max(self.y_lim) - min(self.y_lim)) / self.y_res + + def grid_field_center(self, x_idx: int, y_idx: int) -> tuple[float, float]: + """Center of cell specified by index along x and y. + + Parameters + ---------- + x_idx: int + cell-index along x-axis. + y_idx: int + cell-index along y-axis. + + Returns + ------- + tuple[float, float] + Coordinates of center of cell. + """ + x_coord = min(self.x_lim) + self.dx * (x_idx + 0.5) + y_coord = min(self.y_lim) + self.dy * (y_idx + 0.5) + return x_coord, y_coord + + def grid_field_lim( + self, x_idx: int, y_idx: int + ) -> tuple[tuple[float, float], tuple[float, float]]: + """Get x and y coordinates for the upper left and lower right position of specified pixel. + + Parameters + ---------- + x_idx: int + cell-index along x-axis. + y_idx: int + cell-index along y-axis. + + Returns + ------- + tuple[tuple[float, float], tuple[float, float]] + Coordinates of upper left and lower right corner of cell. + """ + upper_left = ( + min(self.x_lim) + self.dx * x_idx, + min(self.y_lim) + self.dy * y_idx, + ) + lower_right = ( + min(self.x_lim) + self.dx * (x_idx + 1), + min(self.y_lim) + self.dy * (y_idx + 1), + ) + return upper_left, lower_right + + +class ColorGrid(Grid2D): + """Stores rgba-values of cells.""" + + def __init__( + self, + x_lim: Sequence[float], + y_lim: Sequence[float], + x_res: int, + y_res: int, + ): + """Initialize the ColorGrid with limits and resolution of the axes. + + Parameters + ---------- + x_lim: Sequence[float] + Extend of the grid along the x-axis (xmin, xmax). + y_lim: Sequence[float] + Extend of the grid along the y-axis (ymin, ymax). + x_res: int + Resolution (number of cells) along x-axis. + y_res: int + Resolution (number of cells) along y-axis. + """ + super().__init__(x_lim, y_lim, x_res, y_res) + self.color_grid = np.ones((self.x_res, self.y_res, 4)) + + +class ValueGrid(Grid2D): + """Calculate and store values of cells. + + Evaluates all added functions for the position of each cell and calculates the value of each cell as sum of these + functions. + """ + + def __init__( + self, + x_lim: Sequence[float], + y_lim: Sequence[float], + x_res: int, + y_res: int, + ): + """Initialize the ValueGrid with limits and resolution of the axes. + + Parameters + ---------- + x_lim: Sequence[float] + Extend of the grid along the x-axis (xmin, xmax). + y_lim: Sequence[float] + Extend of the grid along the y-axis (ymin, ymax). + x_res: int + Resolution (number of cells) along x-axis. + y_res: int + Resolution (number of cells) along y-axis. + """ + super().__init__(x_lim, y_lim, x_res, y_res) + self.function_list: list[ + Callable[[npt.NDArray[np.float64]], npt.NDArray[np.float64]] + ] = [] + self.values = np.zeros((self.x_res, self.y_res)) + + def add_function( + self, function: Callable[[npt.NDArray[np.float64]], npt.NDArray[np.float64]] + ) -> None: + """Add a function to the grid which is evaluated for each cell, when `self.evaluate` is called. + + Parameters + ---------- + function: Callable[[npt.NDArray[np.float64]], npt.NDArray[np.float64]] + Function to be evaluated for each cell. The function should take an array of positions and return an array + of values, e.g. a Gaussian function. + """ + self.function_list.append(function) + + def evaluate(self) -> None: + """Evaluate each function for each cell. Values of cells are calculated as the sum of all function-values. + + The results are saved to `self.values`. + """ + self.values = np.zeros((self.x_res, self.y_res)) + x_y0_list = np.array( + [self.grid_field_center(x, 0)[0] for x in range(self.x_res)] + ) + x0_y_list = np.array( + [self.grid_field_center(0, y)[1] for y in range(self.y_res)] + ) + xv, yv = np.meshgrid(x_y0_list, x0_y_list) + xv = xv.ravel() + yv = yv.ravel() + coordinate_pairs = np.vstack([xv, yv]).T + for f in self.function_list: + values = f(coordinate_pairs) + values = values.reshape(self.y_res, self.x_res).T + if values.shape != self.values.shape: + raise AssertionError( + f"Function does not return correct shape. Shape was {(values.shape, self.values.shape)}" + ) + self.values += values + + def map2color( + self, + c_map: colors.Colormap, + normalizer: colors.Normalize, + ) -> ColorGrid: + """Generate a ColorGrid from `self.values` according to given colormap. + + Parameters + ---------- + c_map: colors.Colormap + Colormap to be used for mapping values to colors. + normalizer: colors.Normalize + Normalizer to be used for mapping values to colors. + + Returns + ------- + ColorGrid + ColorGrid with colors corresponding to ValueGrid. + """ + color_grid = ColorGrid(self.x_lim, self.y_lim, self.x_res, self.y_res) + norm = normalizer(self.values) + color_grid.color_grid = np.array(c_map(norm)) + return color_grid + + +def get_color_normalizer_from_data( + values: npt.NDArray[np.float64], +) -> colors.Normalize: + """Create a color normalizer based on the data distribution of 'values'. + + Parameters + ---------- + values: npt.NDArray[np.float64] + Data to derive limits of normalizer. The maximum absolute value of + values` is used as limit. + + Returns + ------- + colors.Normalize + Normalizer for colors. + """ + abs_max = np.max(np.abs(values)) + normalizer = colors.Normalize(vmin=-abs_max, vmax=abs_max) + return normalizer + + +def color_canvas(canvas: Draw.MolDraw2D, color_grid: ColorGrid) -> None: + """Draw a ColorGrid object to a RDKit Draw.MolDraw2D canvas. + + Each pixel is drawn as rectangle, so if you use Draw.MolDrawSVG brace yourself and your RAM! + + Parameters + ---------- + canvas: Draw.MolDraw2D + RDKit Draw.MolDraw2D canvas. + color_grid: ColorGrid + ColorGrid object to be drawn on the canvas. + """ + # draw only grid points whose color is not white. + # we check for the exact values of white (1,1,1). np.isclose returns almost the same pixels but is slightly slower. + mask = np.where(~np.all(color_grid.color_grid[:, :, :3] == [1, 1, 1], axis=2)) + + for x, y in zip(*mask): + upper_left, lower_right = color_grid.grid_field_lim(x, y) + upper_left, lower_right = Point2D(*upper_left), Point2D(*lower_right) + canvas.SetColour(tuple(color_grid.color_grid[x, y])) + canvas.DrawRect(upper_left, lower_right) diff --git a/molpipeline/explainability/visualization/utils.py b/molpipeline/explainability/visualization/utils.py new file mode 100644 index 00000000..b2f1a72d --- /dev/null +++ b/molpipeline/explainability/visualization/utils.py @@ -0,0 +1,173 @@ +"""Utility functions for visualization of molecules and their explanations.""" + +import io +from typing import Sequence + +import numpy as np +import numpy.typing as npt +from matplotlib import pyplot as plt +from matplotlib.colors import Colormap, ListedColormap +from matplotlib.pyplot import get_cmap +from PIL import Image +from rdkit import Chem + +# red green blue alpha tuple +RGBAtuple = tuple[float, float, float, float] + + +def get_mol_lims(mol: Chem.Mol) -> tuple[tuple[float, float], tuple[float, float]]: + """Return the extent of the molecule. + + x- and y-coordinates of all atoms in the molecule are accessed, returning min- and max-values for both axes. + + Parameters + ---------- + mol: Chem.Mol + RDKit Molecule object of which the limits are determined. + + Returns + ------- + tuple[tuple[float, float], tuple[float, float]] + Limits of the molecule. + """ + coords_list = [] + conf = mol.GetConformer(0) + for i, _ in enumerate(mol.GetAtoms()): + pos = conf.GetAtomPosition(i) + coords_list.append((pos.x, pos.y)) + coords: npt.NDArray[np.float64] = np.array(coords_list) + min_p = np.min(coords, axis=0) + max_p = np.max(coords, axis=0) + x_lim = min_p[0], max_p[0] + y_lim = min_p[1], max_p[1] + return x_lim, y_lim + + +def pad( + lim: Sequence[float] | npt.NDArray[np.float64], ratio: float +) -> tuple[float, float]: + """Take a 2-dimensional vector and adds len(vector) * ratio / 2 to each side and returns obtained vector. + + Parameters + ---------- + lim: Sequence[float] | npt.NDArray[np.float64] + Limits which are extended. + ratio: float + factor by which the limits are extended. + + Returns + ------- + List[float, float] + Extended limits + """ + diff = max(lim) - min(lim) + diff *= ratio / 2 + return lim[0] - diff, lim[1] + diff + + +def get_color_map_from_input( + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None +) -> Colormap: + """Get a colormap from a user defined color scheme. + + Parameters + ---------- + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None + The color scheme. + + Returns + ------- + Colormap + The colormap. + """ + # read user definer color scheme as ColorMap + if color is None: + coolwarm = ( + (0.017, 0.50, 0.850, 0.5), + (1.0, 1.0, 1.0, 0.5), + (1.0, 0.25, 0.0, 0.5), + ) + coolwarm = (coolwarm[2], coolwarm[1], coolwarm[0]) + color = coolwarm + if isinstance(color, Colormap): + color_map = color + elif isinstance(color, tuple): + color_map = color_tuple_to_colormap(color) # type: ignore + elif isinstance(color, str): + color_map = get_cmap(color) + else: + raise ValueError("Color must be a tuple, string or ColorMap.") + return color_map + + +def color_tuple_to_colormap( + color_tuple: tuple[RGBAtuple, RGBAtuple, RGBAtuple] +) -> Colormap: + """Convert a color tuple to a colormap. + + Parameters + ---------- + color_tuple: tuple[RGBAtuple, RGBAtuple, RGBAtuple] + The color tuple. + + Returns + ------- + Colormap + The colormap (a matplotlib data structure). + """ + if len(color_tuple) != 3: + raise ValueError("Color tuple must have 3 elements") + + # Definition of color + col1, col2, col3 = map(np.array, color_tuple) + + # Creating linear gradient for color mixing + linspace = np.linspace(0, 1, int(128)) + linspace4d = np.vstack([linspace] * 4).T + + # interpolating values for 0 to 0.5 by mixing purple and white + zero_to_half = linspace4d * col2 + (1 - linspace4d) * col3 + # interpolating values for 0.5 to 1 by mixing white and yellow + half_to_one = col1 * linspace4d + col2 * (1 - linspace4d) + + # Creating new colormap from + color_map = ListedColormap(np.vstack([zero_to_half, half_to_one])) + return color_map + + +def to_png(data: bytes) -> Image.Image: + """Show a PNG image from a byte stream. + + Parameters + ---------- + data: bytes + The image data. + + Returns + ------- + Image + The image. + """ + bio = io.BytesIO(data) + img = Image.open(bio) + return img + + +def plt_to_pil(figure: plt.Figure) -> Image.Image: + """Convert a matplotlib figure to a PIL image. + + Parameters + ---------- + figure: plt.Figure + The figure. + + Returns + ------- + Image + The image. + """ + bio = io.BytesIO() + figure.savefig(bio, format="png") + bio.seek(0) + img = Image.open(bio) + return img diff --git a/molpipeline/explainability/visualization/visualization.py b/molpipeline/explainability/visualization/visualization.py new file mode 100644 index 00000000..7a87ed76 --- /dev/null +++ b/molpipeline/explainability/visualization/visualization.py @@ -0,0 +1,494 @@ +"""Visualization functions for the explainability module. + +Much of the visualization code in this file originates from projects of Christian W. Feldmann: + https://github.com/c-feldmann/rdkit_heatmaps + https://github.com/c-feldmann/compchemkit +""" + +from __future__ import annotations + +from typing import Sequence + +import numpy as np +import numpy.typing as npt +from matplotlib import colors +from matplotlib import pyplot as plt +from matplotlib.colors import Colormap +from PIL import Image +from rdkit import Chem +from rdkit.Chem import Draw +from rdkit.Chem.Draw import rdMolDraw2D + +from molpipeline.abstract_pipeline_elements.core import RDKitMol +from molpipeline.explainability.explanation import SHAPFeatureAndAtomExplanation +from molpipeline.explainability.visualization.gauss import GaussFunctor2D +from molpipeline.explainability.visualization.heatmaps import ( + ValueGrid, + color_canvas, + get_color_normalizer_from_data, +) +from molpipeline.explainability.visualization.utils import ( + RGBAtuple, + get_color_map_from_input, + get_mol_lims, + pad, + plt_to_pil, + to_png, +) + + +def _make_grid_from_mol( + mol: Chem.Mol, + grid_resolution: Sequence[int], + padding: Sequence[float], +) -> ValueGrid: + """Create a grid for the molecule. + + Parameters + ---------- + mol: Chem.Mol + RDKit molecule object. + grid_resolution: Sequence[int] + Resolution of the grid. + padding: Sequence[float] + Padding of the grid. + + Returns + ------- + ValueGrid + ValueGrid object. + """ + xl: list[float] + yl: list[float] + xl, yl = [list(lim) for lim in get_mol_lims(mol)] # Limit of molecule + + # Extent of the canvas is approximated by size of molecule scaled by ratio of canvas height and width. + # Would be nice if this was directly accessible... + mol_height = yl[1] - yl[0] + mol_width = xl[1] - xl[0] + + height_to_width_ratio_mol = mol_height / (1e-16 + mol_width) + # the grids height / weight is the canvas height / width + height_to_width_ratio_canvas = grid_resolution[1] / grid_resolution[0] + + if height_to_width_ratio_mol < height_to_width_ratio_canvas: + mol_height_new = height_to_width_ratio_canvas * mol_width + yl[0] -= (mol_height_new - mol_height) / 2 + yl[1] += (mol_height_new - mol_height) / 2 + else: + mol_width_new = grid_resolution[0] / grid_resolution[1] * mol_height + xl[0] -= (mol_width_new - mol_width) / 2 + xl[1] += (mol_width_new - mol_width) / 2 + + xl = list(pad(xl, padding[0])) # Increasing size of x-axis + yl = list(pad(yl, padding[1])) # Increasing size of y-axis + v_map = ValueGrid(xl, yl, grid_resolution[0], grid_resolution[1]) + return v_map + + +def _add_gaussians_for_atoms( + mol: Chem.Mol, + conf: Chem.Conformer, + v_map: ValueGrid, + atom_weights: npt.NDArray[np.float64], + atom_width: float, +) -> ValueGrid: + """Add Gauss-functions centered at atoms to the grid. + + Parameters + ---------- + mol: Chem.Mol + RDKit molecule object. + conf: Chem.Conformer + Conformation of the molecule. + v_map: ValueGrid + ValueGrid object to which the functions are added. + atom_weights: npt.NDArray[np.float64] + Array of weights for atoms. + atom_width: float + Width of the displayed atom weights. + + Returns + ------- + ValueGrid + ValueGrid object with added functions. + """ + for i in range(mol.GetNumAtoms()): + if atom_weights[i] == 0: + continue + pos = conf.GetAtomPosition(i) + coords = np.array([pos.x, pos.y]) + func = GaussFunctor2D( + center=coords, + std1=atom_width, + std2=atom_width, + scale=atom_weights[i], + rotation=0, + ) + v_map.add_function(func) + return v_map + + +# pylint: disable=too-many-locals +def _add_gaussians_for_bonds( + mol: Chem.Mol, + conf: Chem.Conformer, + v_map: ValueGrid, + bond_weights: npt.NDArray[np.float64], + bond_width: float, + bond_length: float, +) -> ValueGrid: + """Add Gauss-functions centered at bonds to the grid. + + Parameters + ---------- + mol: Chem.Mol + RDKit molecule object. + conf: Chem.Conformer + Conformation of the molecule. + v_map: ValueGrid + ValueGrid object to which the functions are added. + bond_weights: npt.NDArray[np.float64] + Array of weights for bonds. + bond_width: float + Width of the displayed bond weights (perpendicular to bond-axis). + bond_length: float + Length of the displayed bond weights (along the bond-axis). + + Returns + ------- + ValueGrid + ValueGrid object with added functions. + """ + # Adding Gauss-functions centered at bonds (position between the two bonded-atoms) + for i, b in enumerate(mol.GetBonds()): + if bond_weights[i] == 0: + continue + a1 = b.GetBeginAtom().GetIdx() + a1_pos = conf.GetAtomPosition(a1) + a1_coords = np.array([a1_pos.x, a1_pos.y]) + + a2 = b.GetEndAtom().GetIdx() + a2_pos = conf.GetAtomPosition(a2) + a2_coords = np.array([a2_pos.x, a2_pos.y]) + + diff = a2_coords - a1_coords + angle = np.arctan2(diff[0], diff[1]) + + bond_center = (a1_coords + a2_coords) / 2 + + func = GaussFunctor2D( + center=bond_center, + std1=bond_width, + std2=bond_length, + scale=bond_weights[i], + rotation=angle, + ) + v_map.add_function(func) + return v_map + + +def make_sum_of_gaussians_grid( + mol: Chem.Mol, + grid_resolution: Sequence[int], + padding: Sequence[float], + atom_weights: Sequence[float] | npt.NDArray[np.float64] | None = None, + bond_weights: Sequence[float] | npt.NDArray[np.float64] | None = None, + atom_width: float = 0.3, + bond_width: float = 0.25, + bond_length: float = 0.5, +) -> rdMolDraw2D: + """Map weights of atoms and bonds to the drawing of a RDKit molecular depiction. + + For each atom and bond of depicted molecule a Gauss-function, centered at the respective object, is created and + scaled by the corresponding weight. Gauss-functions of atoms are circular, while Gauss-functions of bonds can be + distorted along the bond axis. The value of each pixel is determined as the sum of all function-values at the pixel + position. Subsequently, the values are mapped to a color and drawn onto the canvas. + + Inspired from https://github.com/c-feldmann/rdkit_heatmaps/blob/master/rdkit_heatmaps/molmapping.py + + Parameters + ---------- + mol: Chem.Mol + RDKit molecule object which is displayed. + grid_resolution: Sequence[int] + Number of pixels of x- and y-axis. + padding: Sequence[float] + Increase of heatmap size, relative to size of molecule. + atom_weights: Sequence[float] | npt.NDArray[np.float64] | None + Array of weights for atoms. + bond_weights: Sequence[float] | npt.NDArray[np.float64] | None + Array of weights for bonds. + atom_width: float + Value for the width of displayed atom weights. + bond_width: float + Value for the width of displayed bond weights (perpendicular to bond-axis). + bond_length: float + Value for the length of displayed bond weights (along the bond-axis). + + Returns + ------- + rdMolDraw2D.MolDraw2D + Drawing of molecule and corresponding heatmap. + """ + # assign default values and convert to numpy array + if atom_weights is None: + atom_weights = np.zeros(mol.GetNumAtoms()) + elif not isinstance(atom_weights, np.ndarray): + atom_weights = np.array(atom_weights) + + if bond_weights is None: + bond_weights = np.zeros(len(mol.GetBonds())) + elif not isinstance(bond_weights, np.ndarray): + bond_weights = np.array(bond_weights) + + # validate input + if not len(atom_weights) == mol.GetNumAtoms(): + raise ValueError("len(atom_weights) is not equal to number of atoms in mol") + + if not len(bond_weights) == len(mol.GetBonds()): + raise ValueError("len(bond_weights) is not equal to number of bonds in mol") + + # extract the 2D conformation of the molecule to be drawn + conf = mol.GetConformer(0) + + # setup grid and add functions for atoms and bonds + value_grid = _make_grid_from_mol(mol, grid_resolution, padding) + value_grid = _add_gaussians_for_atoms( + mol, conf, value_grid, atom_weights, atom_width + ) + value_grid = _add_gaussians_for_bonds( + mol, conf, value_grid, bond_weights, bond_width, bond_length + ) + + # evaluate all functions at pixel positions to obtain pixel values + value_grid.evaluate() + + return value_grid + + +def _structure_heatmap( + mol: RDKitMol, + atom_weights: npt.NDArray[np.float64], + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None = None, + width: int = 600, + height: int = 600, + color_limits: tuple[float, float] | None = None, +) -> tuple[Draw.MolDraw2D, ValueGrid, ValueGrid, colors.Normalize, Colormap]: + """Create a heatmap of the molecular structure, highlighting atoms with weighted Gaussian's. + + Parameters + ---------- + mol: RDKitMol + The molecule. + atom_weights: npt.NDArray[np.float64] + The atom weights. + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None + The color map. + width: int + The width of the image in number of pixels. + height: int + The height of the image in number of pixels. + color_limits: tuple[float, float] | None + The color limits. + + Returns + ------- + Draw.MolDraw2D, ValueGrid, ColorGrid, colors.Normalize, Colormap + The configured drawer, the value grid, the color grid, the normalizer, and the + color map. + """ + drawer = Draw.MolDraw2DCairo(width, height) + # Coloring atoms of element 0 to 100 black + drawer.drawOptions().updateAtomPalette({i: (0, 0, 0, 1) for i in range(100)}) + draw_opt = drawer.drawOptions() + draw_opt.padding = 0.2 + + color_map = get_color_map_from_input(color) + + # create the sums of gaussians value grid + mol_copy = Chem.Mol(mol) + mol_copy = Draw.PrepareMolForDrawing(mol_copy) + value_grid = make_sum_of_gaussians_grid( + mol_copy, + atom_weights=atom_weights, + bond_weights=None, + atom_width=0.5, # 0.4 + bond_width=0.25, + bond_length=0.5, + grid_resolution=[drawer.Width(), drawer.Height()], + padding=[draw_opt.padding * 2, draw_opt.padding * 2], + ) + + # create color-grid from the value grid. + if color_limits is None: + normalizer = get_color_normalizer_from_data(value_grid.values) + else: + normalizer = colors.Normalize(vmin=color_limits[0], vmax=color_limits[1]) + color_grid = value_grid.map2color(color_map, normalizer=normalizer) + + # draw the molecule and erase it to initialize the grid + drawer.DrawMolecule(mol) + drawer.ClearDrawing() + # add the Colormap to the canvas + color_canvas(drawer, color_grid) + # add the molecule to the canvas + drawer.DrawMolecule(mol) + + drawer.FinishDrawing() + return drawer, value_grid, color_grid, normalizer, color_map + + +def structure_heatmap( + mol: RDKitMol, + atom_weights: npt.NDArray[np.float64], + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None = None, + width: int = 600, + height: int = 600, + color_limits: tuple[float, float] | None = None, +) -> Image.Image: + """Create a Gaussian plot on the molecular structure, highlight atoms with weighted Gaussians. + + Parameters + ---------- + mol: RDKitMol + The molecule. + atom_weights: npt.NDArray[np.float64] + The atom weights. + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None + The color map. + width: int + The width of the image in number of pixels. + height: int + The height of the image in number of pixels. + color_limits: tuple[float, float] | None + The color limits. + + Returns + ------- + Image + The image as PNG. + """ + drawer, *_ = _structure_heatmap( + mol, atom_weights, color, width, height, color_limits + ) + figure_bytes = drawer.GetDrawingText() + image = to_png(figure_bytes) + return image + + +def structure_heatmap_shap( # pylint: disable=too-many-branches + explanation: SHAPFeatureAndAtomExplanation, + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None = None, + width: int = 600, + height: int = 600, + color_limits: tuple[float, float] | None = None, +) -> Image.Image: + """Create a heatmap of the molecular structure and display SHAP prediction composition. + + Parameters + ---------- + explanation: SHAPExplanation + The SHAP explanation. + color: str | Colormap | tuple[RGBAtuple, RGBAtuple, RGBAtuple] | None + The color map. + width: int + The width of the image in number of pixels. + height: int + The height of the image in number of pixels. + color_limits: tuple[float, float] | None + The color limits. + + Returns + ------- + Image + The image as PNG. + """ + if explanation.feature_weights is None: + raise ValueError("Explanation does not contain feature weights.") + if explanation.feature_vector is None: + raise ValueError("Explanation does not contain feature_vector.") + if explanation.expected_value is None: + raise ValueError("Explanation does not contain expected value.") + if explanation.prediction is None: + raise ValueError("Explanation does not contain prediction.") + if explanation.molecule is None: + raise ValueError("Explanation does not contain molecule.") + if explanation.atom_weights is None: + raise ValueError("Explanation does not contain atom weights.") + + if explanation.feature_vector.max() > 1 or explanation.feature_vector.min() < 0: + raise ValueError( + "Feature vector must be binary. Alternatively, use the structure_heatmap function instead." + ) + + if explanation.prediction.ndim > 2: + raise ValueError( + "Unsupported shape for prediction. Maximum 2 dimension is supported." + ) + + if explanation.feature_weights.ndim == 1: + feature_weights = explanation.feature_weights + elif explanation.feature_weights.ndim == 2: + feature_weights = explanation.feature_weights[:, 1] + else: + raise ValueError("Unsupported shape for feature weights.") + + # determine present/absent features using the binary feature vector + present_shap = feature_weights * explanation.feature_vector + absent_shap = feature_weights * (1 - explanation.feature_vector) + sum_present_shap = sum(present_shap) + sum_absent_shap = sum(absent_shap) + + with plt.ioff(): + + drawer, _, _, normalizer, color_map = _structure_heatmap( + explanation.molecule, + explanation.atom_weights, + color=color, + width=width, + height=height, + color_limits=color_limits, + ) + figure_bytes = drawer.GetDrawingText() + image_heatmap = to_png(figure_bytes) + image_array = np.array(image_heatmap) + + fig, ax = plt.subplots(figsize=(8, 8)) + + im = ax.imshow( + image_array, + cmap=color_map, + norm=normalizer, + ) + # remove ticks + ax.set_xticks([]) + ax.set_yticks([]) + # remove border + for spine in ax.spines.values(): + spine.set_visible(False) + + fig.colorbar(im, ax=ax, orientation="vertical", fraction=0.015, pad=0.0) + + # note: the prediction/expected value of the last array element is used + text = ( + f"$Prediction = {explanation.prediction[-1]:.2f}$ =" + "\n" + "\n" + f" $expected \ value={explanation.expected_value[-1]:.2f}$ + " # noqa: W605 # pylint: disable=W1401 + f"$features_{{present}}= {sum_present_shap:.2f}$ + " + f"$features_{{absent}}={sum_absent_shap:.2f}$" + ) + fig.text(0.5, 0.18, text, ha="center") + + image = plt_to_pil(fig) + # clear the figure and memory + plt.close(fig) + + # remove dpi info because it crashes ipython's display function + if "dpi" in image.info: + del image.info["dpi"] + # keep RDKit's image info + image.info.update(image_heatmap.info) + + return image diff --git a/molpipeline/utils/subpipeline.py b/molpipeline/utils/subpipeline.py index a55aa1c4..a507fc74 100644 --- a/molpipeline/utils/subpipeline.py +++ b/molpipeline/utils/subpipeline.py @@ -385,3 +385,61 @@ def get_all_filter_reinserter_fill_values(self) -> list[Any]: ): fill_values.add(step.wrapped_estimator.fill_value) return list(fill_values) + + +def get_featurization_subpipeline( + pipeline: Pipeline, raise_not_found: bool = False +) -> Pipeline | None: + """Get the featurization subpipeline from a pipeline. + + Parameters + ---------- + pipeline : Pipeline + The pipeline to extract the featurization subpipeline from. + raise_not_found : bool + If True, raise a ValueError if the model was not found. + + Raises + ------ + ValueError + If the model was not found and raise_not_found is True. + + Returns + ------- + Pipeline | None + The extracted featurization subpipeline or None if the featurization element was not found. + """ + pipeline_extractor = SubpipelineExtractor(pipeline) + featurization_subpipeline = pipeline_extractor.get_featurization_subpipeline() + if raise_not_found and featurization_subpipeline is None: + raise ValueError("Could not determine the featurization subpipeline.") + return featurization_subpipeline + + +def get_model_from_pipeline( + pipeline: Pipeline, raise_not_found: bool = False +) -> BaseEstimator | None: + """Get the model from a pipeline. + + Parameters + ---------- + pipeline : Pipeline + The pipeline to extract the model from. + raise_not_found : bool + If True, raise a ValueError if the model was not found. + + Raises + ------ + ValueError + If the model was not found and raise_not_found is True. + + Returns + ------- + BaseEstimator | None + The extracted model or None if the model was not found. + """ + pipeline_extractor = SubpipelineExtractor(pipeline) + model = pipeline_extractor.get_model_element() + if raise_not_found and model is None: + raise ValueError("Could not determine the model to explain.") + return model diff --git a/notebooks/introduction_to_explainable_ai.ipynb b/notebooks/introduction_to_explainable_ai.ipynb new file mode 100644 index 00000000..52bc190f --- /dev/null +++ b/notebooks/introduction_to_explainable_ai.ipynb @@ -0,0 +1,1327 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "52d2059f-af91-44cb-8606-3797d89a0b76", + "metadata": {}, + "source": [ + "# Introduction to explainable AI (XAI)\n", + "\n", + "MolPipeline supports explainability methods for machine learning models, also called explainable AI (XAI). Explainability methods provide explanations for predictions, which can help to interprete why a model made a prediction. This can help, for example, users in decision making and developers to improve machine learning models. \n", + "\n", + "This notebooks shows how explanations can be easily generated with MolPipeline. We show this with XAI explanations for a simple standard Random Forest with Morgan fingerprints model. This XAI method was introduced by [Feldmann et al. 2022](https://doi.org/10.3390/biom12040557) and uses Shapley Values from [SHAP](https://github.com/shap/shap) to explain important atoms and substructures with a heatmap on the 2D depcition of the molecular structure. For the implementation see MolPipelines `explainability` module.\n", + "\n", + "In addition, we borrow a real-world drug design data set from [Harren et al. 2022](https://pubs.acs.org/doi/10.1021/acs.jcim.1c01263) and illustrate how explanations generated with MolPipeline can be used in real-world scenarios. The study by Harren et al. is a comprehensive analysis of methods for interpreting structure–activity relationships (SARs) in lead optimization with XAI from Sanofi and the University Hamburg. This notebook shows that explanations with a Random Forest model with fingerprints highlight key substructures important for affinity. However, some of the explanations differ from those by Harren et al. generated with a multilayer perceptron with the same fingerprints. This shows that explanations must always be interpreted in the context of the model and data set and that for a comprehensive interpretation multiple machine learning models should be consolidated. " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "24d07cc6-81e1-48d1-b30f-6a1e1f37bf4d", + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "from rdkit import Chem\n", + "from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor\n", + "import pandas as pd\n", + "\n", + "from molpipeline import Pipeline\n", + "from molpipeline.any2mol import AutoToMol\n", + "from molpipeline.mol2any import MolToMorganFP\n", + "from molpipeline.explainability import (\n", + " SHAPTreeExplainer,\n", + " structure_heatmap_shap,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "b10eefeb-f30f-4e49-a9a7-2b0af5e78199", + "metadata": {}, + "outputs": [], + "source": [ + "RANDOM_STATE = 123456" + ] + }, + { + "cell_type": "markdown", + "id": "0c89a57a-2467-4ab0-941f-1a15ce3921dd", + "metadata": {}, + "source": [ + "## Reading the protein-ligand binding data set" + ] + }, + { + "cell_type": "markdown", + "id": "a640f22c-9568-47a9-9f1d-4601015a5902", + "metadata": {}, + "source": [ + "The data set from [Harren et al.](https://pubs.acs.org/doi/10.1021/acs.jcim.1c01263) contains bioactivity measurements describing the binding of small molecules (the ligands) and the protein target.\n", + "\n", + "The bioactivity is provided as pIC50 values against the aspartic protease renin for molecular series of indole-3-carboxamides and azaindoles. The structure−activity relationship (SAR) is relatively well understood for these molecules. For example, for the indole-3-carboxamides, a PDB structure of the protein-ligand complex is available [3oot](https://www.rcsb.org/structure/3OOT), illustrating the interactions of the potent ligand \"5k\" (IC50=2 nM), that can be used to evaluate XAI explanations. See the paper of [Harren et al.](https://pubs.acs.org/doi/10.1021/acs.jcim.1c01263) for useful details and references." + ] + }, + { + "cell_type": "markdown", + "id": "6d726f57-40ff-44dd-a360-76953f42e0c8", + "metadata": {}, + "source": [ + "Let's read in the molecular data." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7c7588c6-1c9c-4831-96a8-98223ee7c75a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
pubchem_cidpubchem_inchipubchem_smilesnameoriginpIC50
054753101.0InChI=1S/C26H25FN4O2/c1-17-10-11-20(27)18(2)24...CC1=C(C(=C(C=C1)F)C)OC2=C(C3=C(N2C4=CC=CC=C4)C...b2a3mBMCL2011A8.8861
152949598.0InChI=1S/C26H24FN3O3/c1-17-7-8-18(27)15-23(17)...CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)C=CC...b1i5cBMCL20108.6990
244195154.0InChI=1S/C28H28FN3O2/c1-18-20(7-6-10-22(18)29)...CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)C=CC(=...b1i5kBMCL20108.6990
344194118.0InChI=1S/C26H25FN4O2/c1-17-8-9-21(27)18(2)24(1...CC1=C(C(=C(C=C1)F)C)OC2=C(C3=C(N2C4=CC=CC=C4)C...b2a6dBMCL2011A8.6990
453346499.0InChI=1S/C32H29FN4O2/c1-22-12-13-24(33)21-28(2...CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)C(=N...b2a7dBMCL2011A8.5229
.....................
13352947015.0InChI=1S/C26H25N3O2/c1-19-18-28(17-16-27-19)25...CC1CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=C...b1i3aBMCL20105.1898
13456675445.0InChI=1S/C26H25FN4O3/c1-17-8-9-18(27)16-21(17)...CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)C=CC...b2a5aBMCL2011A5.1785
13552944458.0InChI=1S/C27H27N3O2/c1-27(2)19-29(18-17-28-27)...CC1(CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=...b1i3bBMCL20105.1367
13652948980.0InChI=1S/C26H22F3N3O2/c27-26(28,29)18-7-6-10-2...C1CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=C4...b1i2cBMCL20105.0458
13752944486.0InChI=1S/C26H25FN4O2/c1-17-18(5-4-7-21(17)27)1...CC1=C(C=CC=C1F)CC2=C(C3=CC=CC=C3N2C4=CNC(=O)C=...b1i4hBMCL20105.0292
\n", + "

138 rows × 6 columns

\n", + "
" + ], + "text/plain": [ + " pubchem_cid pubchem_inchi \\\n", + "0 54753101.0 InChI=1S/C26H25FN4O2/c1-17-10-11-20(27)18(2)24... \n", + "1 52949598.0 InChI=1S/C26H24FN3O3/c1-17-7-8-18(27)15-23(17)... \n", + "2 44195154.0 InChI=1S/C28H28FN3O2/c1-18-20(7-6-10-22(18)29)... \n", + "3 44194118.0 InChI=1S/C26H25FN4O2/c1-17-8-9-21(27)18(2)24(1... \n", + "4 53346499.0 InChI=1S/C32H29FN4O2/c1-22-12-13-24(33)21-28(2... \n", + ".. ... ... \n", + "133 52947015.0 InChI=1S/C26H25N3O2/c1-19-18-28(17-16-27-19)25... \n", + "134 56675445.0 InChI=1S/C26H25FN4O3/c1-17-8-9-18(27)16-21(17)... \n", + "135 52944458.0 InChI=1S/C27H27N3O2/c1-27(2)19-29(18-17-28-27)... \n", + "136 52948980.0 InChI=1S/C26H22F3N3O2/c27-26(28,29)18-7-6-10-2... \n", + "137 52944486.0 InChI=1S/C26H25FN4O2/c1-17-18(5-4-7-21(17)27)1... \n", + "\n", + " pubchem_smiles name origin \\\n", + "0 CC1=C(C(=C(C=C1)F)C)OC2=C(C3=C(N2C4=CC=CC=C4)C... b2a3m BMCL2011A \n", + "1 CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)C=CC... b1i5c BMCL2010 \n", + "2 CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)C=CC(=... b1i5k BMCL2010 \n", + "3 CC1=C(C(=C(C=C1)F)C)OC2=C(C3=C(N2C4=CC=CC=C4)C... b2a6d BMCL2011A \n", + "4 CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)C(=N... b2a7d BMCL2011A \n", + ".. ... ... ... \n", + "133 CC1CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=C... b1i3a BMCL2010 \n", + "134 CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)C=CC... b2a5a BMCL2011A \n", + "135 CC1(CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=... b1i3b BMCL2010 \n", + "136 C1CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=C4... b1i2c BMCL2010 \n", + "137 CC1=C(C=CC=C1F)CC2=C(C3=CC=CC=C3N2C4=CNC(=O)C=... b1i4h BMCL2010 \n", + "\n", + " pIC50 \n", + "0 8.8861 \n", + "1 8.6990 \n", + "2 8.6990 \n", + "3 8.6990 \n", + "4 8.5229 \n", + ".. ... \n", + "133 5.1898 \n", + "134 5.1785 \n", + "135 5.1367 \n", + "136 5.0458 \n", + "137 5.0292 \n", + "\n", + "[138 rows x 6 columns]" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "data_path = Path(\"example_data\") / \"renin_harren.csv\"\n", + "df = pd.read_csv(data_path)\n", + "df" + ] + }, + { + "cell_type": "markdown", + "id": "0f7353cb-3818-420e-bc5a-e7f43f351ab1", + "metadata": {}, + "source": [ + "A small note on the data sets: \n", + "We use a version of the Renin data set from PubChem because automatic downloading from the journal website is prevented. This version contains only 138 instead of the original 142 compounds, since the missing molecules were not available in PubChem. This difference should be negligible for the illustrative purpose of this notebook. " + ] + }, + { + "cell_type": "markdown", + "id": "7ae0441a-660c-4362-83b7-e9e88a17e72d", + "metadata": {}, + "source": [ + "We construct RDKit molecule data structures from the SMILES and add relevant infos as properties to the molecules for convenience.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "1786f5a1-bec6-475a-ba1a-9f798d4b753b", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/html": [ + "\n", + "\n", + "\n", + "
nameb1i5c
originBMCL2010
pIC508.699
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mols = [Chem.MolFromSmiles(smiles) for smiles in df[\"pubchem_smiles\"]]\n", + "for prop_name in [\"name\", \"origin\", \"pIC50\"]:\n", + " for mol, prop in zip(mols, df[prop_name]):\n", + " mol.SetProp(prop_name, str(prop))\n", + "mols[1]" + ] + }, + { + "cell_type": "markdown", + "id": "9470f1df-6175-4991-bb08-9c84692f3f58", + "metadata": {}, + "source": [ + "The target values are extraxted in a separate list `y`" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "d2c32c0a-baf5-4c71-acb7-859ab016628b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[8.8861, 8.699, 8.699]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "y = [float(mol.GetProp(\"pIC50\")) for mol in mols]\n", + "y[:3]" + ] + }, + { + "cell_type": "markdown", + "id": "04ddb98e-9eba-45fd-be5f-5fc96eed9d86", + "metadata": {}, + "source": [ + "## Explaining predictions with MolPipeline" + ] + }, + { + "cell_type": "markdown", + "id": "38c88eb2-2a3f-494b-8f05-2036c9f0a815", + "metadata": {}, + "source": [ + "We start by setting up and fitting a standard Random Forest model with Morgan fingerprints on the data set with a pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "fe99594a-0e06-49d9-81d4-59b735a8af5e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
Pipeline(steps=[('auto2mol', AutoToMol()), ('morgan', MolToMorganFP(radius=3)),\n",
+       "                ('rf',\n",
+       "                 RandomForestRegressor(n_estimators=500, random_state=123456))])
In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
" + ], + "text/plain": [ + "Pipeline(steps=[('auto2mol', AutoToMol()), ('morgan', MolToMorganFP(radius=3)),\n", + " ('rf',\n", + " RandomForestRegressor(n_estimators=500, random_state=123456))])" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pipeline = Pipeline(\n", + " [\n", + " (\"auto2mol\", AutoToMol()),\n", + " (\"morgan\", MolToMorganFP(n_bits=2048, radius=3)),\n", + " (\"rf\", RandomForestRegressor(n_estimators=500, random_state=RANDOM_STATE)),\n", + " ]\n", + ")\n", + "\n", + "pipeline.fit(mols, y)" + ] + }, + { + "cell_type": "markdown", + "id": "715075c3-cff8-4870-bd9c-7fd87b7bb7c3", + "metadata": {}, + "source": [ + "After the model is trained, we can simply pass the `pipeline` into a MolPipeline explainer and call the `explain` function to generate explanations for a list of molecules. Here we use the `SHAPTreeExplainer` which is a wrapper around [SHAP's TreeExplainer](https://shap.readthedocs.io/en/latest/generated/shap.TreeExplainer.html) that handles all necessary steps to generate explanations for molecules automatically. In addition, all molecules given to `explain` will be processed by the `pipeline` meaning all transformation, standardization and error handling steps will also be applied to explaining molecules." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "18ccba9b-42d5-4078-bc2a-b433d93d79b2", + "metadata": {}, + "outputs": [], + "source": [ + "explainer = SHAPTreeExplainer(pipeline)\n", + "explanations = explainer.explain(mols)" + ] + }, + { + "cell_type": "markdown", + "id": "641c7f21-38d7-4858-ade3-f9fca6703b53", + "metadata": {}, + "source": [ + "We can check if an explanation could be computed successfully by calling the `is_valid()` function, e.g. sometimes errors can occur for unprocessable molecules. " + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "4c8a7790-39c6-462c-afd2-22a26c78afde", + "metadata": {}, + "outputs": [], + "source": [ + "assert all(exp for exp in explanations if not exp.is_valid())" + ] + }, + { + "cell_type": "markdown", + "id": "d7f48fd5-5cf9-4d3a-870a-4bedba58ef38", + "metadata": {}, + "source": [ + "The algorithm behind MolPipeline's `SHAPTreeExplainer` uses [SHAP's TreeExplainer](https://shap.readthedocs.io/en/latest/generated/shap.TreeExplainer.html) to estimate Shapley Values. With the Shapley Values each feature is attributed an importance value for the prediction, which we term `feature_weights`. When computed on the Morgan fingerprint, these `feature_weigths` can be mapped to `atom_weights`. The weight of an atom is the sum of all feature weights of substructures intersecting the atom, normalized by the substructure size and occurrence in the molecule. See [Feldmann et al.](https://doi.org/10.3390/biom12040557) for more details and [Harren et al.](https://pubs.acs.org/doi/10.1021/acs.jcim.1c01263) for some alternative approaches for calculating atom weights." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "f9b515d8-b837-4277-b1cf-17ac6e890b8e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([0. , 0.00033001, 0.00010306, ..., 0. , 0.00025244,\n", + " 0. ])" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# feature weights for the first molecule\n", + "explanations[0].feature_weights" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "ae932eb9-d144-4ed3-8f98-ad20fc37dfb3", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([ 0.12741071, 0.13919019, 0.07593206, 0.13930674, 0.08594195,\n", + " 0.01826992, 0.07099029, 0.03481592, 0.13192108, 0.0763856 ,\n", + " 0.06904259, 0.06558726, 0.06548176, 0.0672983 , 0.0670064 ,\n", + " 0.05595265, 0.00196574, 0.00114999, 0.00072739, 0.00114999,\n", + " 0.00196574, 0.00710791, -0.00021892, 0.00095641, -0.00198274,\n", + " 0.06092596, 0.00555693, 0.00703136, 0.00723796, 0.00834429,\n", + " 0.00433498, 0.00834429, 0.00723796])" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# atom weights for the first molecule\n", + "explanations[0].atom_weights" + ] + }, + { + "cell_type": "markdown", + "id": "cb21eef9-20fb-4e8d-910a-d1389f9ce09f", + "metadata": {}, + "source": [ + "## Visualizing explanations with MolPipeline\n", + "\n", + "Now that we generated explanations, let's depict one to understand how they can be used for interpreting predictions. " + ] + }, + { + "cell_type": "markdown", + "id": "971d77d5-bf8c-4c7a-985c-a27dd9f5316a", + "metadata": {}, + "source": [ + "We select a molecule from the data set:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "9b91dfea-7d87-4a2c-9d5d-1d3e7810e21a", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/html": [ + "\n", + "\n", + "\n", + "
nameb3a4g
originBMCL2011B
pIC507.0506
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "explanation = explanations[55]\n", + "explanation.molecule" + ] + }, + { + "cell_type": "markdown", + "id": "4284b9d9-b1cf-4328-be1d-4f7a2c5c7fb6", + "metadata": {}, + "source": [ + "We can illustrate MolPipeline's explanations with the `structure_heatmap_shap`. This function generates an image with a 2D depiction of the molecular structure and explanations. " + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "c5545ea6-ca77-48c6-9871-f84f7fdbfe88", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "structure_heatmap_shap(\n", + " explanation\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "9a3baa39-95c1-4b39-a591-32ae7a145024", + "metadata": {}, + "source": [ + "The explanations are a heatmap and a decomposition of the predicted value into three contributing factors.\n", + "\n", + "**Heatmap of the molecular structure** \n", + "Substructures associated with positive contributions by the model are highlighted in red, while substructures with negative contributions would be illustrated in blue. No color indicates no contribution. Therefore, we can interprete that substructures highlighted in red are important for high activity while blue substructures are unfavorable for activity. \n", + "\n", + "**Contribution of present/absent substructure features** \n", + "In addition to the heatmap of the structure, MolPipelines also provides a breakdown of the predicted value on the bottom of the explanation image. The model predicts a pIC50 value of 6.92 for the compound. This value can be decomposed in contributions from the expected value of the model output (see [Lundberg et al.](https://doi.org/10.48550/arXiv.1705.07874) for details), the features present and features absent using the Shapley Value-based feature weights. Since we are using Morgan binary fingerprints, present features correspond to the substructures present in the molecule, that can be seen in the image. In contrast, absent features are features that are important for the prediction but are not in the depicted molecule. For example, some features/substructures in the training set can be important for the model and that they are missing in this particular molecule influences the prediction. \n", + "\n", + "Note that this decomposition is currently only provided for binary fingerprints." + ] + }, + { + "cell_type": "markdown", + "id": "eb9cfe22-bd1e-4d40-9ff5-095a8bf646dd", + "metadata": {}, + "source": [ + "## Comparison to explanations from Harren et al.\n", + "\n", + "We compare explanations [Harren et al.](https://pubs.acs.org/doi/10.1021/acs.jcim.1c01263) obtained with a simple multilayer perceptron (MLP) with Morgan fingerprints using DeepSHAP to the standard Random Forest model with Morgan fingeprints using SHAP's TreeExplainer on the real-world lead optimization data set. \n", + "\n", + "In the lead optimization step in drug discovery, it is important that a machine learning model reflects the affinity trends induced by smaller structural changes. Especially, small structural changes causing large affinity changes are of interest and the interpretations obtained from an XAI method should adequately capture and visualize these trends. The goal of applying XAI in such a project is usually to identify further small structural modification to improve affinity. See [Harren et al.](https://pubs.acs.org/doi/10.1021/acs.jcim.1c01263) for details. " + ] + }, + { + "cell_type": "markdown", + "id": "9f9a402c-e184-478a-99b5-bddf165794e3", + "metadata": {}, + "source": [ + "For convenience we store all explanations in a dict to access them by the molecules IDs or names." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "f4332475-d5c1-416b-9394-f8d3d0c0b9da", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "138" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "explanations_dict = {exp.molecule.GetProp(\"name\"): exp for exp in explanations}\n", + "len(explanations_dict)" + ] + }, + { + "cell_type": "markdown", + "id": "ef02cd6b-0c85-441f-ab57-967b550581b3", + "metadata": {}, + "source": [ + "We will compare our explanation with the explanations in Figure 6 in Harren et al. Therefore, we extract the 3 compounds of different affinities (see Harren et al.):\n", + "\n", + "| Compound name | exp. IC50 | exp. pIC50| \n", + "| --- | --- | --- |\n", + "| 5k (b1i5k) | 0.002 | 8.70|\n", + "| 2n (b1i2n) | 0.009 | 8.05|\n", + "| 5b (b1i5b) | 1.350 | 5.87|" + ] + }, + { + "cell_type": "markdown", + "id": "2cc8a0fb-b4a4-47b6-ba99-4b4cea4f14ad", + "metadata": {}, + "source": [ + "Extract the min and max atom weight values for normalizing the coloring of the series" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "cf838915-2fb1-4507-9a6d-344366f88ca4", + "metadata": {}, + "outputs": [], + "source": [ + "# series_names = [\"b1i5k\", \"b1i2n\", \"b1i5b\"]\n", + "# series_dict = {k:explanations_dict[k] for k in series_names}\n", + "series_dict = explanations_dict\n", + "\n", + "weight_min = min([exp.atom_weights.min() for exp in series_dict.values()])\n", + "weight_max = max([exp.atom_weights.max() for exp in series_dict.values()])\n", + "weight_abs_max = max(abs(weight_min), abs(weight_max))\n", + "# following Harren et al., we set the maximum color intensity to 70% of the maximal numerical value\n", + "# for better visual differentiations in low value regions.\n", + "weight_abs_max = weight_abs_max * 0.70" + ] + }, + { + "cell_type": "markdown", + "id": "47e35531-1b91-4f39-a71b-357fadd105dc", + "metadata": {}, + "source": [ + "First, let's have a look at the most active compound **5k** in the series ([Scheiper et al. 2010](https://doi.org/10.1016/j.bmcl.2010.08.092))\n" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "c5aadaed-f55a-4fa8-a471-a64bf0e9e994", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/html": [ + "\n", + "\n", + "\n", + "
nameb1i5k
originBMCL2010
pIC508.699
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "explanations_dict[\"b1i5k\"].molecule" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "d9048b03-6ce6-475d-b440-4f403b45cce4", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "structure_heatmap_shap(explanations_dict[\"b1i5k\"], color_limits=(-weight_abs_max, weight_abs_max))" + ] + }, + { + "cell_type": "markdown", + "id": "20bc00de-d184-4cdf-be8e-2440ff183d06", + "metadata": {}, + "source": [ + "The heatmap explanation strongly highlights a positive contribution from the indole's hydroxyl group. As also described by Harren et al., this hydroxyl has a favorable hydrogen bond with His287 (see the crystal structure [3oot](https://www.rcsb.org/structure/3OOT)), one of the key interactions of this compound. There are also smaller contributions from the methyl at the indole system and the fluorine and methyl decorations on the benzyl substituent. \n", + "\n", + "Interestingly, our heatmap explanation differs from Harren et al. (see Figure 6). The coloring using DeepSHAP in Harren et al. indicates a postive contributions from larger parts of the structure, including the whole indole system. In comparison, our heatmap is much sparser. We hypothesis that this difference is due to the machine learning algorithms used. Random Forest's predictions are potentially based on a sparser set of features as the MLP's, because a highly predictive (or important) feature will correspond to only a short path in the decision tree, considering only few features. In contrast, an MLP might be considering a larger set of all fingerprint features. \n", + "\n", + "This difference in results illustrate that explanations are hard to compare because there is no real ground truth. While Harren et al. found reasonable interpretations for the larger highlighted parts in their results, we could hypothesize that the hydroxyl's hydrogen bond is the key contributer to affinity in this compound, justifying strong highlighting. Therefore, considering multiple models and approaches seems necessary to get a comprehensive interpretation. \n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "20bcb584-943b-4a02-8e11-6a23ead5a0b5", + "metadata": {}, + "source": [ + "Next, we look at the **2n** compound which differs structurally only in the indole decoration to **5k** but has a 4.5-fold lower binding affinity." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "00f75eca-e30c-49cf-aa9c-6011214a852d", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/html": [ + "\n", + "\n", + "\n", + "
nameb1i2n
originBMCL2010
pIC508.0458
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "\n", + "explanations_dict[\"b1i2n\"].molecule" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "5c8867a2-926a-4458-8194-6edce7c000d5", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "structure_heatmap_shap(explanations_dict[\"b1i2n\"], color_limits=(-weight_abs_max, weight_abs_max))" + ] + }, + { + "cell_type": "markdown", + "id": "081c4cd6-34fc-44c3-a7be-218ed30fd3fc", + "metadata": {}, + "source": [ + "This heatmap shows more but less strongy pronounced contributions than for **5k**. The benzyl's flourine and methyl decorations are the strongest contributers, while now also the whole indole system is contributing. The heatmap is in relatively well agreement with Harren et al.'s results." + ] + }, + { + "cell_type": "markdown", + "id": "ced35b64-86d0-47d1-98bf-6941443e6607", + "metadata": {}, + "source": [ + "Finally, the let's look at compound **5b**" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "d47913e5-ead0-47d1-910f-3f83662338da", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/html": [ + "\n", + "\n", + "\n", + "
nameb1i5b
originBMCL2010
pIC505.8697
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "explanations_dict[\"b1i5b\"].molecule" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "0b1e490d-2a80-4e1b-8041-d34ddabff5ed", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "structure_heatmap_shap(explanations_dict[\"b1i5b\"], color_limits=(-weight_abs_max, weight_abs_max))" + ] + }, + { + "cell_type": "markdown", + "id": "de3fdaee-d746-4783-a0e3-1d572b56f78a", + "metadata": {}, + "source": [ + "Also for this compound, the heatmap of our Random Forest model is in relative good aggreement with the MLP-based explanations in Harren et al.'s paper. Positive contributions come from the decorated benzyl but negative contributions from the indole and it's decoration. As explained by Harren et al., the methoxy on the indole is unfavorable because its too big and lacking hydrogen bond donor potential to fit into this location in the binding pocket. " + ] + }, + { + "cell_type": "markdown", + "id": "36c1a67c-2fbf-4401-a422-9d3cf366feaf", + "metadata": {}, + "source": [ + "## Conclusion\n", + "\n", + "Explanations for individual predictions can be easily generated with MolPipeline. The heatmap explanations on the atom level can be used to highlight important parts of the compound. In addition, SHAP-based explanations can be used to decompose a prediction in contributions from freatures present and absent in the molecule.\n", + "\n", + "The comparison on the real-world drug discovery data set from Harren et al. shows that the explanations generated with MolPipeline are reasonable and correspond to important protein-ligand interactions and are mostly in good aggreement with other methods. However, the explanation generated with MolPipeline and Random Forest can differ from these generated with an MLP and DeepSHAP. Therefore, it is essential to inteprete all explanations in the context of the data set and models used. Additionally, multiple models can be tried to get a more comprehensive interpretation. " + ] + }, + { + "cell_type": "markdown", + "id": "fd5d90f9-81d1-4d4a-893e-876a28dd8410", + "metadata": {}, + "source": [ + "## Further reading\n", + "Interpreting model predicitions with XAI methods can be challenging. Harren et al. nicely describe more of these challenges in their paper. For example, for interpreting SAR results it is important to know which part of the molecules were exchanged and which were kept static during the exploration and therefore in the data set. Substituents that are the same in all molecules of the series will likely have neutral influence on the model predictions. However, exchanging them might have a large negative or positive effect on affinity, which is likely not captured by the data, the model and the explanations. " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/requirements.txt b/requirements.txt index c6fab9f9..d53ffddc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,11 @@ joblib >= 1.3.0 loguru +matplotlib numpy pandas rdkit >= 2023.9.1 scipy setuptools scikit-learn >= 1.4.0 +shap typing_extensions diff --git a/tests/test_explainability/__init__.py b/tests/test_explainability/__init__.py new file mode 100644 index 00000000..dad24e8b --- /dev/null +++ b/tests/test_explainability/__init__.py @@ -0,0 +1 @@ +"""Test explainability methods and utilities.""" diff --git a/tests/test_explainability/test_shap_explainers.py b/tests/test_explainability/test_shap_explainers.py new file mode 100644 index 00000000..1e7df3f6 --- /dev/null +++ b/tests/test_explainability/test_shap_explainers.py @@ -0,0 +1,410 @@ +"""Test SHAP's TreeExplainer wrapper.""" + +import unittest + +import numpy as np +import pandas as pd +from rdkit import Chem, rdBase +from sklearn.base import BaseEstimator, is_classifier, is_regressor +from sklearn.ensemble import ( + GradientBoostingClassifier, + GradientBoostingRegressor, + RandomForestClassifier, + RandomForestRegressor, +) +from sklearn.linear_model import LinearRegression, LogisticRegression +from sklearn.svm import SVC, SVR + +from molpipeline import ErrorFilter, FilterReinserter, Pipeline, PostPredictionWrapper +from molpipeline.abstract_pipeline_elements.core import RDKitMol +from molpipeline.any2mol import SmilesToMol +from molpipeline.explainability.explainer import SHAPKernelExplainer, SHAPTreeExplainer +from molpipeline.explainability.explanation import ( + AtomExplanationMixin, + SHAPFeatureAndAtomExplanation, + SHAPFeatureExplanation, +) +from molpipeline.mol2any import ( + MolToConcatenatedVector, + MolToMorganFP, + MolToRDKitPhysChem, +) +from molpipeline.mol2mol import SaltRemover +from molpipeline.utils.subpipeline import SubpipelineExtractor +from tests.test_explainability.utils import construct_kernel_shap_kwargs + +TEST_SMILES = ["CC", "CCO", "COC", "c1ccccc1(N)", "CCC(-O)O", "CCCN"] +CONTAINS_OX = [0, 1, 1, 0, 1, 0] + +TEST_SMILES_WITH_BAD_SMILES = [ + "CC", + "CCO", + "COC", + "MY_FIRST_BAD_SMILES", + "c1ccccc1(N)", + "CCC(-O)O", + "CCCN", + "BAD_SMILES_2", +] +CONTAINS_OX_BAD_SMILES = [0, 1, 1, 0, 0, 1, 0, 1] + +_RANDOM_STATE = 67056 + + +class TestSHAPExplainers(unittest.TestCase): + """Test SHAP's Explainer wrappers.""" + + def _test_valid_explanation( + self, + explanation: SHAPFeatureExplanation | SHAPFeatureAndAtomExplanation, + estimator: BaseEstimator, + molecule_reader_subpipeline: Pipeline, + nof_features: int, + test_smiles: str, + explainer: SHAPKernelExplainer | SHAPTreeExplainer, + ) -> None: + """Helper method to test if the explanation is valid and has the correct shape and content. + + Parameters + ---------- + explanation : Explanation + The explanation to be tested. + estimator : BaseEstimator + The estimator used in the pipeline. + molecule_reader_subpipeline : Pipeline + The subpipeline that extracts the molecule from the input data. + nof_features : int + The number of features in the feature vector. + test_smiles : str + The SMILES string of the molecule. + explainer : SHAPKernelExplainer | SHAPTreeExplainer + The explainer used to generate the explanation. + """ + self.assertTrue(explanation.is_valid()) + + self.assertIsInstance(explanation.feature_vector, np.ndarray) + self.assertEqual( + (nof_features,), explanation.feature_vector.shape # type: ignore[union-attr] + ) + + # feature names should be a list of not empty strings + self.assertTrue( + all( + isinstance(name, str) and len(name) > 0 + for name in explanation.feature_names # type: ignore[union-attr] + ) + ) + self.assertEqual( + len(explanation.feature_names), explanation.feature_vector.shape[0] # type: ignore + ) + + self.assertIsInstance(explanation.molecule, RDKitMol) + self.assertEqual( + Chem.MolToInchi(*molecule_reader_subpipeline.transform([test_smiles])), + Chem.MolToInchi(explanation.molecule), + ) + + self.assertIsInstance(explanation.prediction, np.ndarray) + self.assertIsInstance(explanation.feature_weights, np.ndarray) + if is_regressor(estimator): + self.assertTrue((1,), explanation.prediction.shape) # type: ignore[union-attr] + self.assertEqual( + (nof_features,), explanation.feature_weights.shape # type: ignore[union-attr] + ) + elif is_classifier(estimator): + self.assertTrue((2,), explanation.prediction.shape) # type: ignore[union-attr] + if isinstance(explainer, SHAPTreeExplainer) and isinstance( + estimator, GradientBoostingClassifier + ): + # there is currently a bug in SHAP's TreeExplainer for GradientBoostingClassifier + # https://github.com/shap/shap/issues/3177 returning only one feature weight + # which is also based on log odds. This check is a workaround until the bug is fixed. + self.assertEqual( + (nof_features,), explanation.feature_weights.shape # type: ignore[union-attr] + ) + elif isinstance(estimator, SVC): + # SVC seems to be handled differently by SHAP. It returns only a one dimensional + # feature array for binary classification. + self.assertTrue( + (1,), explanation.prediction.shape # type: ignore[union-attr] + ) + self.assertEqual( + (nof_features,), explanation.feature_weights.shape # type: ignore[union-attr] + ) + else: + # normal binary classification case + self.assertEqual( + (nof_features, 2), explanation.feature_weights.shape # type: ignore[union-attr] + ) + else: + raise ValueError("Error in unittest. Unsupported estimator.") + + if issubclass(type(explainer), AtomExplanationMixin): + self.assertIsInstance(explanation.atom_weights, np.ndarray) # type: ignore[union-attr] + self.assertEqual( + explanation.atom_weights.shape, # type: ignore[union-attr] + (explanation.molecule.GetNumAtoms(),), # type: ignore[union-attr] + ) + + def test_explanations_fingerprint_pipeline( # pylint: disable=too-many-locals + self, + ) -> None: + """Test SHAP's TreeExplainer wrapper on MolPipeline's pipelines with fingerprints.""" + + tree_estimators = [ + RandomForestClassifier(n_estimators=2, random_state=_RANDOM_STATE), + RandomForestRegressor(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingClassifier(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingRegressor(n_estimators=2, random_state=_RANDOM_STATE), + ] + other_estimators = [ + SVC(kernel="rbf", probability=False, random_state=_RANDOM_STATE), + SVR(kernel="linear"), + LogisticRegression(random_state=_RANDOM_STATE), + LinearRegression(), + ] + n_bits = 64 + + explainer_types = [ + SHAPKernelExplainer, + SHAPTreeExplainer, + ] + explainer_estimators = [tree_estimators + other_estimators, tree_estimators] + + for estimators, explainer_type in zip(explainer_estimators, explainer_types): + + # test explanations with different estimators + for estimator in estimators: + pipeline = Pipeline( + [ + ("smi2mol", SmilesToMol()), + ("morgan", MolToMorganFP(radius=1, n_bits=n_bits)), + ("model", estimator), + ] + ) + pipeline.fit(TEST_SMILES, CONTAINS_OX) + + # some explainers require additional kwargs + explainer_kwargs = {} + if explainer_type == SHAPKernelExplainer: + explainer_kwargs = construct_kernel_shap_kwargs( + pipeline, TEST_SMILES + ) + + explainer = explainer_type(pipeline, **explainer_kwargs) + explanations = explainer.explain(TEST_SMILES) + self.assertEqual(len(explanations), len(TEST_SMILES)) + + self.assertTrue( + issubclass(explainer.return_element_type_, AtomExplanationMixin) + ) + + # get the subpipeline that extracts the molecule from the input data + mol_reader_subpipeline = SubpipelineExtractor( + pipeline + ).get_molecule_reader_subpipeline() + self.assertIsInstance(mol_reader_subpipeline, Pipeline) + + for i, explanation in enumerate(explanations): + self._test_valid_explanation( + explanation, + estimator, + mol_reader_subpipeline, # type: ignore[arg-type] + n_bits, + TEST_SMILES[i], + explainer=explainer, # type: ignore[arg-type] + ) + + # pylint: disable=too-many-locals + def test_explanations_pipeline_with_invalid_inputs(self) -> None: + """Test SHAP's TreeExplainer wrapper with invalid inputs.""" + + # estimators to test + estimators = [ + RandomForestClassifier(n_estimators=2, random_state=_RANDOM_STATE), + RandomForestRegressor(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingClassifier(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingRegressor(n_estimators=2, random_state=_RANDOM_STATE), + ] + + # fill values considered invalid predictions + invalid_fill_values = [None, np.nan, pd.NA] + # fill values considered valid predictions (although outside the valid range) + valid_fill_values = [0, 999] + # fill values to test + fill_values = invalid_fill_values + valid_fill_values + + n_bits = 64 + + for estimator in estimators: + for fill_value in fill_values: + + # pipeline with ErrorFilter + error_filter1 = ErrorFilter() + pipeline1 = Pipeline( + [ + ("smi2mol", SmilesToMol()), + ("salt_remover", SaltRemover()), + ("error_filter", error_filter1), + ("morgan", MolToMorganFP(radius=1, n_bits=64)), + ("model", estimator), + ] + ) + + # pipeline with ErrorFilter and FilterReinserter + error_filter2 = ErrorFilter() + error_reinserter2 = PostPredictionWrapper( + FilterReinserter.from_error_filter(error_filter2, fill_value) + ) + pipeline2 = Pipeline( + [ + ("smi2mol", SmilesToMol()), + ("salt_remover", SaltRemover()), + ("error_filter", error_filter2), + ("morgan", MolToMorganFP(radius=1, n_bits=n_bits)), + ("model", estimator), + ("error_reinserter", error_reinserter2), + ] + ) + + for pipeline in [pipeline1, pipeline2]: + + pipeline.fit(TEST_SMILES_WITH_BAD_SMILES, CONTAINS_OX_BAD_SMILES) + + explainer = SHAPTreeExplainer(pipeline) + log_block = rdBase.BlockLogs() # pylint: disable=unused-variable + explanations = explainer.explain(TEST_SMILES_WITH_BAD_SMILES) + del log_block + self.assertEqual( + len(explanations), len(TEST_SMILES_WITH_BAD_SMILES) + ) + + # get the subpipeline that extracts the molecule from the input data + mol_reader_subpipeline = SubpipelineExtractor( + pipeline + ).get_molecule_reader_subpipeline() + self.assertIsNotNone(mol_reader_subpipeline) + + for i, explanation in enumerate(explanations): + if i in [3, 7]: + self.assertFalse(explanation.is_valid()) + continue + + self._test_valid_explanation( + explanation, + estimator, + mol_reader_subpipeline, # type: ignore[arg-type] + n_bits, + TEST_SMILES_WITH_BAD_SMILES[i], + explainer=explainer, + ) + + def test_explanations_pipeline_with_physchem(self) -> None: + """Test SHAP's TreeExplainer wrapper on physchem feature vector.""" + + estimators = [ + RandomForestClassifier(n_estimators=2, random_state=_RANDOM_STATE), + RandomForestRegressor(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingClassifier(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingRegressor(n_estimators=2, random_state=_RANDOM_STATE), + ] + + # test explanations with different estimators + for estimator in estimators: + pipeline = Pipeline( + [ + ("smi2mol", SmilesToMol()), + ("physchem", MolToRDKitPhysChem()), + ("model", estimator), + ] + ) + + pipeline.fit(TEST_SMILES, CONTAINS_OX) + + explainer = SHAPTreeExplainer(pipeline) + explanations = explainer.explain(TEST_SMILES) + self.assertEqual(len(explanations), len(TEST_SMILES)) + + # get the subpipeline that extracts the molecule from the input data + mol_reader_subpipeline = SubpipelineExtractor( + pipeline + ).get_molecule_reader_subpipeline() + self.assertIsNotNone(mol_reader_subpipeline) + + for i, explanation in enumerate(explanations): + self._test_valid_explanation( + explanation, + estimator, + mol_reader_subpipeline, # type: ignore[arg-type] + pipeline.named_steps["physchem"].n_features, + TEST_SMILES[i], + explainer=explainer, + ) + + self.assertEqual( + explanation.feature_names, + pipeline.named_steps["physchem"].feature_names, + ) + + def test_explanations_pipeline_with_concatenated_features(self) -> None: + """Test SHAP's TreeExplainer wrapper on concatenated feature vector.""" + + estimators = [ + RandomForestClassifier(n_estimators=2, random_state=_RANDOM_STATE), + RandomForestRegressor(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingClassifier(n_estimators=2, random_state=_RANDOM_STATE), + GradientBoostingRegressor(n_estimators=2, random_state=_RANDOM_STATE), + ] + + n_bits = 64 + + # test explanations with different estimators + for estimator in estimators: + pipeline = Pipeline( + [ + ("smi2mol", SmilesToMol()), + ( + "features", + MolToConcatenatedVector( + [ + ( + "RDKitPhysChem", + MolToRDKitPhysChem(), + ), + ( + "MorganFP", + MolToMorganFP(radius=1, n_bits=n_bits), + ), + ] + ), + ), + ("model", estimator), + ] + ) + + pipeline.fit(TEST_SMILES, CONTAINS_OX) + + explainer = SHAPTreeExplainer(pipeline) + explanations = explainer.explain(TEST_SMILES) + self.assertEqual(len(explanations), len(TEST_SMILES)) + + # get the subpipeline that extracts the molecule from the input data + mol_reader_subpipeline = SubpipelineExtractor( + pipeline + ).get_molecule_reader_subpipeline() + self.assertIsNotNone(mol_reader_subpipeline) + + for i, explanation in enumerate(explanations): + self._test_valid_explanation( + explanation, + estimator, + mol_reader_subpipeline, # type: ignore[arg-type] + pipeline.named_steps["features"].n_features, + TEST_SMILES[i], + explainer=explainer, + ) + + self.assertEqual( + explanation.feature_names, + pipeline.named_steps["features"].feature_names, + ) diff --git a/tests/test_explainability/test_visualization/__init__.py b/tests/test_explainability/test_visualization/__init__.py new file mode 100644 index 00000000..5dd7b293 --- /dev/null +++ b/tests/test_explainability/test_visualization/__init__.py @@ -0,0 +1 @@ +"""Test explainability visualization.""" diff --git a/tests/test_explainability/test_visualization/test_gaussian_grid.py b/tests/test_explainability/test_visualization/test_gaussian_grid.py new file mode 100644 index 00000000..a71da924 --- /dev/null +++ b/tests/test_explainability/test_visualization/test_gaussian_grid.py @@ -0,0 +1,65 @@ +"""Test gaussian grid visualization.""" + +import unittest +from typing import ClassVar + +import numpy as np +from rdkit import Chem +from rdkit.Chem import Draw + +from molpipeline import Pipeline +from molpipeline.explainability import ( + SHAPFeatureAndAtomExplanation, + SHAPFeatureExplanation, + SHAPTreeExplainer, +) +from molpipeline.explainability.visualization.visualization import ( + make_sum_of_gaussians_grid, +) +from tests.test_explainability.test_visualization.test_visualization import ( + _get_test_morgan_rf_pipeline, +) + +TEST_SMILES = ["CC", "CCO", "COC", "c1ccccc1(N)", "CCC(-O)O", "CCCN"] +CONTAINS_OX = [0, 1, 1, 0, 1, 0] + + +class TestSumOfGaussiansGrid(unittest.TestCase): + """Test sum of gaussian grid .""" + + # pylint: disable=duplicate-code + test_pipeline: ClassVar[Pipeline] + test_explainer: ClassVar[SHAPTreeExplainer] + test_explanations: ClassVar[ + list[SHAPFeatureAndAtomExplanation | SHAPFeatureExplanation] + ] + + @classmethod + def setUpClass(cls) -> None: + """Set up the tests.""" + cls.test_pipeline = _get_test_morgan_rf_pipeline() + cls.test_pipeline.fit(TEST_SMILES, CONTAINS_OX) + cls.test_explainer = SHAPTreeExplainer(cls.test_pipeline) + cls.test_explanations = cls.test_explainer.explain(TEST_SMILES) + + def test_grid_with_shap_atom_weights(self) -> None: + """Test grid with SHAP atom weights.""" + for explanation in self.test_explanations: + self.assertTrue(explanation.is_valid()) + self.assertIsInstance(explanation.atom_weights, np.ndarray) # type: ignore[union-attr] + + mol_copy = Chem.Mol(explanation.molecule) + mol_copy = Draw.PrepareMolForDrawing(mol_copy) + value_grid = make_sum_of_gaussians_grid( + mol_copy, + atom_weights=explanation.atom_weights, # type: ignore[union-attr] + atom_width=np.inf, + grid_resolution=[8, 8], + padding=[0.4, 0.4], + ) + self.assertIsNotNone(value_grid) + self.assertEqual(value_grid.values.size, 8 * 8) + + # test that the range of summed gaussian values is as expected for SHAP + self.assertTrue(value_grid.values.min() >= -1) + self.assertTrue(value_grid.values.max() <= 1) diff --git a/tests/test_explainability/test_visualization/test_visualization.py b/tests/test_explainability/test_visualization/test_visualization.py new file mode 100644 index 00000000..4f549b3a --- /dev/null +++ b/tests/test_explainability/test_visualization/test_visualization.py @@ -0,0 +1,213 @@ +"""Test visualization methods for explanations.""" + +import unittest +from typing import ClassVar + +import numpy as np +from rdkit import Chem +from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor + +from molpipeline import Pipeline +from molpipeline.any2mol import SmilesToMol +from molpipeline.explainability import ( + SHAPFeatureAndAtomExplanation, + SHAPFeatureExplanation, + SHAPTreeExplainer, + structure_heatmap, + structure_heatmap_shap, +) +from molpipeline.explainability.explainer import SHAPKernelExplainer +from molpipeline.mol2any import MolToMorganFP +from tests.test_explainability.utils import construct_kernel_shap_kwargs + +TEST_SMILES = ["CC", "CCO", "COC", "c1ccccc1(N)", "CCC(-O)O", "CCCN"] +CONTAINS_OX = [0, 1, 1, 0, 1, 0] # classification labels +REGRESSION_LABELS = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6] # regression labels + + +_RANDOM_STATE = 67056 + + +def _get_test_morgan_rf_pipeline(task: str = "classification") -> Pipeline: + """Get a test pipeline with Morgan fingerprints and a random forest classifier. + + Parameters + ---------- + task : str, optional (default="classification") + Task of the pipeline. Either "classification" or "regression". + + Returns + ------- + Pipeline + Pipeline with Morgan fingerprints and a random forest classifier. + """ + + if task == "classification": + model = RandomForestClassifier(n_estimators=2, random_state=_RANDOM_STATE) + elif task == "regression": + model = RandomForestRegressor(n_estimators=2, random_state=_RANDOM_STATE) + else: + raise ValueError(f"Invalid task: {task}") + + pipeline = Pipeline( + [ + ("smi2mol", SmilesToMol()), + ("morgan", MolToMorganFP(radius=1, n_bits=1024)), + ( + "model", + model, + ), + ] + ) + return pipeline + + +class TestExplainabilityVisualization(unittest.TestCase): + """Test the public interface of the visualization methods for explanations.""" + + test_pipeline_clf: ClassVar[Pipeline] + test_tree_explainer_clf: ClassVar[SHAPTreeExplainer] + test_tree_explanations_clf: ClassVar[ + list[SHAPFeatureAndAtomExplanation | SHAPFeatureExplanation] + ] + test_kernel_explainer_clf: ClassVar[SHAPKernelExplainer] + test_kernel_explanations_clf: ClassVar[ + list[SHAPFeatureAndAtomExplanation | SHAPFeatureExplanation] + ] + + test_pipeline_reg: ClassVar[Pipeline] + test_tree_explainer_reg: ClassVar[SHAPTreeExplainer] + test_tree_explanations_reg: ClassVar[ + list[SHAPFeatureAndAtomExplanation | SHAPFeatureExplanation] + ] + test_kernel_explainer_reg: ClassVar[SHAPKernelExplainer] + test_kernel_explanations_reg: ClassVar[ + list[SHAPFeatureAndAtomExplanation | SHAPFeatureExplanation] + ] + + @classmethod + def setUpClass(cls) -> None: + """Set up the tests.""" + # test pipeline for classification + cls.test_pipeline_clf = _get_test_morgan_rf_pipeline(task="classification") + cls.test_pipeline_clf.fit(TEST_SMILES, CONTAINS_OX) + + # test pipeline for regression + cls.test_pipeline_reg = _get_test_morgan_rf_pipeline(task="regression") + cls.test_pipeline_reg.fit(TEST_SMILES, REGRESSION_LABELS) + + # tree explainer for classification + cls.test_tree_explainer_clf = SHAPTreeExplainer(cls.test_pipeline_clf) + cls.test_tree_explanations_clf = cls.test_tree_explainer_clf.explain( + TEST_SMILES, + ) + + # tree explainer for regression + cls.test_tree_explainer_reg = SHAPTreeExplainer(cls.test_pipeline_reg) + cls.test_tree_explanations_reg = cls.test_tree_explainer_reg.explain( + TEST_SMILES + ) + + # kernel explainer for classification + kernel_kwargs_clf = construct_kernel_shap_kwargs( + cls.test_pipeline_clf, TEST_SMILES + ) + cls.test_kernel_explainer_clf = SHAPKernelExplainer( + cls.test_pipeline_clf, **kernel_kwargs_clf + ) + cls.test_kernel_explanations_clf = cls.test_kernel_explainer_clf.explain( + TEST_SMILES + ) + + # kernel explainer for regression + kernel_kwargs_reg = construct_kernel_shap_kwargs( + cls.test_pipeline_reg, TEST_SMILES + ) + cls.test_kernel_explainer_reg = SHAPKernelExplainer( + cls.test_pipeline_reg, **kernel_kwargs_reg + ) + cls.test_kernel_explanations_reg = cls.test_kernel_explainer_reg.explain( + TEST_SMILES + ) + + def test_structure_heatmap_fingerprint_based_atom_coloring(self) -> None: + """Test structure heatmap fingerprint-based atom coloring.""" + for explanation_list in [ + self.test_tree_explanations_clf, + self.test_kernel_explanations_clf, + self.test_tree_explanations_reg, + self.test_kernel_explanations_reg, + ]: + for explanation in explanation_list: + self.assertTrue(explanation.is_valid()) + self.assertIsInstance(explanation.atom_weights, np.ndarray) # type: ignore[union-attr] + image = structure_heatmap( + explanation.molecule, + explanation.atom_weights, # type: ignore + width=8, + height=8, + ) # type: ignore[union-attr] + self.assertIsNotNone(image) + self.assertEqual(image.format, "PNG") + + def test_structure_heatmap_shap_explanation(self) -> None: + """Test structure heatmap SHAP explanation.""" + for explanation_list in [ + self.test_tree_explanations_clf, + self.test_kernel_explanations_clf, + self.test_tree_explanations_reg, + self.test_kernel_explanations_reg, + ]: + for explanation in explanation_list: + self.assertTrue(explanation.is_valid()) + self.assertIsInstance(explanation, SHAPFeatureAndAtomExplanation) + self.assertIsInstance(explanation.atom_weights, np.ndarray) # type: ignore[union-attr] + image = structure_heatmap_shap( + explanation=explanation, # type: ignore[arg-type] + width=8, + height=8, + ) # type: ignore[union-attr] + self.assertIsNotNone(image) + self.assertEqual(image.format, "PNG") + + def test_explicit_hydrogens(self) -> None: + """Test that the visualization methods work with explicit hydrogens.""" + mol_implicit_hydrogens = Chem.MolFromSmiles("C") + explanations1 = self.test_tree_explainer_clf.explain( + [Chem.MolToSmiles(mol_implicit_hydrogens)] + ) + mol_added_hydrogens = Chem.AddHs(mol_implicit_hydrogens) + explanations2 = self.test_tree_explainer_clf.explain( + [Chem.MolToSmiles(mol_added_hydrogens)] + ) + mol_explicit_hydrogens = Chem.MolFromSmiles("[H]C([H])([H])[H]") + explanations3 = self.test_tree_explainer_clf.explain( + [Chem.MolToSmiles(mol_explicit_hydrogens)] + ) + + # test explanations' atom weights + self.assertEqual(len(explanations1), 1) + self.assertEqual(len(explanations2), 1) + self.assertEqual(len(explanations3), 1) + self.assertTrue(hasattr(explanations1[0], "atom_weights")) + self.assertTrue(hasattr(explanations2[0], "atom_weights")) + self.assertTrue(hasattr(explanations3[0], "atom_weights")) + self.assertIsInstance(explanations1[0].atom_weights, np.ndarray) # type: ignore[union-attr] + self.assertIsInstance(explanations2[0].atom_weights, np.ndarray) # type: ignore[union-attr] + self.assertIsInstance(explanations3[0].atom_weights, np.ndarray) # type: ignore[union-attr] + self.assertEqual(len(explanations1[0].atom_weights), 1) # type: ignore + self.assertEqual(len(explanations2[0].atom_weights), 1) # type: ignore + self.assertEqual(len(explanations3[0].atom_weights), 1) # type: ignore + + # test visualization + all_explanations = explanations1 + explanations2 + explanations3 + for explanation in all_explanations: + self.assertTrue(explanation.is_valid()) + image = structure_heatmap( + explanation.molecule, + explanation.atom_weights, # type: ignore + width=8, + height=8, + ) # type: ignore[union-attr] + self.assertIsNotNone(image) + self.assertEqual(image.format, "PNG") diff --git a/tests/test_explainability/utils.py b/tests/test_explainability/utils.py new file mode 100644 index 00000000..4006d9b3 --- /dev/null +++ b/tests/test_explainability/utils.py @@ -0,0 +1,36 @@ +"""Utils for explainability tests.""" + +from typing import Any + +import scipy + +from molpipeline import Pipeline +from molpipeline.utils.subpipeline import get_featurization_subpipeline + + +def construct_kernel_shap_kwargs(pipeline: Pipeline, data: list[str]) -> dict[str, Any]: + """Construct the kwargs for SHAPKernelExplainer. + + Convert sparse matrix to dense array because SHAPKernelExplainer does not + support sparse matrix as `data` and then explain dense matrices. + We stick to dense matrices for simplicity. + + Parameters + ---------- + pipeline : Pipeline + The pipeline used for featurization. + data : list[str] + The input data, e.g. SMILES strings. + + Returns + ------- + dict[str, Any] + The kwargs for SHAPKernelExplainer + """ + featurization_subpipeline = get_featurization_subpipeline( + pipeline, raise_not_found=True + ) + data_transformed = featurization_subpipeline.transform(data) # type: ignore[union-attr] + if scipy.sparse.issparse(data_transformed): + data_transformed = data_transformed.toarray() + return {"data": data_transformed}