diff --git a/snekmer/__init__.py b/snekmer/__init__.py
index c4fa507..6b5acce 100644
--- a/snekmer/__init__.py
+++ b/snekmer/__init__.py
@@ -7,7 +7,8 @@
 from . import cluster
 from . import vectorize
 from . import report
+from . import _version
 
 # from . import walk
 
-__version__ = "1.0.3"
+__version__ = _version.__version__
diff --git a/snekmer/_version.py b/snekmer/_version.py
new file mode 100644
index 0000000..92192ee
--- /dev/null
+++ b/snekmer/_version.py
@@ -0,0 +1 @@
+__version__ = "1.0.4"
diff --git a/snekmer/model.py b/snekmer/model.py
index 4643686..2e307af 100644
--- a/snekmer/model.py
+++ b/snekmer/model.py
@@ -6,14 +6,16 @@
 # imports
 import pandas as pd
 from typing import Any, Dict, List, Optional
+from ._version import __version__
 from .vectorize import KmerBasis
 from numpy.typing import NDArray
-from sklearn.base import ClassifierMixin
+from sklearn.base import BaseEstimator, ClassifierMixin
 from sklearn.tree import DecisionTreeClassifier
 from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
 from sklearn.linear_model import LogisticRegression  # LogisticRegressionCV
 from sklearn.model_selection import GridSearchCV, cross_validate
 from sklearn.pipeline import make_pipeline, Pipeline
+from sklearn.svm import SVC
 
 # define default gridsearch param dict
 MODEL_PARAMS = {
@@ -25,15 +27,16 @@
 }
 
 NAME2MODEL = {
-    "decisiontree": DecisionTreeClassifier(),
-    "randomforest": RandomForestClassifier(),
-    "adaboost": AdaBoostClassifier(),
-    "logistic": LogisticRegression(),
+    "decisiontree": DecisionTreeClassifier,
+    "randomforest": RandomForestClassifier,
+    "adaboost": AdaBoostClassifier,
+    "logistic": LogisticRegression,
+    "svc": SVC,
 }
 
 
 # classification models for protein families
-class SnekmerModel(ClassifierMixin):
+class SnekmerModel(ClassifierMixin, BaseEstimator):
     """Classify a protein family using kmer vectors as input.
 
     Attributes
@@ -62,7 +65,8 @@
     def __init__(
         self,
         scaler: Optional[Any] = None,
-        model: str = "decisiontree",
+        model: str = "logistic",
+        model_params: Optional[Dict[str, Any]] = None,
         params: Dict[str, List] = MODEL_PARAMS,
         step_name="clf",
     ):
@@ -80,15 +84,22 @@
             Optional custom name for classifier pipeline step.
 
         """
+        # copy to avoid mutating the caller's dict or a shared default
+        model_params = dict(model_params or {})
+
+        # always compute svc probability
+        if model == "svc":
+            model_params["probability"] = True
+
         self.scaler = scaler
         self.params = params
-        self.model = NAME2MODEL[model]
+        self.model = NAME2MODEL[model](**model_params)
         self.step_name = step_name
+        self.snekmer_version = __version__
 
-        self.pipeline = None
-        self.search = None
-
-    def fit(self, X: NDArray, y: NDArray, verbose: bool = True):
+    def fit(
+        self, X: NDArray, y: NDArray, gridsearch: bool = False, verbose: bool = True
+    ):
         """Train model using gridsearch-tuned hyperparameters.
 
         Parameters
@@ -99,8 +107,11 @@
             Target values (class labels) as integers or strings.
         scores : array-like of shape (n_features,) or (n_features, n_outputs)
             NOT IMPLEMENTED YET-- for KmerScaler integration.
+        gridsearch : bool (default: False)
+            If True, uses grid search to determine hyperparameters.
         verbose : bool (default: True)
             If True, prints best gridsearch CV results to console.
+            Only active if `gridsearch=True`.
 
         Returns
         -------
@@ -109,25 +120,34 @@
         """
         # define pipeline and gridsearch
-        self.pipeline = Pipeline(
-            steps=[("scaler", self.scaler), (self.step_name, self.model)]
-        )
-        self.search = GridSearchCV(self.pipeline, self.params)
-
-        # use gridsearch on training data to find best parameter set
-        self.search.fit(X, y)
-        if verbose:
-            print(
-                "best mean cross-validation score: {:.3f}".format(
-                    self.search.best_score_
-                )
-            )
-            print("best parameters:", self.search.best_params_)
+        if gridsearch:
+            self.pipeline = Pipeline(
+                steps=[("scaler", self.scaler), (self.step_name, self.model)]
+            )
+            self.search = GridSearchCV(self.pipeline, self.params)
+
+            # use gridsearch on training data to find best parameter set
+            self.search.fit(X, y)
+            if verbose:
+                print(
+                    "best mean cross-validation score: {:.3f}".format(
+                        self.search.best_score_
+                    )
+                )
+                print("best parameters:", self.search.best_params_)
+
+            # apply the best classifier-step parameters before the final fit
+            self.model.set_params(
+                **{
+                    k.split("__", 1)[1]: v
+                    for k, v in self.search.best_params_.items()
+                    if k.startswith(f"{self.step_name}__")
+                }
+            )
 
         # fit model with best gridsearch parameters
         # self.scaler.fit(scores)
         self.model.fit(X, y)
-        return
+        return self
 
     def score(self, X: NDArray, y: NDArray):
         """Score model on test set.
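Reviewer note: a minimal sketch of the new constructor/`fit` API above, with fabricated arrays standing in for real kmer vectors (nothing here ships with Snekmer):

```python
import numpy as np
from snekmer.model import SnekmerModel

# placeholder data: 20 sequences x 8 kmer features, binary family labels
X_train = np.random.rand(20, 8)
y_train = np.array([0, 1] * 10)

# "svc" forces probability=True so predict_proba works downstream
clf = SnekmerModel(model="svc", model_params={"C": 1.0, "kernel": "rbf"})
clf.fit(X_train, y_train, gridsearch=False)  # fit() now returns self

print(clf.model.predict_proba(X_train[:2]))  # per-class probabilities
print(clf.snekmer_version)                   # version stamp, e.g. "1.0.4"
```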
diff --git a/snekmer/plot.py b/snekmer/plot.py
index e3c2076..a08ed77 100644
--- a/snekmer/plot.py
+++ b/snekmer/plot.py
@@ -4,29 +4,32 @@
 """
 
 # imports
+from typing import Any, Dict, Optional
+
 import numpy as np
 import matplotlib.pyplot as plt
 import seaborn as sns
+from numpy.typing import ArrayLike
 from sklearn.metrics import (
-    accuracy_score,
+    PrecisionRecallDisplay,
+    RocCurveDisplay,
     auc,
     average_precision_score,
-    roc_curve,
     precision_recall_curve,
 )
-from sklearn.model_selection import (
-    KFold,
-    train_test_split,
-    RandomizedSearchCV,
-    StratifiedKFold,
-)
-from sklearn.svm import SVC
-from sklearn.preprocessing import LabelEncoder, StandardScaler
+from sklearn.preprocessing import StandardScaler
 from sklearn.decomposition import PCA
 from sklearn.manifold import TSNE
 
 
-def cv_roc_curve(clf, X, y, title="ROC Curve", ax=None, dpi=400):
-    """Plot cross-validated receiver operator characteristics curve.
+def cv_roc_curve(
+    clf,
+    X: Dict[Any, Dict[str, ArrayLike]],
+    y: Dict[Any, Dict[str, ArrayLike]],
+    title: str = "ROC Curve",
+    ax: Optional[plt.Axes] = None,
+    dpi: int = 400,
+):
+    """Plot cross-validated receiver operating characteristic (ROC) curve.
 
     Adapted from example in sklearn documentation [1].
 
@@ -34,15 +37,15 @@
     Parameters
     ----------
-    clf : dict of Classifier objects
-        Classifier object (e.g. DecisionTreeClassifier()).
-    X : dict of arrays
-        Feature arrays.
-    y : dict of arrays
-        Response arrays.
+    clf : SnekmerModel object
+        Model wrapper; the fitted estimator is accessed as `clf.model`.
+    X : dict of numpy.typing.ArrayLike
+        Feature arrays, keyed by fold, each with "train" and "test" entries.
+    y : dict of numpy.typing.ArrayLike
+        Response arrays, keyed by fold, each with "train" and "test" entries.
     title : str
         Plot title.
-    ax : matplotlib.axes.Axes object or None (default: None)
+    ax : matplotlib.axes.Axes or None (default: None)
         Axis object, if already created.
     dpi : int (default: 400)
         Figure resolution.
@@ -64,21 +67,23 @@
 
     # take each cv result
     for i in X.keys():
-        probabilities = clf.fit(X[i]["train"], y[i]["train"]).predict_proba(
-            X[i]["test"]
+        clf.fit(X[i]["train"], y[i]["train"])
+        viz = RocCurveDisplay.from_estimator(
+            clf.model,
+            X[i]["test"],
+            y[i]["test"],
+            name=f"ROC fold {i}",
+            alpha=0.3,
+            lw=1,
+            ax=ax,
         )
+        interp_tpr = np.interp(mean_fpr, viz.fpr, viz.tpr)
+        interp_tpr[0] = 0.0
+        tprs.append(interp_tpr)
+        auc_rocs.append(viz.roc_auc)
 
-        # calculate roc curve/area, and interpolate values
-        fpr, tpr, _ = roc_curve(y[i]["test"], probabilities[:, 1])
-        tprs.append(np.interp(mean_fpr, fpr, tpr))
-
-        tprs[-1][0] = 0.0
-        roc_auc = auc(fpr, tpr)
-        auc_rocs.append(roc_auc)
-        ax.plot(fpr, tpr, lw=1, alpha=0.3, label=f"ROC fold {i} (AUC = {roc_auc:0.2f})")
-
-    # plot y = x (50% chance) reference line
-    ax.plot([0, 1], [0, 1], linestyle="--", lw=2, color="r", label="Chance", alpha=0.8)
+    # plot random chance reference line
+    ax.plot([0, 1], [0, 1], "k--", label="Chance level (AUC = 0.5)")
 
     # calculate mean and stdev roc_auc and show in plot
     mean_tpr = np.mean(tprs, axis=0)
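Reviewer note: both plotting helpers iterate `X.keys()` and index into `["train"]`/`["test"]`, so they expect per-fold dicts rather than flat arrays (as the docstrings above now describe). A sketch with three fabricated folds:

```python
import numpy as np
import snekmer as skm

rng = np.random.default_rng(0)
X, y = {}, {}
for fold in range(3):
    # each fold carries its own train/test split of features and labels
    X[fold] = {"train": rng.random((30, 8)), "test": rng.random((10, 8))}
    y[fold] = {"train": rng.integers(0, 2, 30), "test": rng.integers(0, 2, 10)}

clf = skm.model.SnekmerModel(model="logistic")
fig, ax, auc_rocs = skm.plot.cv_roc_curve(clf, X, y, title="ROC Curve")
fig.savefig("roc_example.png")
```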
@@ -118,20 +123,27 @@
     return fig, ax, auc_rocs
 
 
-def cv_pr_curve(clf, X, y, title="PR Curve", ax=None, dpi=400):
+def cv_pr_curve(
+    clf,
+    X: Dict[Any, Dict[str, ArrayLike]],
+    y: Dict[Any, Dict[str, ArrayLike]],
+    title: str = "PR Curve",
+    ax: Optional[plt.Axes] = None,
+    dpi: int = 400,
+):
     """Plot cross-validated precision-recall curve.
 
     Parameters
     ----------
-    clf : Classifer object
-        Classifier object (e.g. DecisionTreeClassifier()).
-    X : dict
-        Feature dataframe.
-    y : dict
-        Response dataframe.
+    clf : SnekmerModel object
+        Model wrapper; the fitted estimator is accessed as `clf.model`.
+    X : dict of numpy.typing.ArrayLike
+        Feature arrays, keyed by fold, each with "train" and "test" entries.
+    y : dict of numpy.typing.ArrayLike
+        Response arrays, keyed by fold, each with "train" and "test" entries.
     title : str
         Plot title.
-    ax : matplotlib.axes.Axes object or None (default: None)
+    ax : matplotlib.axes.Axes or None (default: None)
         Axis object, if already created.
     dpi : int (default: 400)
         Figure resolution.
@@ -141,7 +153,6 @@
 
     Returns
     -------
     fig, ax, pr_aucs
 
     """
-    # initialize figure and define all axes
     fig = plt.figure(
         dpi=dpi, figsize=(8, 4), facecolor="white", constrained_layout=True
     )
@@ -152,38 +163,34 @@
     y_real, y_proba, pr_aucs = [], [], []
 
     for i in X.keys():
-        probabilities = clf.fit(X[i]["train"], y[i]["train"]).predict_proba(
-            X[i]["test"]
-        )
-        # Compute ROC curve and area the curve
-        precision, recall, _ = precision_recall_curve(y[i]["test"], probabilities[:, 1])
-        avg_precision = average_precision_score(y[i]["test"], probabilities[:, 1])
-
-        # Plotting each individual PR Curve
-        ax.plot(
-            recall,
-            precision,
-            lw=1,
+        clf.fit(X[i]["train"], y[i]["train"])
+        probabilities = clf.model.predict_proba(X[i]["test"])
+        viz = PrecisionRecallDisplay.from_estimator(
+            clf.model,
+            X[i]["test"],
+            y[i]["test"],
+            name=f"PR fold {i}",
             alpha=0.3,
-            label=f"PR fold {i} (AUC = {avg_precision:0.2f})",
+            lw=1,
+            ax=ax,
         )
 
         y_real.append(y[i]["test"])
         y_proba.append(probabilities[:, 1])
-        pr_aucs.append(avg_precision)
+        pr_aucs.append(viz.average_precision)
 
     y_real = np.concatenate(y_real)
     y_proba = np.concatenate(y_proba)
 
     precision, recall, _ = precision_recall_curve(y_real, y_proba)
     avg_total_precision = average_precision_score(y_real, y_proba)
 
     # plot average p-r curve
     ax.plot(
         recall,
         precision,
         color="b",
         label=f"Precision-Recall (AUC = {avg_total_precision:0.2f})",
         lw=2,
         alpha=0.8,
     )
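Reviewer note: the label on the final `ax.plot` uses `avg_total_precision`, computed from the concatenated folds, because pooled average precision is not the mean of per-fold scores. A toy illustration with made-up probabilities:

```python
import numpy as np
from sklearn.metrics import average_precision_score

y_true = [np.array([0, 1, 1]), np.array([1, 0, 0])]
y_prob = [np.array([0.5, 0.8, 0.7]), np.array([0.4, 0.3, 0.2])]

# each fold ranks its own positives perfectly -> per-fold AP = 1.0
per_fold = [average_precision_score(t, p) for t, p in zip(y_true, y_prob)]

# pooled, fold 2's positive (0.4) falls below fold 1's negative (0.5)
pooled = average_precision_score(np.concatenate(y_true), np.concatenate(y_prob))
print(per_fold, round(pooled, 3))  # [1.0, 1.0] 0.917
```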
diff --git a/snekmer/score.py b/snekmer/score.py
index 5ec78c6..285b74b 100644
--- a/snekmer/score.py
+++ b/snekmer/score.py
@@ -7,6 +7,7 @@
 
 import numpy as np
 import pandas as pd
+from ._version import __version__
 from .vectorize import KmerBasis
 from .utils import to_feature_matrix
 from sklearn.metrics.pairwise import pairwise_distances
@@ -536,6 +537,7 @@
         self.scores = {"sample": {}, "background": {}}
         self.score_norm = None
         self.probabilities = {"sample": {}, "background": {}}
+        self.snekmer_version = __version__
 
     # load list of kmers and both seq and bg feature matrices
     def fit(
@@ -584,28 +586,9 @@
             "background": list(data.index[data[bg_col]]),
         }
 
-        # step 0: get feature matrix and all labels
         labels = data[label_col].values
-        #print(data[vec_col].shape)
-        #print(data[vec_col])
-        x = len(data[vec_col])
-        y = len(data[vec_col][0])
-        #print(x,y)
-
-        matrix = np.zeros(x*y).reshape((x,y))
-        #print(matrix.shape)
-
-        for i in range(x):
-            for j in range(y):
-                value = data[vec_col][i]
-                value = value[j]
-                matrix[i,j] = value
-        #matrix = np.asarray(np.concatenate(data[vec_col])).reshape((len(data[vec_col]), len(data[vec_col][0])))
-        #print(matrix.shape)
+        # stack per-sequence kmer vectors into an (n_seqs, n_kmers) matrix
+        matrix = np.vstack(data[vec_col].to_numpy())
 
         # step 0: get indices for label (family) ids
         # i_label = {
@@ -716,15 +716,15 @@
         """
         # fit new input data to the correct kmer basis
-        #self.kmers.set_basis(kmers)
+        # self.kmers.set_basis(kmers)
         array = self.kmers.transform(array, kmers)
-        #return (
+        # return (
         #     apply_feature_probabilities(
         #         array, self.probabilities["sample"], scaler=self.scaler
         #     )
         #     / self.score_norm
-        #)
+        # )
         return (
             apply_feature_probabilities(
                 array, self.probabilities["sample"], scaler=False
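Reviewer note: quick equivalence check for the `np.vstack` matrix build in `fit` above, using a fabricated Series of per-sequence kmer vectors:

```python
import numpy as np
import pandas as pd

vecs = pd.Series([np.array([1.0, 0.0, 2.0]), np.array([0.0, 3.0, 1.0])])

# element-by-element fill, as the loop formerly did
loop = np.zeros((len(vecs), len(vecs[0])))
for i in range(len(vecs)):
    for j in range(len(vecs[0])):
        loop[i, j] = vecs[i][j]

assert np.array_equal(loop, np.vstack(vecs.to_numpy()))
```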
self.probabilities["sample"], scaler=self.scaler # ) # / self.score_norm - #) + # ) return ( apply_feature_probabilities( array, self.probabilities["sample"], scaler=False diff --git a/snekmer/scripts/model_model.py b/snekmer/scripts/model_model.py index 0d8886e..b42dc90 100644 --- a/snekmer/scripts/model_model.py +++ b/snekmer/scripts/model_model.py @@ -151,9 +151,17 @@ y[n]["test"] = le.transform(df_test_labels).ravel() # ROC-AUC figure -clf = LogisticRegression( - random_state=random_state, solver="liblinear", class_weight="balanced" +clf = skm.model.SnekmerModel( + model="logistic", + params={ + "random_state": random_state, + "solver": "liblinear", + "class_weight": "balanced", + }, ) +# clf = LogisticRegression( +# random_state=random_state, solver="liblinear", class_weight="balanced" +# ) fig, ax, auc_rocs = skm.plot.cv_roc_curve( clf, X, y, title=f"{family} ROC Curve ({alphabet_name}, k={config['k']})" ) diff --git a/snekmer/vectorize.py b/snekmer/vectorize.py index 323312f..616bc45 100644 --- a/snekmer/vectorize.py +++ b/snekmer/vectorize.py @@ -9,6 +9,7 @@ import numpy as np from numpy.typing import NDArray +from ._version import __version__ from .alphabet import FULL_ALPHABETS, get_alphabet, get_alphabet_keys from .utils import check_list @@ -227,6 +228,7 @@ def __init__(self, alphabet: Union[str, int], k: int): self.char_set = get_alphabet_keys(alphabet) self.vector = None self.basis = KmerBasis() + self.snekmer_version = __version__ # self.kmer_set = KmerSet(alphabet, k) def set_kmer_set(self, kmer_set=list()):