refactor: remove target transformation (#13)
lsorber authored Feb 4, 2024
1 parent 6e8121f commit a2d6028
Showing 2 changed files with 23 additions and 33 deletions.
10 changes: 7 additions & 3 deletions src/neo_ls_svm/_affine_feature_map.py
@@ -1,7 +1,7 @@
"""Affine feature map."""

from functools import cached_property
-from typing import TypeVar, cast
+from typing import Any, TypeVar, cast

import numpy as np
import numpy.typing as npt
@@ -86,7 +86,7 @@ def transform(self, X: FloatMatrix[F]) -> FloatMatrix[F]:
                if A.shape[1] < A.shape[0]
                else (X - shift) @ (A / scale.T)
            )
-        )
+        ).astype(X.dtype)
        if self.append_features and A is not None:
            X_transformed = np.hstack((X, X_transformed))
        return X_transformed
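The added .astype(X.dtype) matters because mixed-precision matrix products upcast. A minimal sketch of the branch above, with made-up shift, scale, and A, showing how a float64 map would otherwise promote float32 inputs:

    import numpy as np

    X = np.random.default_rng(0).random((5, 3)).astype(np.float32)
    shift = X.mean(axis=0, keepdims=True)  # float32
    scale = X.std(axis=0, keepdims=True)   # float32
    A = np.eye(3)                          # float64; upcasts the product
    X_transformed = ((X - shift) @ (A / scale.T)).astype(X.dtype)
    assert X_transformed.dtype == np.float32  # preserved thanks to the cast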
@@ -108,7 +108,7 @@ def inverse_transform(self, X_transformed: FloatMatrix[F]) -> FloatMatrix[F]:
        if A is not None:
            pinvA = cast(FloatMatrix[F], self.pseudo_inverse)
            X = X @ pinvA
-        X = (X * scale + shift).astype(shift.dtype)
+        X = (X * scale + shift).astype(X.dtype)
        return X

    def get_feature_names_out(
@@ -130,3 +130,7 @@ def get_feature_names_out(
        if self.append_features and A is not None:
            output_features = np.hstack((input_features_array, output_features))
        return output_features
+
+    def _more_tags(self) -> dict[str, Any]:
+        # https://scikit-learn.org/stable/developers/develop.html#estimator-tags
+        return {"preserves_dtype": [np.float64, np.float32]}
46 changes: 16 additions & 30 deletions src/neo_ls_svm/_neo_ls_svm.py
@@ -54,15 +54,13 @@ def __init__(  # noqa: PLR0913
        primal_feature_map: KernelApproximatingFeatureMap | None = None,
        dual_feature_map: AffineSeparator | None = None,
        dual: bool | None = None,
-        max_epochs: int = 1,
        refit: bool = False,
        random_state: int | np.random.RandomState | None = 42,
        estimator_type: Literal["classifier", "regressor"] | None = None,
    ) -> None:
        self.primal_feature_map = primal_feature_map
        self.dual_feature_map = dual_feature_map
        self.dual = dual
-        self.max_epochs = max_epochs
        self.refit = refit
        self.random_state = random_state
        self.estimator_type = estimator_type
@@ -140,9 +138,7 @@ def _optimize_β̂_γ(
        γλ = 1 / (self.γs_[np.newaxis, :] + λ[:, np.newaxis])
        with np.errstate(divide="ignore", invalid="ignore"):
            loo_residuals = (φβ̂ @ γλ - y[:, np.newaxis]) / (1 - h @ γλ)
-        loo_residuals = loo_residuals * self.y_scale_
-        y_true = y * self.y_scale_ + self.y_shift_
-        ŷ_loo = loo_residuals + y_true[:, np.newaxis]
+        ŷ_loo = y[:, np.newaxis] + loo_residuals
        # In the case of binary classification, clip overly positive and overly negative
        # predictions' residuals to 0 when the labels are positive and negative, respectively.
        if self._estimator_type == "classifier":
@@ -163,14 +159,14 @@
        self.loo_leverage_ = h @ γλ[:, optimum]
        self.loo_error_ = self.loo_errors_γs_[optimum]
        if self._estimator_type == "classifier":
-            self.loo_score_ = accuracy_score(y_true, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
+            self.loo_score_ = accuracy_score(y, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
        elif self._estimator_type == "regressor":
-            self.loo_score_ = r2_score(y_true, ŷ_loo[:, optimum], sample_weight=s)
+            self.loo_score_ = r2_score(y, ŷ_loo[:, optimum], sample_weight=s)
        β̂, γ = β̂ @ γλ[:, optimum], self.γs_[optimum]
        # Resolve the linear system for better accuracy.
        if self.refit:
            β̂ = np.linalg.solve(γ * C + A, φSTSy)
-        self.residuals_ = (np.real(φ @ β̂) - y) * self.y_scale_
+        self.residuals_ = np.real(φ @ β̂) - y
        if self._estimator_type == "classifier":
            self.residuals_[(y > 0) & (self.residuals_ > 0)] = 0
            self.residuals_[(y < 0) & (self.residuals_ < 0)] = 0
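Both optimizers rely on the closed-form leave-one-out residual r = (ŷ − y) / (1 − h), where h is the leverage; after this change those residuals stay in the original target scale. A self-contained ridge-regression check of that identity, with illustrative data and penalty:

    import numpy as np

    rng = np.random.default_rng(42)
    X, y = rng.normal(size=(20, 3)), rng.normal(size=20)
    γ = 0.1  # ridge penalty, playing the role of γ above
    H = X @ np.linalg.solve(X.T @ X + γ * np.eye(3), X.T)  # hat matrix
    loo_residuals = (H @ y - y) / (1 - np.diag(H))  # closed form, no refitting
    # Brute-force leave-one-out of sample 0 agrees:
    i, mask = 0, np.arange(20) != 0
    β = np.linalg.solve(X[mask].T @ X[mask] + γ * np.eye(3), X[mask].T @ y[mask])
    assert np.isclose(loo_residuals[i], X[i] @ β - y[i])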
@@ -273,9 +269,7 @@ def _optimize_α̂_γ(
        np.fill_diagonal(F_loo, 0)
        α̂_loo = α̂ @ (1 / (self.γs_[np.newaxis, :] * ρ + λ[:, np.newaxis]))
        ŷ_loo = np.sum(F_loo[:, np.newaxis, :] * H_loo, axis=2) * α̂_loo + F_loo @ α̂_loo
-        ŷ_loo = ŷ_loo * self.y_scale_ + self.y_shift_
-        y_true = y * self.y_scale_ + self.y_shift_
-        loo_residuals = ŷ_loo - y_true[:, np.newaxis]
+        loo_residuals = ŷ_loo - y[:, np.newaxis]
        # In the case of binary classification, clip overly positive and overly negative
        # predictions' residuals to 0 when the labels are positive and negative, respectively.
        if self._estimator_type == "classifier":
@@ -295,21 +289,21 @@
        self.loo_residuals_ = loo_residuals[:, optimum]
        self.loo_error_ = self.loo_errors_γs_[optimum]
        if self._estimator_type == "classifier":
-            self.loo_score_ = accuracy_score(y_true, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
+            self.loo_score_ = accuracy_score(y, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
        elif self._estimator_type == "regressor":
-            self.loo_score_ = r2_score(y_true, ŷ_loo[:, optimum], sample_weight=s)
+            self.loo_score_ = r2_score(y, ŷ_loo[:, optimum], sample_weight=s)
        α̂, γ = α̂_loo[:, optimum], self.γs_[optimum]
        # Resolve the linear system for better accuracy.
        if self.refit:
            α̂ = np.linalg.solve(γ * ρ * np.diag(sn**-2) + K, y)
-        self.residuals_ = (F @ α̂ - y) * self.y_scale_
+        self.residuals_ = F @ α̂ - y
        if self._estimator_type == "classifier":
            self.residuals_[(y > 0) & (self.residuals_ > 0)] = 0
            self.residuals_[(y < 0) & (self.residuals_ < 0)] = 0
        # TODO: Print warning if optimal γ is found at the edge.
        return α̂, γ

-    def fit(  # noqa: PLR0915
+    def fit(
        self, X: FloatMatrix[F], y: GenericVector, sample_weight: FloatVector[F] | None = None
    ) -> "NeoLSSVM":
        """Fit this predictor."""
@@ -347,19 +341,10 @@ def fit(  # noqa: PLR0915
            y_ = np.ones(y.shape, dtype=X.dtype)
            y_[negatives] = -1
        elif self._estimator_type == "regressor":
-            y_ = cast(npt.NDArray[np.floating[Any]], y)
+            y_ = y.astype(X.dtype)
        else:
            message = "Target type not supported"
            raise ValueError(message)
-        # Fit robust shift and scale parameters for the target y.
-        if self._estimator_type == "classifier":
-            self.y_shift_: float = 0.0
-            self.y_scale_: float = 1.0
-        elif self._estimator_type == "regressor":
-            l, self.y_shift_, u = np.quantile(y_, [0.05, 0.5, 0.95])  # noqa: E741
-            self.y_scale_ = np.maximum(np.abs(l - self.y_shift_), np.abs(u - self.y_shift_))
-            self.y_scale_ = 1.0 if self.y_scale_ <= np.finfo(X.dtype).eps else self.y_scale_
-        y_ = ((y_ - self.y_shift_) / self.y_scale_).astype(X.dtype)
        # Determine whether we want to solve this in the primal or dual space.
        self.dual_ = X.shape[0] <= 1024 if self.dual is None else self.dual  # noqa: PLR2004
        self.primal_ = not self.dual_
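The deleted block above standardized regression targets robustly: shift by the median, scale by the larger of the 5% and 95% quantile deviations, and guard against a zero scale. As a standalone sketch of what was removed, with synthetic data:

    import numpy as np

    y = np.random.default_rng(0).normal(loc=100.0, scale=5.0, size=1000)
    l, shift, u = np.quantile(y, [0.05, 0.5, 0.95])
    scale = max(abs(l - shift), abs(u - shift))
    scale = 1.0 if scale <= np.finfo(y.dtype).eps else scale
    y_standardized = (y - shift) / scale  # no longer done after this commit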
@@ -390,7 +375,7 @@ def fit(  # noqa: PLR0915
            self.predict_proba_calibrator_ = IsotonicRegression(
                out_of_bounds="clip", y_min=0, y_max=1, increasing=True
            )
-            ŷ_loo = self.loo_residuals_ + y_
+            ŷ_loo = y_ + self.loo_residuals_
            target = np.zeros_like(y_)
            target[y_ == np.max(y_)] = 1.0
            self.predict_proba_calibrator_.fit(ŷ_loo, target, sample_weight_)
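The calibrator above learns a monotone map from leave-one-out decision values to probabilities. A small sketch of the same step with synthetic scores (names and data are illustrative only):

    import numpy as np
    from sklearn.isotonic import IsotonicRegression

    rng = np.random.default_rng(0)
    scores = rng.normal(size=200)  # stand-in for ŷ_loo
    target = (scores + rng.normal(scale=0.5, size=200) > 0).astype(float)
    calibrator = IsotonicRegression(out_of_bounds="clip", y_min=0, y_max=1, increasing=True)
    calibrator.fit(scores, target)
    p = calibrator.predict(np.array([-1.0, 0.0, 1.0]))  # calibrated P(y = +1)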
@@ -404,10 +389,11 @@ def decision_function(self, X: FloatMatrix[F]) -> FloatVector[F]:
            φ = cast(KernelApproximatingFeatureMap, self.primal_feature_map_).transform(X)
            ŷ = np.real(φ @ self.β̂_)
        else:
-            # Shift and scale X, then predict as ŷ(x) := k(x, X) α̂ + 1'α̂.
+            # Apply an affine transformation to X, then predict as ŷ(x) := k(x, X) α̂ + 1'α̂.
            X = cast(AffineFeatureMap, self.dual_feature_map_).transform(X)
            K = rbf_kernel(X, self.X_, gamma=0.5)
-            ŷ = K @ self.α̂_ + np.sum(self.α̂_)
+            b = np.sum(self.α̂_)
+            ŷ = K @ self.α̂_ + b
        return ŷ

    def predict(self, X: FloatMatrix[F]) -> GenericVector:
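In the classifier branch of predict below, decision values in {−1, +1} are remapped to the original class labels by indexing with (ŷ_df + 1) // 2. In isolation, with made-up labels:

    import numpy as np

    classes_ = np.array(["ham", "spam"])
    ŷ_df = np.array([-1.0, 1.0, -1.0])
    ŷ = classes_[((ŷ_df + 1) // 2).astype(np.intp)]  # → ['ham', 'spam', 'ham']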
@@ -423,8 +409,8 @@ def predict(self, X: FloatMatrix[F]) -> GenericVector:
            # Remap to the original class labels.
            ŷ = self.classes_[((ŷ_df + 1) // 2).astype(np.intp)]
        elif self._estimator_type == "regressor":
-            # Undo the label shift and scale.
-            ŷ = ŷ_df.astype(np.float64) * self.y_scale_ + self.y_shift_
+            # The decision function is the point prediction.
+            ŷ = ŷ_df
        # Map back to the training target dtype.
        ŷ = ŷ.astype(self.y_dtype_)
        return ŷ
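End to end, a regressor fit after this change keeps y in its original units throughout, so predictions need no inverse transformation. A hedged usage sketch (the NeoLSSVM import path is assumed from the package name, not shown in this diff):

    import numpy as np
    from sklearn.datasets import make_regression

    from neo_ls_svm import NeoLSSVM  # assumed public export of this package

    X, y = make_regression(n_samples=200, n_features=4, noise=0.1, random_state=0)
    model = NeoLSSVM(estimator_type="regressor").fit(X.astype(np.float32), y)
    ŷ = model.predict(X.astype(np.float32))  # already in y's units and dtype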
