Skip to content

Commit

Permalink
Add type-hints to adaptive/learner/learner2D.py (#375)
Browse files Browse the repository at this point in the history
  • Loading branch information
basnijholt authored Oct 12, 2022
1 parent 8fd6de5 commit e06b34c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 33 deletions.
78 changes: 47 additions & 31 deletions adaptive/learner/learner2D.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
from collections import OrderedDict
from copy import copy
from math import sqrt
from typing import Callable, Iterable

import cloudpickle
import numpy as np
from scipy import interpolate
from scipy.interpolate.interpnd import LinearNDInterpolator

from adaptive.learner.base_learner import BaseLearner
from adaptive.learner.triangulation import simplex_volume_in_embedding
from adaptive.notebook_integration import ensure_holoviews
from adaptive.types import Bool, Float, Real
from adaptive.utils import (
assign_defaults,
cache_latest,
Expand All @@ -30,7 +33,7 @@
# Learner2D and helper functions.


def deviations(ip):
def deviations(ip: LinearNDInterpolator) -> list[np.ndarray]:
"""Returns the deviation of the linear estimate.
Is useful when defining custom loss functions.
Expand Down Expand Up @@ -68,7 +71,7 @@ def deviation(p, v, g):
return devs


def areas(ip):
def areas(ip: LinearNDInterpolator) -> np.ndarray:
"""Returns the area per triangle of the triangulation inside
a `LinearNDInterpolator` instance.
Expand All @@ -89,7 +92,7 @@ def areas(ip):
return areas


def uniform_loss(ip):
def uniform_loss(ip: LinearNDInterpolator) -> np.ndarray:
"""Loss function that samples the domain uniformly.
Works with `~adaptive.Learner2D` only.
Expand Down Expand Up @@ -120,7 +123,9 @@ def uniform_loss(ip):
return np.sqrt(areas(ip))


def resolution_loss_function(min_distance=0, max_distance=1):
def resolution_loss_function(
min_distance: float = 0, max_distance: float = 1
) -> Callable[[LinearNDInterpolator], np.ndarray]:
"""Loss function that is similar to the `default_loss` function, but you
can set the maximimum and minimum size of a triangle.
Expand Down Expand Up @@ -159,7 +164,7 @@ def resolution_loss(ip):
return resolution_loss


def minimize_triangle_surface_loss(ip):
def minimize_triangle_surface_loss(ip: LinearNDInterpolator) -> np.ndarray:
"""Loss function that is similar to the distance loss function in the
`~adaptive.Learner1D`. The loss is the area spanned by the 3D
vectors of the vertices.
Expand Down Expand Up @@ -205,7 +210,7 @@ def _get_vectors(points):
return np.linalg.norm(np.cross(a, b) / 2, axis=1)


def default_loss(ip):
def default_loss(ip: LinearNDInterpolator) -> np.ndarray:
"""Loss function that combines `deviations` and `areas` of the triangles.
Works with `~adaptive.Learner2D` only.
Expand All @@ -225,7 +230,7 @@ def default_loss(ip):
return losses


def choose_point_in_triangle(triangle, max_badness):
def choose_point_in_triangle(triangle: np.ndarray, max_badness: int) -> np.ndarray:
"""Choose a new point in inside a triangle.
If the ratio of the longest edge of the triangle squared
Expand Down Expand Up @@ -364,7 +369,12 @@ class Learner2D(BaseLearner):
over each triangle.
"""

def __init__(self, function, bounds, loss_per_triangle=None):
def __init__(
self,
function: Callable,
bounds: tuple[tuple[Real, Real], tuple[Real, Real]],
loss_per_triangle: Callable | None = None,
) -> None:
self.ndim = len(bounds)
self._vdim = None
self.loss_per_triangle = loss_per_triangle or default_loss
Expand All @@ -379,7 +389,7 @@ def __init__(self, function, bounds, loss_per_triangle=None):

self._bounds_points = list(itertools.product(*bounds))
self._stack.update({p: np.inf for p in self._bounds_points})
self.function = function
self.function = function # type: ignore
self._ip = self._ip_combined = None

self.stack_size = 10
Expand All @@ -388,7 +398,7 @@ def new(self) -> Learner2D:
return Learner2D(self.function, self.bounds, self.loss_per_triangle)

@property
def xy_scale(self):
def xy_scale(self) -> np.ndarray:
xy_scale = self._xy_scale
if self.aspect_ratio == 1:
return xy_scale
Expand Down Expand Up @@ -486,21 +496,21 @@ def load_dataframe(
self.function, df, function_prefix
)

def _scale(self, points):
def _scale(self, points: list[tuple[float, float]] | np.ndarray) -> np.ndarray:
points = np.asarray(points, dtype=float)
return (points - self.xy_mean) / self.xy_scale

def _unscale(self, points):
def _unscale(self, points: np.ndarray) -> np.ndarray:
points = np.asarray(points, dtype=float)
return points * self.xy_scale + self.xy_mean

@property
def npoints(self):
def npoints(self) -> int:
"""Number of evaluated points."""
return len(self.data)

@property
def vdim(self):
def vdim(self) -> int:
"""Length of the output of ``learner.function``.
If the output is unsized (when it's a scalar)
then `vdim = 1`.
Expand All @@ -516,12 +526,14 @@ def vdim(self):
return self._vdim or 1

@property
def bounds_are_done(self):
def bounds_are_done(self) -> bool:
return not any(
(p in self.pending_points or p in self._stack) for p in self._bounds_points
)

def interpolated_on_grid(self, n=None):
def interpolated_on_grid(
self, n: int = None
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Get the interpolated data on a grid.
Parameters
Expand Down Expand Up @@ -553,7 +565,7 @@ def interpolated_on_grid(self, n=None):
xs, ys = self._unscale(np.vstack([xs, ys]).T).T
return xs, ys, zs

def _data_in_bounds(self):
def _data_in_bounds(self) -> tuple[np.ndarray, np.ndarray]:
if self.data:
points = np.array(list(self.data.keys()))
values = np.array(list(self.data.values()), dtype=float)
Expand All @@ -562,7 +574,7 @@ def _data_in_bounds(self):
return points[inds], values[inds].reshape(-1, self.vdim)
return np.zeros((0, 2)), np.zeros((0, self.vdim), dtype=float)

def _data_interp(self):
def _data_interp(self) -> tuple[np.ndarray | list[tuple[float, float]], np.ndarray]:
if self.pending_points:
points = list(self.pending_points)
if self.bounds_are_done:
Expand All @@ -575,7 +587,7 @@ def _data_interp(self):
return points, values
return np.zeros((0, 2)), np.zeros((0, self.vdim), dtype=float)

def _data_combined(self):
def _data_combined(self) -> tuple[np.ndarray, np.ndarray]:
points, values = self._data_in_bounds()
if not self.pending_points:
return points, values
Expand All @@ -584,7 +596,7 @@ def _data_combined(self):
values_combined = np.vstack([values, values_interp])
return points_combined, values_combined

def ip(self):
def ip(self) -> LinearNDInterpolator:
"""Deprecated, use `self.interpolator(scaled=True)`"""
warnings.warn(
"`learner.ip()` is deprecated, use `learner.interpolator(scaled=True)`."
Expand All @@ -593,7 +605,7 @@ def ip(self):
)
return self.interpolator(scaled=True)

def interpolator(self, *, scaled=False):
def interpolator(self, *, scaled: bool = False) -> LinearNDInterpolator:
"""A `scipy.interpolate.LinearNDInterpolator` instance
containing the learner's data.
Expand Down Expand Up @@ -624,7 +636,7 @@ def interpolator(self, *, scaled=False):
points, values = self._data_in_bounds()
return interpolate.LinearNDInterpolator(points, values)

def _interpolator_combined(self):
def _interpolator_combined(self) -> LinearNDInterpolator:
"""A `scipy.interpolate.LinearNDInterpolator` instance
containing the learner's data *and* interpolated data of
the `pending_points`."""
Expand All @@ -634,12 +646,12 @@ def _interpolator_combined(self):
self._ip_combined = interpolate.LinearNDInterpolator(points, values)
return self._ip_combined

def inside_bounds(self, xy):
def inside_bounds(self, xy: tuple[float, float]) -> Bool:
x, y = xy
(xmin, xmax), (ymin, ymax) = self.bounds
return xmin <= x <= xmax and ymin <= y <= ymax

def tell(self, point, value):
def tell(self, point: tuple[float, float], value: float | Iterable[float]) -> None:
point = tuple(point)
self.data[point] = value
if not self.inside_bounds(point):
Expand All @@ -648,15 +660,17 @@ def tell(self, point, value):
self._ip = None
self._stack.pop(point, None)

def tell_pending(self, point):
def tell_pending(self, point: tuple[float, float]) -> None:
point = tuple(point)
if not self.inside_bounds(point):
return
self.pending_points.add(point)
self._ip_combined = None
self._stack.pop(point, None)

def _fill_stack(self, stack_till=1):
def _fill_stack(
self, stack_till: int = 1
) -> tuple[list[tuple[float, float]], list[float]]:
if len(self.data) + len(self.pending_points) < self.ndim + 1:
raise ValueError("too few points...")

Expand Down Expand Up @@ -695,7 +709,9 @@ def _fill_stack(self, stack_till=1):

return points_new, losses_new

def ask(self, n, tell_pending=True):
def ask(
self, n: int, tell_pending: bool = True
) -> tuple[list[tuple[float, float] | np.ndarray], list[float]]:
# Even if tell_pending is False we add the point such that _fill_stack
# will return new points, later we remove these points if needed.
points = list(self._stack.keys())
Expand Down Expand Up @@ -726,14 +742,14 @@ def ask(self, n, tell_pending=True):
return points[:n], loss_improvements[:n]

@cache_latest
def loss(self, real=True):
def loss(self, real: bool = True) -> float:
if not self.bounds_are_done:
return np.inf
ip = self.interpolator(scaled=True) if real else self._interpolator_combined()
losses = self.loss_per_triangle(ip)
return losses.max()

def remove_unfinished(self):
def remove_unfinished(self) -> None:
self.pending_points = set()
for p in self._bounds_points:
if p not in self.data:
Expand Down Expand Up @@ -807,10 +823,10 @@ def plot(self, n=None, tri_alpha=0):

return im.opts(style=im_opts) * tris.opts(style=tri_opts, **no_hover)

def _get_data(self):
def _get_data(self) -> dict[tuple[float, float], Float | np.ndarray]:
return self.data

def _set_data(self, data):
def _set_data(self, data: dict[tuple[float, float], Float | np.ndarray]) -> None:
self.data = data
# Remove points from stack if they already exist
for point in copy(self._stack):
Expand Down
2 changes: 1 addition & 1 deletion adaptive/tests/test_pickling.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def balancing_learner(f, learner_type, learner_kwargs):

learners_pairs = [
(Learner1D, dict(bounds=(-1, 1))),
(Learner2D, dict(bounds=[(-1, 1), (-1, 1)])),
(Learner2D, dict(bounds=((-1, 1), (-1, 1)))),
(SequenceLearner, dict(sequence=list(range(100)))),
(IntegratorLearner, dict(bounds=(0, 1), tol=1e-3)),
(AverageLearner, dict(atol=0.1)),
Expand Down
2 changes: 1 addition & 1 deletion adaptive/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_nonconforming_output(runner):
def f(x):
return [0]

runner(Learner2D(f, [(-1, 1), (-1, 1)]), trivial_goal)
runner(Learner2D(f, ((-1, 1), (-1, 1))), trivial_goal)


def test_aync_def_function():
Expand Down

0 comments on commit e06b34c

Please sign in to comment.