diff --git a/piglot/objective.py b/piglot/objective.py
index d79a942..eb3b0c5 100644
--- a/piglot/objective.py
+++ b/piglot/objective.py
@@ -18,31 +18,34 @@
class Composition(ABC):
"""Abstract class for defining composition functionals with gradients"""
- def composition(self, inner: np.ndarray) -> float:
+ def composition(self, inner: np.ndarray, params: np.ndarray) -> float:
"""Abstract method for computing the outer function of the composition
Parameters
----------
inner : np.ndarray
Return value from the inner function
+ params : np.ndarray
+ Parameters for the given responses
Returns
-------
float
Scalar composition result
"""
- inner_torch = torch.from_numpy(inner)
- result = self.composition_torch(inner_torch)
+ result = self.composition_torch(torch.from_numpy(inner), torch.from_numpy(params))
return result.numpy(force=True).item()
@abstractmethod
- def composition_torch(self, inner: torch.Tensor) -> torch.Tensor:
+ def composition_torch(self, inner: torch.Tensor, params: torch.Tensor) -> torch.Tensor:
"""Abstract method for computing the outer function of the composition with gradients
Parameters
----------
inner : torch.Tensor
Return value from the inner function
+ params : torch.Tensor
+ Parameters for the given responses
Returns
-------
@@ -50,21 +53,6 @@ def composition_torch(self, inner: torch.Tensor) -> torch.Tensor:
Scalar composition result
"""
- def __call__(self, inner: np.ndarray) -> float:
- """Compute the composition result for the outer world
-
- Parameters
- ----------
- inner : np.ndarray
- Return value from the inner function
-
- Returns
- -------
- float
- Scalar composition result
- """
- return self.composition(inner)
-
class DynamicPlotter(ABC):
"""Abstract class for dynamically-updating plots"""
@@ -160,6 +148,7 @@ def get_history(self) -> Dict[str, Dict[str, Any]]:
@dataclass
class ObjectiveResult:
"""Container for objective results."""
+ params: np.ndarray
values: List[np.ndarray]
variances: Optional[List[np.ndarray]] = None
@@ -177,16 +166,22 @@ def scalarise(self, composition: Composition = None) -> float:
Scalarised result.
"""
if composition is not None:
- return composition(np.concatenate(self.values))
+ return composition.composition(self.values, self.params)
return np.mean(self.values)
- def scalarise_stochastic(self, composition: Composition = None) -> Tuple[float, float]:
+ def scalarise_stochastic(
+ self,
+ composition: Composition = None,
+ num_samples: int = 1024,
+ ) -> Tuple[float, float]:
"""Scalarise the result.
Parameters
----------
composition : Composition, optional
Composition functional to use, by default None.
+ num_samples : int, optional
+ Number of samples to use for Monte Carlo, by default 1024.
Returns
-------
@@ -194,12 +189,13 @@ def scalarise_stochastic(self, composition: Composition = None) -> Tuple[float,
Scalarised mean and variance.
"""
if composition is not None:
- values = np.concatenate(self.values)
- variances = np.concatenate(self.variances)
# Compute the objective variance using Monte Carlo (using fixed base samples)
- biased = [norm.rvs(loc=0, scale=1) for _ in range(1000)]
- mc_objectives = [composition(values + bias * np.sqrt(variances)) for bias in biased]
- return composition(values), np.var(mc_objectives)
+ biased = [norm.rvs(loc=0, scale=1) for _ in range(num_samples)]
+ mc_objectives = [
+ composition.composition(self.values + bias * np.sqrt(self.variances), self.params)
+ for bias in biased
+ ]
+ return composition.composition(self.values, self.params), np.var(mc_objectives)
return np.mean(self.values), np.sum(self.variances)
diff --git a/piglot/objectives/analytical.py b/piglot/objectives/analytical.py
index bf3d7ea..77d8af7 100644
--- a/piglot/objectives/analytical.py
+++ b/piglot/objectives/analytical.py
@@ -42,7 +42,7 @@ def _objective(self, values: np.ndarray, concurrent: bool = False) -> ObjectiveR
Objective result.
"""
value = self.expression(**self.parameters.to_dict(values))
- return ObjectiveResult([np.array([value])])
+ return ObjectiveResult(values, [np.array([value])])
def _objective_denorm(self, values: np.ndarray) -> float:
"""Objective computation for analytical functions (denormalised parameters).
diff --git a/piglot/objectives/compositions.py b/piglot/objectives/compositions.py
deleted file mode 100644
index f45b0da..0000000
--- a/piglot/objectives/compositions.py
+++ /dev/null
@@ -1,160 +0,0 @@
-"""Module for compositions of objectives."""
-from typing import List, Tuple
-import numpy as np
-import torch
-from piglot.objective import Composition
-
-
-class MSEComposition(Composition):
- """Mean squared error outer composite function with gradients."""
-
- def composition_torch(self, inner: torch.Tensor) -> torch.Tensor:
- """Compute the MSE outer function of the composition with gradients.
-
- Parameters
- ----------
- inner : torch.Tensor
- Return value from the inner function.
-
- Returns
- -------
- torch.Tensor
- Composition result.
- """
- return torch.mean(torch.square(inner), dim=-1)
-
-
-class MAEComposition(Composition):
- """Mean absolute error outer composite function with gradients."""
-
- def composition_torch(self, inner: torch.Tensor) -> torch.Tensor:
- """Compute the MAE outer function of the composition with gradients.
-
- Parameters
- ----------
- inner : torch.Tensor
- Return value from the inner function.
-
- Returns
- -------
- torch.Tensor
- Composition result.
- """
- return torch.mean(torch.abs(inner), dim=-1)
-
-
-AVAILABLE_COMPOSITIONS = {
- 'mse': MSEComposition,
- 'mae': MAEComposition,
-}
-
-
-class UnflattenUtility:
- """Utility for unflattening a set of responses (with gradients)."""
-
- def __init__(self, lengths: List[int]):
- self.lengths = lengths
- self.indices = np.cumsum([0] + lengths)
-
- def unflatten(self, data: np.ndarray) -> List[np.ndarray]:
- """Unflatten a vector containing a set of responses.
-
- Parameters
- ----------
- data : np.ndarray
- Flattened data.
-
- Returns
- -------
- List[np.ndarray]
- List of unflatten responses.
- """
- return [data[..., self.indices[i]:self.indices[i+1]] for i in range(len(self.lengths))]
-
- def unflatten_torch(self, data: torch.Tensor) -> List[torch.Tensor]:
- """Unflatten a vector containing a set of responses with gradients.
-
- Parameters
- ----------
- data : torch.Tensor
- Flattened data.
-
- Returns
- -------
- List[torch.Tensor]
- List of unflatten responses (with gradients).
- """
- return [data[..., self.indices[i]:self.indices[i+1]] for i in range(len(self.lengths))]
-
-
-class FixedLengthTransformer:
- """Utility for transforming a response into a fixed-length format and back."""
-
- def __init__(self, n_points: int):
- self.n_points = n_points
-
- def length(self) -> int:
- """Return the length of the fixed-length response.
-
- Returns
- -------
- int
- Length of the fixed-length response.
- """
- return self.n_points + 2
-
- def transform(self, time: np.ndarray, data: np.ndarray) -> np.ndarray:
- """Transform a response into a fixed-length format.
-
- Parameters
- ----------
- time : np.ndarray
- Time points to transform.
- data : np.ndarray
- Data points to transform.
-
- Returns
- -------
- np.ndarray
- Fixed-length response.
- """
- bounds = np.array([np.min(time), np.max(time)])
- grid = np.linspace(bounds[0], bounds[1], self.n_points)
- response = np.interp(grid, time, data)
- return np.concatenate([response, bounds], axis=-1)
-
- def untransform(self, response: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
- """Untransform a fixed-length response into a response.
-
- Parameters
- ----------
- response : np.ndarray
- Fixed-length response.
-
- Returns
- -------
- Tuple[np.ndarray, np.ndarray]
- Time and data points.
- """
- grid, data = self.untransform_torch(torch.from_numpy(response))
- return grid.numpy(force=True), data.numpy(force=True)
-
- def untransform_torch(self, response: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
- """Untransform a fixed-length response into a response (with gradients).
-
- Parameters
- ----------
- response : torch.Tensor
- Fixed-length response.
-
- Returns
- -------
- Tuple[torch.Tensor, torch.Tensor]
- Time and data points.
- """
- response_gridless = response[..., :-2]
- lbounds = response[..., -2].unsqueeze(-1).expand_as(response_gridless)
- ubounds = response[..., -1].unsqueeze(-1).expand_as(response_gridless)
- reg_grid = torch.linspace(0, 1, self.n_points).expand_as(response_gridless)
- grid = lbounds + reg_grid * (ubounds - lbounds)
- return grid, response_gridless
diff --git a/piglot/objectives/design.py b/piglot/objectives/design.py
index 297db06..c6fdd0e 100644
--- a/piglot/objectives/design.py
+++ b/piglot/objectives/design.py
@@ -1,126 +1,16 @@
"""Module for curve fitting objectives"""
from __future__ import annotations
-from typing import Dict, Any, List, Union, Type
-from abc import ABC, abstractmethod
+from typing import Dict, Any, List, Union
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
-import torch
from piglot.parameter import ParameterSet
from piglot.solver import read_solver
from piglot.solver.solver import Solver, OutputResult
from piglot.objective import Composition, DynamicPlotter, GenericObjective, ObjectiveResult
-from piglot.objectives.compositions import UnflattenUtility, FixedLengthTransformer
from piglot.utils.assorted import stats_interp_to_common_grid
-from piglot.utils.solver_utils import load_module_from_file
-
-
-class Quantity(ABC):
- """Abstract class for quantities to be computed from a response."""
-
- def compute(self, time: np.ndarray, data: np.ndarray) -> float:
- """Compute this quantity for a given response.
-
- Parameters
- ----------
- time : np.ndarray
- Time points of the response.
- data : np.ndarray
- Data points of the response.
-
- Returns
- -------
- float
- Quantity value.
- """
- return self.compute_torch(torch.from_numpy(time), torch.from_numpy(data)).item()
-
- @abstractmethod
- def compute_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
- """Compute this quantity for a given response (with gradients).
-
- Parameters
- ----------
- time : np.ndarray
- Time points of the response.
- data : np.ndarray
- Data points of the response.
-
- Returns
- -------
- torch.Tensor
- Quantity value (with gradients).
- """
-
-
-class MaxQuantity(Quantity):
- """Maximum value of a response."""
-
- def compute_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
- """Get the maximum of a given response (with gradients).
-
- Parameters
- ----------
- time : np.ndarray
- Time points of the response.
- data : np.ndarray
- Data points of the response.
-
- Returns
- -------
- torch.Tensor
- Quantity value (with gradients).
- """
- return torch.max(data, dim=-1)[0]
-
-
-class MinQuantity(Quantity):
- """Minimum value of a response."""
-
- def compute_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
- """Get the minimum of a given response (with gradients).
-
- Parameters
- ----------
- time : np.ndarray
- Time points of the response.
- data : np.ndarray
- Data points of the response.
-
- Returns
- -------
- torch.Tensor
- Quantity value (with gradients).
- """
- return torch.min(data, dim=-1)[0]
-
-
-class IntegralQuantity(Quantity):
- """Integral of a response."""
-
- def compute_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
- """Get the integral of a given response (with gradients).
-
- Parameters
- ----------
- time : np.ndarray
- Time points of the response.
- data : np.ndarray
- Data points of the response.
-
- Returns
- -------
- torch.Tensor
- Quantity value (with gradients).
- """
- return torch.trapz(data, time, dim=-1)
-
-
-AVAILABLE_QUANTITIES: Dict[str, Type[Quantity]] = {
- 'max': MaxQuantity,
- 'min': MinQuantity,
- 'integral': IntegralQuantity,
-}
+from piglot.utils.reductions import Reduction, NegateReduction, read_reduction
+from piglot.utils.composition.responses import ResponseComposition, EndpointFlattenUtility
class DesignTarget:
@@ -130,7 +20,7 @@ def __init__(
self,
name: str,
prediction: Union[str, List[str]],
- quantity: Quantity,
+ quantity: Reduction,
negate: bool = False,
weight: float = 1.0,
n_points: int = None,
@@ -142,62 +32,10 @@ def __init__(
raise ValueError(f"Invalid prediction '{prediction}' for design target '{name}'.")
self.name = name
self.prediction = prediction
- self.quantity = quantity
- self.negate = negate
+ self.quantity = NegateReduction(quantity) if negate else quantity
self.weight = weight
self.n_points = n_points
- self.transformer = FixedLengthTransformer(n_points) # if n_points is not None else None
-
- def compute_quantity(self, time: np.ndarray, data: np.ndarray) -> float:
- """Compute the design quantity for the given results.
-
- Parameters
- ----------
- time : np.ndarray
- Time points of the response.
- data : np.ndarray
- Data points of the response.
-
- Returns
- -------
- float
- Design quantity for this response.
- """
- value = self.quantity.compute(time, data)
- return -value if self.negate else value
-
- def compute_quantity_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
- """Compute the design quantity for the given results.
-
- Parameters
- ----------
- time : torch.Tensor
- Time points of the response.
- data : torch.Tensor
- Data points of the response.
-
- Returns
- -------
- torch.Tensor
- Design quantity for this response.
- """
- value = self.quantity.compute_torch(time, data)
- return -value if self.negate else value
-
- def flatten(self, response: OutputResult) -> np.ndarray:
- """Flatten the response for this target.
-
- Parameters
- ----------
- response : OutputResult
- Response to flatten.
-
- Returns
- -------
- np.ndarray
- Flattened response.
- """
- return self.transformer.transform(response.get_time(), response.get_data())
+ self.flatten_utility = EndpointFlattenUtility(n_points) if n_points is not None else None
@staticmethod
def read(name: str, config: Dict[str, Any], output_dir: str) -> DesignTarget:
@@ -223,26 +61,10 @@ def read(name: str, config: Dict[str, Any], output_dir: str) -> DesignTarget:
# Read the quantity
if 'quantity' not in config:
raise ValueError("Missing quantity for fitting objective.")
- quantity = config['quantity']
- # Parse specification: simple or complete
- if isinstance(quantity, str):
- quantity = {'name': quantity}
- elif 'name' not in quantity:
- raise ValueError("Missing name in quantity specification.")
- # Parse script arguments, if passed
- if quantity['name'] == 'script':
- if 'script' not in quantity:
- raise ValueError("Missing script in quantity specification.")
- if 'class' not in quantity:
- raise ValueError("Missing class in quantity specification.")
- quantity_class: Type[Quantity] = load_module_from_file(quantity['script'],
- quantity['class'])
- else:
- quantity_class: Type[Quantity] = AVAILABLE_QUANTITIES[quantity['name']]
return DesignTarget(
name,
config['prediction'],
- quantity_class(),
+ read_reduction(config['quantity']),
negate=bool(config.get('negate', False)),
weight=float(config.get('weight', 1.0)),
n_points=int(config['n_points']) if 'n_points' in config else None,
@@ -293,93 +115,91 @@ def update(self) -> None:
fig.canvas.flush_events()
-class DesignComposition(Composition):
- """Optimisation composition function for design objectives."""
+class DesignObjective(GenericObjective):
+ """Scalar design objective function."""
- def __init__(self, outmost: str, targets: List[DesignTarget]) -> None:
- # Sanitise the number of points for each design target
- for target in targets:
- if target.n_points is None:
- raise ValueError(
- "All targets must have a number of points specified for the composition."
- )
+ def __init__(
+ self,
+ parameters: ParameterSet,
+ solver: Solver,
+ targets: List[DesignTarget],
+ output_dir: str,
+ stochastic: bool = False,
+ composite: bool = False,
+ ) -> None:
+ super().__init__(
+ parameters,
+ stochastic,
+ composition=self.__composition(True, stochastic, targets) if composite else None,
+ output_dir=output_dir,
+ )
+ self.solver = solver
self.targets = targets
- self.unflatten_utility = UnflattenUtility([t.transformer.length() for t in self.targets])
- def composition(self, inner: np.ndarray) -> float:
- """Compute the outer function of the composition.
+ def prepare(self) -> None:
+ """Prepare the objective for optimisation."""
+ super().prepare()
+ self.solver.prepare()
+
+ @staticmethod
+ def __composition(
+ scalarise: bool,
+ stochastic: bool,
+ targets: List[DesignTarget],
+ ) -> Composition:
+ """Build the composition utility for the design objective.
Parameters
----------
- inner : np.ndarray
- Return value from the inner function.
+ scalarise : bool
+ Whether we should scalarise the objective.
+ stochastic : bool
+ Whether the objective is stochastic.
+ targets : List[DesignTarget]
+ List of design targets to consider.
Returns
-------
- float
- Scalar composition result.
+ Composition
+ Composition utility for the design objective.
"""
- unflatten = self.unflatten_utility.unflatten(inner)
- responses = [
- target.transformer.untransform(flattened)
- for target, flattened in zip(self.targets, unflatten)
- ]
- quantities = [
- target.compute_quantity(*response) * target.weight
- for response, target in zip(responses, self.targets)
- ]
- return np.mean(quantities, axis=0)
-
- def composition_torch(self, inner: torch.Tensor) -> torch.Tensor:
- """Compute the outer function of the composition with gradients.
+ # Sanitise the number of points for each design target
+ for target in targets:
+ if target.n_points is None:
+ raise ValueError(
+ "All targets must have a number of points specified for the composition."
+ )
+ return ResponseComposition(
+ scalarise=scalarise,
+ stochastic=stochastic,
+ weights=[t.weight for t in targets],
+ reductions=[t.quantity for t in targets],
+ flatten_list=[t.flatten_utility for t in targets],
+ )
+
+ @staticmethod
+ def __interpolate_to_common_grid(
+ response: OutputResult,
+ n_points: int,
+ ) -> OutputResult:
+ """Interpolate the response to a common grid.
Parameters
----------
- inner : torch.Tensor
- Return value from the inner function.
+ response : OutputResult
+ Response to interpolate.
+ n_points : int
+ Number of points to interpolate to.
Returns
-------
- torch.Tensor
- Scalar composition result.
+ OutputResult
+ Interpolated response.
"""
- unflatten = self.unflatten_utility.unflatten_torch(inner)
- responses = [
- target.transformer.untransform_torch(flattened)
- for target, flattened in zip(self.targets, unflatten)
- ]
- quantities = torch.stack([
- target.compute_quantity_torch(*response) * target.weight
- for response, target in zip(responses, self.targets)
- ], dim=-1)
- return torch.mean(quantities, dim=-1)
-
-
-class DesignObjective(GenericObjective):
- """Scalar design objective function."""
-
- def __init__(
- self,
- parameters: ParameterSet,
- solver: Solver,
- targets: List[DesignTarget],
- output_dir: str,
- stochastic: bool = False,
- composition: str = None,
- ) -> None:
- super().__init__(
- parameters,
- stochastic,
- composition=DesignComposition(composition, targets) if composition else None,
- output_dir=output_dir,
- )
- self.solver = solver
- self.targets = targets
-
- def prepare(self) -> None:
- """Prepare the objective for optimisation."""
- super().prepare()
- self.solver.prepare()
+ # Interpolate to common grid
+ time = np.linspace(np.min(response.get_time()), np.max(response.get_time()), n_points)
+ data = np.interp(time, response.get_time(), response.get_data())
+ return OutputResult(time, data)
def _objective(self, values: np.ndarray, concurrent: bool = False) -> ObjectiveResult:
"""Objective computation for design objectives.
@@ -396,26 +216,32 @@ def _objective(self, values: np.ndarray, concurrent: bool = False) -> ObjectiveR
ObjectiveResult
Objective result.
"""
- responses = self.solver.solve(values, concurrent)
+ raw_responses = self.solver.solve(values, concurrent)
+ # Interpolate responses to common grid and map to targets
+ responses_interp = {
+ target: [
+ self.__interpolate_to_common_grid(raw_responses[pred], target.n_points)
+ if target.n_points is not None else raw_responses[pred]
+ for pred in target.prediction
+ ]
+ for target in self.targets
+ }
+ # Under composition, we delegate the computation to the composition utility
+ if self.composition is not None:
+ return self.composition.transform(values, list(responses_interp.values()))
+ # Otherwise, compute the objective directly
results = []
variances = []
- if self.composition is None:
- for target in self.targets:
- targets = [
- target.compute_quantity(responses[pred].get_time(), responses[pred].get_data())
- for pred in target.prediction
- ]
- results.append(target.weight * np.mean(targets))
- variances.append(target.weight * np.var(targets) / len(targets))
- else:
- for target in self.targets:
- flat_responses = np.array([
- target.flatten(responses[pred])
- for pred in target.prediction
- ])
- results.append(np.mean(flat_responses, axis=0))
- variances.append(np.var(flat_responses, axis=0) / flat_responses.shape[0])
- return ObjectiveResult(results, variances if self.stochastic else None)
+ for target, responses in responses_interp.items():
+ # Evaluate target quantities for each response
+ targets = [
+ target.quantity.reduce(response.get_time(), response.get_data(), values)
+ for response in responses
+ ]
+ # Build statistical model for the target
+ results.append(target.weight * np.mean(targets))
+ variances.append(target.weight * np.var(targets) / len(targets))
+ return ObjectiveResult(values, results, variances if self.stochastic else None)
def plot_case(self, case_hash: str, options: Dict[str, Any] = None) -> List[Figure]:
"""Plot a given function call given the parameter hash
@@ -531,5 +357,5 @@ def read(
list(targets.keys()),
output_dir,
stochastic=bool(config.get('stochastic', False)),
- composition=config.get('composite', None),
+ composite=bool(config.get('composite', False)),
)
diff --git a/piglot/objectives/fitting.py b/piglot/objectives/fitting.py
index d3d68a8..ab3ec02 100644
--- a/piglot/objectives/fitting.py
+++ b/piglot/objectives/fitting.py
@@ -1,131 +1,21 @@
"""Module for curve fitting objectives"""
from __future__ import annotations
from typing import Dict, Any, List, Union
-from abc import abstractmethod, ABC
import os
import sys
import numpy as np
-import torch
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from piglot.parameter import ParameterSet
from piglot.solver import read_solver
from piglot.solver.solver import Solver, OutputResult
from piglot.utils.assorted import stats_interp_to_common_grid
+from piglot.utils.reductions import Reduction, read_reduction
from piglot.utils.responses import Transformer, reduce_response, interpolate_response
+from piglot.utils.composition.responses import ResponseComposition, FixedFlatteningUtility
from piglot.objective import Composition, DynamicPlotter, GenericObjective, ObjectiveResult
-class Reduction(ABC):
- """Abstract class for reduction functions with gradients."""
-
- @abstractmethod
- def reduce(self, inner: np.ndarray) -> np.ndarray:
- """Abstract method for computing the reduction of numpy arrays.
-
- Parameters
- ----------
- inner : np.ndarray
- Reduction input.
-
- Returns
- -------
- np.ndarray
- Reduction result.
- """
-
- @abstractmethod
- def reduce_torch(self, inner: torch.Tensor) -> torch.Tensor:
- """Abstract method for computing the reduction of torch tensors with gradients.
-
- Parameters
- ----------
- inner : torch.Tensor
- Reduction input.
-
- Returns
- -------
- torch.Tensor
- Reduction result (with gradients).
- """
-
-
-class MSE(Reduction):
- """Mean squared error reduction function."""
-
- def reduce(self, inner: np.ndarray) -> np.ndarray:
- """Compute the mean squared error.
-
- Parameters
- ----------
- inner : np.ndarray
- Reduction input.
-
- Returns
- -------
- np.ndarray
- Reduction result.
- """
- return np.mean(np.square(inner), axis=-1)
-
- def reduce_torch(self, inner: torch.Tensor) -> torch.Tensor:
- """Compute the mean squared error.
-
- Parameters
- ----------
- inner : torch.Tensor
- Reduction input.
-
- Returns
- -------
- torch.Tensor
- Reduction result (with gradients).
- """
- return torch.mean(torch.square(inner), dim=-1)
-
-
-AVAILABLE_REDUCTIONS = {
- 'mse': MSE,
-}
-
-
-class CompositionFromReduction(Composition):
- """Optimisation composition function from a reduction function."""
-
- def __init__(self, reduction: Reduction):
- self.reduction = reduction
-
- def composition(self, inner: np.ndarray) -> float:
- """Compute the MSE outer function of the composition
-
- Parameters
- ----------
- inner : np.ndarray
- Return value from the inner function
-
- Returns
- -------
- float
- Scalar composition result
- """
- return self.reduction.reduce(inner)
-
- def composition_torch(self, inner: torch.Tensor) -> torch.Tensor:
- """Compute the MSE outer function of the composition with gradients
-
- Parameters
- ----------
- inner : torch.Tensor
- Return value from the inner function
-
- Returns
- -------
- torch.Tensor
- Scalar composition result
- """
- return self.reduction.reduce_torch(inner)
-
-
class Reference:
"""Container for reference solutions."""
@@ -141,6 +31,7 @@ def __init__(
filter_tol: float = 0.0,
show: bool = False,
weight: float = 1.0,
+ reduction: Reduction = None,
):
# Sanitise prediction field
if isinstance(prediction, str):
@@ -154,6 +45,7 @@ def __init__(
self.filter_tol = filter_tol
self.show = show
self.weight = weight
+ self.reduction = reduction
# Load the data right away
data = np.genfromtxt(filename, skip_header=skip_header)[:, [x_col - 1, y_col - 1]]
self.x_data = data[:, 0]
@@ -163,6 +55,7 @@ def __init__(
self.x_data, self.y_data = self.transformer(self.x_data, self.y_data)
self.x_orig = np.copy(self.x_data)
self.y_orig = np.copy(self.y_data)
+ self.flatten_utility = FixedFlatteningUtility(self.x_data)
def prepare(self) -> None:
"""Prepare the reference data."""
@@ -193,6 +86,8 @@ def prepare(self) -> None:
),
np.stack((self.x_data, self.y_data), axis=1),
)
+ # Reset the flattening utility
+ self.flatten_utility = FixedFlatteningUtility(self.x_data)
def has_filtering(self) -> bool:
"""Check if the reference has filtering.
@@ -244,7 +139,7 @@ def get_orig_data(self) -> np.ndarray:
"""
return self.y_orig
- def compute_errors(self, results: OutputResult) -> np.ndarray:
+ def compute_errors(self, results: OutputResult) -> OutputResult:
"""Compute the pointwise normalised errors for the given results.
Parameters
@@ -254,8 +149,8 @@ def compute_errors(self, results: OutputResult) -> np.ndarray:
Returns
-------
- np.ndarray
- Error for each reference point
+ OutputResult
+ Pointwise normalised errors.
"""
# Interpolate response to the reference grid
resp_interp = interpolate_response(
@@ -265,7 +160,7 @@ def compute_errors(self, results: OutputResult) -> np.ndarray:
)
# Compute normalised error
factor = np.mean(np.abs(self.get_data()))
- return (resp_interp - self.get_data()) / factor
+ return OutputResult(self.get_time(), (resp_interp - self.get_data()) / factor)
@staticmethod
def read(filename: str, config: Dict[str, Any], output_dir: str) -> Reference:
@@ -298,6 +193,7 @@ def read(filename: str, config: Dict[str, Any], output_dir: str) -> Reference:
filter_tol=float(config.get('filter_tol', 0.0)),
show=bool(config.get('show', False)),
weight=float(config.get('weight', 1.0)),
+ reduction=read_reduction(config['reduction']) if 'reduction' in config else None,
)
@@ -362,9 +258,15 @@ def __init__(
self,
solver: Solver,
references: Dict[Reference, List[str]],
+ reduction: Reduction,
) -> None:
self.solver = solver
self.references = references
+ self.reduction = reduction
+ # Assign the reduction to non-defined references
+ for reference in self.references:
+ if reference.reduction is None:
+ reference.reduction = self.reduction
def prepare(self) -> None:
"""Prepare the solver for optimisation."""
@@ -372,6 +274,35 @@ def prepare(self) -> None:
reference.prepare()
self.solver.prepare()
+ def composition(
+ self,
+ scalarise: bool,
+ stochastic: bool,
+ ) -> Composition:
+ """Build the composition utility for the design objective.
+
+ Parameters
+ ----------
+ scalarise : bool
+ Whether we should scalarise the objective.
+ stochastic : bool
+ Whether the objective is stochastic.
+ targets : List[DesignTarget]
+ List of design targets to consider.
+
+ Returns
+ -------
+ Composition
+ Composition utility for the design objective.
+ """
+ return ResponseComposition(
+ scalarise=scalarise,
+ stochastic=stochastic,
+ weights=[t.weight for t in self.references],
+ reductions=[t.reduction for t in self.references],
+ flatten_list=[t.flatten_utility for t in self.references],
+ )
+
def solve(self, values: np.ndarray, concurrent: bool) -> Dict[Reference, List[OutputResult]]:
"""Evaluate the solver for the given set of parameter values and get the output results.
@@ -516,8 +447,10 @@ def read(config: Dict[str, Any], parameters: ParameterSet, output_dir: str) -> F
# Sanitise reference: check if it is associated to at least one case
if len(references[reference]) == 0:
raise ValueError(f"Reference '{reference_name}' is not associated to any case.")
+ # Read the optional reduction
+ reduction = read_reduction(config.get('reduction', 'mse'))
# Return the solver
- return FittingSolver(solver, references)
+ return FittingSolver(solver, references, reduction)
class FittingObjective(GenericObjective):
@@ -528,17 +461,15 @@ def __init__(
parameters: ParameterSet,
solver: FittingSolver,
output_dir: str,
- reduction: Reduction,
stochastic: bool = False,
composite: bool = False,
) -> None:
super().__init__(
parameters,
stochastic,
- composition=CompositionFromReduction(reduction) if composite else None,
+ composition=solver.composition(True, stochastic) if composite else None,
output_dir=output_dir,
)
- self.reduction = reduction
self.composite = composite
self.solver = solver
@@ -546,6 +477,9 @@ def prepare(self) -> None:
"""Prepare the objective for optimisation."""
super().prepare()
self.solver.prepare()
+ # Reset the composition utility
+ if self.composite:
+ self.composition = self.solver.composition(True, self.stochastic)
def _objective(self, values: np.ndarray, concurrent: bool = False) -> ObjectiveResult:
"""Abstract method for objective computation.
@@ -564,20 +498,26 @@ def _objective(self, values: np.ndarray, concurrent: bool = False) -> ObjectiveR
"""
# Run the solver
output = self.solver.solve(values, concurrent)
- # Compute the loss
+ # Compute pointwise errors for each reference
+ errors = {
+ reference: [reference.compute_errors(result) for result in results]
+ for reference, results in output.items()
+ }
+ # Under composition, we delegate the computation to the composition utility
+ if self.composition is not None:
+ return self.composition.transform(values, list(errors.values()))
+ # Otherwise, compute the objective directly
losses = []
variances = []
- for reference, results in output.items():
- # Compute the errors for each result and apply reduction according to the objective type
- errors = np.array([reference.compute_errors(result) for result in results])
- if not self.composite:
- errors = self.reduction.reduce(errors)
- loss = np.mean(errors, axis=0)
- variance = np.var(errors, axis=0) / errors.shape[0]
- # Collect the weighted losses and variances
- losses.append(reference.weight * loss)
- variances.append(reference.weight * variance)
- return ObjectiveResult(losses, variances if self.stochastic else None)
+ for reference, responses in errors.items():
+ targets = [
+ reference.reduction.reduce(response.get_time(), response.get_data(), values)
+ for response in responses
+ ]
+ # Build statistical model for the target
+ losses.append(reference.weight * np.mean(targets))
+ variances.append(reference.weight * np.var(targets) / len(targets))
+ return ObjectiveResult(values, losses, variances if self.stochastic else None)
def plot_case(self, case_hash: str, options: Dict[str, Any] = None) -> List[Figure]:
"""Plot a given function call given the parameter hash.
@@ -646,13 +586,10 @@ def read(
# Parse the reduction
if 'reduction' not in config:
config['reduction'] = 'mse'
- elif config['reduction'] not in AVAILABLE_REDUCTIONS:
- raise ValueError(f"Invalid reduction '{config['reduction']}' for fitting objective.")
return FittingObjective(
parameters,
FittingSolver.read(config, parameters, output_dir),
output_dir,
- AVAILABLE_REDUCTIONS[config['reduction']](),
stochastic=bool(config.get('stochastic', False)),
composite=bool(config.get('composite', False)),
)
diff --git a/piglot/objectives/synthetic.py b/piglot/objectives/synthetic.py
index 1bf5ae0..02a97c3 100644
--- a/piglot/objectives/synthetic.py
+++ b/piglot/objectives/synthetic.py
@@ -7,8 +7,10 @@
import botorch.test_functions.synthetic
from botorch.test_functions.synthetic import SyntheticTestFunction
from piglot.parameter import ParameterSet
-from piglot.objective import GenericObjective, ObjectiveResult, Composition
-from piglot.objectives.compositions import AVAILABLE_COMPOSITIONS
+from piglot.objective import GenericObjective, ObjectiveResult
+from piglot.solver.solver import OutputResult
+from piglot.utils.reductions import Reduction, read_reduction
+from piglot.utils.composition.responses import ResponseComposition, FixedFlatteningUtility
class SyntheticObjective(GenericObjective):
@@ -20,13 +22,13 @@ def __init__(
name: str,
output_dir: str,
transform: str = None,
- composition: Composition = None,
+ composition: Reduction = None,
**kwargs,
) -> None:
super().__init__(
parameters,
stochastic=False,
- composition=composition,
+ composition=self.__composition(composition) if composition is not None else None,
output_dir=output_dir,
)
test_functions = self.get_test_functions()
@@ -39,6 +41,28 @@ def __init__(
with open(os.path.join(output_dir, 'optimum_value'), 'w', encoding='utf8') as file:
file.write(f'{self.transform(self.func.optimal_value, self.func)}')
+ @staticmethod
+ def __composition(reduction: Reduction) -> ResponseComposition:
+ """Create a response composition from a reduction.
+
+ Parameters
+ ----------
+ reduction : Reduction
+ Reduction to apply.
+
+ Returns
+ -------
+ ResponseComposition
+ Composition to apply.
+ """
+ return ResponseComposition(
+ True,
+ False,
+ [1.0],
+ [reduction],
+ [FixedFlatteningUtility(np.array([0.0]))],
+ )
+
@staticmethod
def get_test_functions() -> Dict[str, Type[SyntheticTestFunction]]:
"""Return available test functions.
@@ -93,7 +117,12 @@ def _objective(self, values: np.ndarray, concurrent: bool = False) -> ObjectiveR
elif self.transform is not None:
value = self.transform(value, self.func)
value = float(value.item())
- return ObjectiveResult([np.array([value])])
+ if self.composition is None:
+ return ObjectiveResult(values, [np.array([value])])
+ return self.composition.transform(
+ values,
+ [[OutputResult(np.array([0.0]), np.array([value]))]],
+ )
@staticmethod
def read(
@@ -123,11 +152,8 @@ def read(
function = config.pop('function')
# Optional arguments
composition = None
- composition_name = config.pop('composition', None)
- if composition_name is not None:
- if composition_name not in AVAILABLE_COMPOSITIONS:
- raise RuntimeError(f'Unknown composition {composition_name}.')
- composition = AVAILABLE_COMPOSITIONS[composition_name]()
+ if 'composition' in config:
+ composition = read_reduction(config.pop('composition'))
return SyntheticObjective(
parameters,
function,
diff --git a/piglot/optimiser.py b/piglot/optimiser.py
index 06278f6..06582ae 100644
--- a/piglot/optimiser.py
+++ b/piglot/optimiser.py
@@ -255,7 +255,7 @@ def optimise(
self.iters_no_improv = 0
# Build initial shot and bounds
n_dim = len(self.parameters)
- init_shot = [par.normalise(par.inital_value) for par in self.parameters]
+ init_shot = np.array([par.normalise(par.inital_value) for par in self.parameters])
bounds = np.ones((n_dim, 2))
bounds[:, 0] = -1
# Build best solution
diff --git a/piglot/optimisers/botorch/bayes.py b/piglot/optimisers/botorch/bayes.py
index 2b03b1b..1715543 100644
--- a/piglot/optimisers/botorch/bayes.py
+++ b/piglot/optimisers/botorch/bayes.py
@@ -185,11 +185,8 @@ def _get_random_points(
def _result_to_dataset(self, result: ObjectiveResult) -> Tuple[np.ndarray, np.ndarray]:
if self.objective.composition:
- values = np.concatenate(result.values)
- if self.objective.stochastic:
- variances = np.concatenate(result.variances)
- else:
- variances = np.zeros_like(values)
+ values = result.values
+ variances = result.variances if self.objective.stochastic else np.zeros_like(values)
else:
if self.objective.stochastic:
values, variances = ObjectiveResult.scalarise_stochastic(result)
@@ -198,11 +195,15 @@ def _result_to_dataset(self, result: ObjectiveResult) -> Tuple[np.ndarray, np.nd
values, variances = np.array([values]), np.array([variances])
return values, variances
- def _value_to_scalar(self, value: Union[np.ndarray, torch.Tensor]) -> float:
+ def _value_to_scalar(
+ self,
+ value: Union[np.ndarray, torch.Tensor],
+ params: Union[np.ndarray, torch.Tensor],
+ ) -> float:
if self.objective.composition:
if isinstance(value, np.ndarray):
- return self.objective.composition.composition(value)
- return self.objective.composition.composition_torch(value).cpu().item()
+ return self.objective.composition.composition(value, params)
+ return self.objective.composition.composition_torch(value, params).cpu().item()
return value.item()
def _build_acquisition_scalar(
@@ -253,7 +254,8 @@ def _build_acquisition_composite(
_, y_avg, y_std = dataset.get_obervation_stats()
mc_objective = GenericMCObjective(
lambda vals, X: -self.objective.composition.composition_torch(
- dataset.expand_observations(vals * y_std + y_avg)
+ dataset.expand_observations(vals * y_std + y_avg),
+ dataset.lbounds + (dataset.ubounds - dataset.lbounds) * X,
)
)
# Delegate acquisition building
@@ -336,7 +338,7 @@ def _optimise(
# Find current best point to return to the driver
best_params, best_result = dataset.min(self._value_to_scalar)
- self._progress_check(0, self._value_to_scalar(best_result), best_params)
+ self._progress_check(0, self._value_to_scalar(best_result, best_params), best_params)
# Optimisation loop
for i_iter in range(n_iter):
@@ -357,7 +359,7 @@ def _optimise(
values_batch = []
for i, result in enumerate(results):
values, variances = self._result_to_dataset(result)
- values_batch.append(self._value_to_scalar(values))
+ values_batch.append(self._value_to_scalar(values, candidates[i, :]))
dataset.push(candidates[i, :], values, variances)
# Find best observation for this batch
@@ -372,4 +374,4 @@ def _optimise(
# Return optimisation result
best_params, best_result = dataset.min(self._value_to_scalar)
- return best_params, self._value_to_scalar(best_result)
+ return best_params, self._value_to_scalar(best_result, best_params)
diff --git a/piglot/optimisers/botorch/dataset.py b/piglot/optimisers/botorch/dataset.py
index 30c6ea0..2336926 100644
--- a/piglot/optimisers/botorch/dataset.py
+++ b/piglot/optimisers/botorch/dataset.py
@@ -234,7 +234,7 @@ def destandardise(
def min(
self,
- transformer: Callable[[torch.Tensor], float],
+ transformer: Callable[[torch.Tensor, torch.Tensor], float],
) -> Tuple[np.ndarray, np.ndarray]:
"""Return the minimum value of the dataset, according to a given transformation.
@@ -248,7 +248,8 @@ def min(
Tuple[np.ndarray, np.ndarray]
Parameters and values for the minimum point.
"""
- idx = np.argmin([transformer(value) for value in self.values])
+ values = [transformer(value, params) for params, value in zip(self.params, self.values)]
+ idx = np.argmin(values)
return self.params[idx, :].cpu().numpy(), self.values[idx, :].cpu().numpy()
def to(self, device: str) -> BayesDataset:
diff --git a/piglot/parameter.py b/piglot/parameter.py
index 38144da..5e197bf 100644
--- a/piglot/parameter.py
+++ b/piglot/parameter.py
@@ -156,7 +156,7 @@ def normalise(self, values):
array
Normalised parameters.
"""
- return [p.normalise(values[i]) for i, p in enumerate(self.parameters)]
+ return np.array([p.normalise(values[i]) for i, p in enumerate(self.parameters)])
def denormalise(self, values):
"""Denormalises a parameter set from the internal [-1,1] bounds.
@@ -171,7 +171,7 @@ def denormalise(self, values):
array
Denormalised parameters.
"""
- return [p.denormalise(values[i]) for i, p in enumerate(self.parameters)]
+ return np.array([p.denormalise(values[i]) for i, p in enumerate(self.parameters)])
def clip(self, values):
"""Clamp the parameter set to the [lbound,ubound] interval.
@@ -186,7 +186,7 @@ def clip(self, values):
float
Clamped parameters.
"""
- return [p.clip(values[i]) for i, p in enumerate(self.parameters)]
+ return np.array([p.clip(values[i]) for i, p in enumerate(self.parameters)])
def to_dict(self, values, input_normalised=True):
"""Build a dict with name-value pairs given a list of values.
diff --git a/piglot/utils/assorted.py b/piglot/utils/assorted.py
index d126e28..23abbdf 100644
--- a/piglot/utils/assorted.py
+++ b/piglot/utils/assorted.py
@@ -1,8 +1,9 @@
"""Assorted utilities."""
-from typing import List, Dict, Tuple
+from typing import List, Dict, Tuple, Type, Any, TypeVar
import os
import contextlib
import numpy as np
+import importlib.util
from scipy.stats import t
@@ -142,3 +143,38 @@ def change_cwd(path: str):
yield
finally:
os.chdir(old)
+
+
+T = TypeVar('T')
+
+
+def read_custom_module(config: Dict[str, Any], cls: Type[T]) -> Type[T]:
+ """Read a custom module from a configuration spec.
+
+ Parameters
+ ----------
+ config : Dict[str, Any]
+ Configuration of the custom module.
+ cls : Type
+ Base class of the module to load.
+
+ Returns
+ -------
+ Type
+ Custom module type read from the script.
+ """
+ # Sanitise the configuration
+ if 'script' not in config:
+ raise ValueError("Missing 'script' field for reading the custom module script.")
+ if 'class' not in config:
+ raise ValueError("Missing 'class' field for reading the custom module script.")
+ # Load the module
+ module_name = f'piglot_{os.path.basename(config["script"]).replace(".", "_")}'
+ spec = importlib.util.spec_from_file_location(module_name, config['script'])
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ module_class = getattr(module, config['class'])
+ # Sanitise the class
+ if not issubclass(module_class, cls):
+ raise ValueError(f"Custom class '{module_class}' is not a subclass of '{cls}'.")
+ return module_class
diff --git a/piglot/utils/composition/responses.py b/piglot/utils/composition/responses.py
new file mode 100644
index 0000000..efa7c60
--- /dev/null
+++ b/piglot/utils/composition/responses.py
@@ -0,0 +1,395 @@
+"""Module with utilities for transforming responses under compositions."""
+from typing import List, Tuple
+from abc import ABC, abstractmethod
+import numpy as np
+import torch
+from piglot.objective import Composition, ObjectiveResult
+from piglot.solver.solver import OutputResult
+from piglot.utils.reductions import Reduction
+
+
+class FlattenUtility(ABC):
+ """Utility for flattening a response into a fixed-size vector (with gradients)."""
+
+ @abstractmethod
+ def length(self) -> int:
+ """Return the length of the flattened vector.
+
+ Returns
+ -------
+ int
+ Length of the flattened vector.
+ """
+
+ @abstractmethod
+ def flatten_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
+ """Flatten a response into a single vector (with gradients).
+
+ Parameters
+ ----------
+ time : torch.Tensor
+ Time points of the response.
+ data : torch.Tensor
+ Data points of the response.
+
+ Returns
+ -------
+ torch.Tensor
+ Flattened responses.
+ """
+
+ @abstractmethod
+ def unflatten_torch(self, data: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
+ """Unflatten a vector containing a response (with gradients).
+
+ Parameters
+ ----------
+ data : torch.Tensor
+ Flattened responses.
+
+ Returns
+ -------
+ Tuple[torch.Tensor, torch.Tensor]
+ List of responses.
+ """
+
+ def flatten(self, time: np.ndarray, data: np.ndarray) -> np.ndarray:
+ """Flatten a response into a single vector.
+
+ Parameters
+ ----------
+ time : np.ndarray
+ Time points of the response.
+ data : np.ndarray
+ Data points of the response.
+
+ Returns
+ -------
+ np.ndarray
+ Flattened responses.
+ """
+ return self.flatten_torch(torch.from_numpy(time), torch.from_numpy(data)).numpy(force=True)
+
+ def unflatten(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+ """Unflatten a vector containing a response.
+
+ Parameters
+ ----------
+ data : np.ndarray
+ Flattened responses.
+
+ Returns
+ -------
+ Tuple[np.ndarray, np.ndarray]
+ List of responses.
+ """
+ time, values = self.unflatten_torch(torch.from_numpy(data))
+ return time.numpy(force=True), values.numpy(force=True)
+
+
+class FixedFlatteningUtility(FlattenUtility):
+ """Response flattening utility for fixed time grids."""
+
+ def __init__(self, time_grid: np.ndarray):
+ self.time_grid = time_grid
+
+ def length(self) -> int:
+ """Return the length of the flattened vector.
+
+ Returns
+ -------
+ int
+ Length of the flattened vector.
+ """
+ return len(self.time_grid)
+
+ def flatten_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
+ """Flatten a response into a single vector (with gradients).
+
+ Parameters
+ ----------
+ time : torch.Tensor
+ Time points of the response.
+ data : torch.Tensor
+ Data points of the response.
+
+ Returns
+ -------
+ torch.Tensor
+ Flattened responses.
+ """
+ if time.shape[-1] != len(self.time_grid):
+ raise ValueError("Time grid does not match the expected length.")
+ if time.shape != data.shape:
+ raise ValueError("Mismatched time and data shapes are not supported.")
+ return data
+
+ def unflatten_torch(self, data: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
+ """Unflatten a vector containing a response (with gradients).
+
+ Parameters
+ ----------
+ data : torch.Tensor
+ Flattened responses.
+
+ Returns
+ -------
+ Tuple[torch.Tensor, torch.Tensor]
+ List of responses.
+ """
+ return torch.from_numpy(self.time_grid).expand_as(data), data
+
+
+class EndpointFlattenUtility(FlattenUtility):
+ """Response flattening utility based on the time endpoints of the response."""
+
+ def __init__(self, n_points: int):
+ self.n_points = n_points
+
+ def length(self) -> int:
+ """Return the length of the flattened vector.
+
+ Returns
+ -------
+ int
+ Length of the flattened vector.
+ """
+ return self.n_points + 2
+
+ def flatten(self, time: np.ndarray, data: np.ndarray) -> np.ndarray:
+ """Flatten a response into a single vector.
+
+ Parameters
+ ----------
+ time : np.ndarray
+ Time points of the response.
+ data : np.ndarray
+ Data points of the response.
+
+ Returns
+ -------
+ np.ndarray
+ Flattened responses.
+ """
+ # Sanitise input shape
+ if time.shape != data.shape:
+ raise ValueError("Mismatched time and data shapes are not supported.")
+ bounds = np.array([np.min(time), np.max(time)])
+ grid = np.linspace(bounds[0], bounds[1], self.n_points)
+ response = np.interp(grid, time, data)
+ return np.concatenate([response, bounds], axis=-1)
+
+ def flatten_torch(self, time: torch.Tensor, data: torch.Tensor) -> torch.Tensor:
+ """Flatten a response into a single vector (with gradients).
+
+ Parameters
+ ----------
+ time : torch.Tensor
+ Time points of the response.
+ data : torch.Tensor
+ Data points of the response.
+
+ Returns
+ -------
+ torch.Tensor
+ Flattened responses.
+ """
+ raise NotImplementedError("Flattening with gradients is not yet supported.")
+
+ def unflatten_torch(self, data: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
+ """Unflatten a vector containing a response (with gradients).
+
+ Parameters
+ ----------
+ data : torch.Tensor
+ Flattened responses.
+
+ Returns
+ -------
+ Tuple[torch.Tensor, torch.Tensor]
+ List of responses.
+ """
+ data_gridless = data[..., :-2]
+ lbounds = data[..., -2].unsqueeze(-1).expand_as(data_gridless)
+ ubounds = data[..., -1].unsqueeze(-1).expand_as(data_gridless)
+ reg_grid = torch.linspace(0, 1, self.n_points).expand_as(data_gridless)
+ grid = lbounds + reg_grid * (ubounds - lbounds)
+ return grid, data_gridless
+
+
+class ConcatUtility:
+ """Utility for concatenating a set of responses (with gradients)."""
+
+ def __init__(self, lengths: List[int]):
+ self.lengths = lengths
+ self.indices = np.cumsum([0] + lengths)
+
+ def concat_torch(self, responses: List[torch.Tensor]) -> torch.Tensor:
+ """Concatenate a list of responses (with gradients).
+
+ Parameters
+ ----------
+ responses : List[torch.Tensor]
+ List of responses.
+
+ Returns
+ -------
+ torch.Tensor
+ Flattened responses.
+ """
+ return torch.cat(responses, dim=-1)
+
+ def split_torch(self, data: torch.Tensor) -> List[torch.Tensor]:
+ """Split a vector containing a set of responses with gradients.
+
+ Parameters
+ ----------
+ data : torch.Tensor
+ Flattened data.
+
+ Returns
+ -------
+ List[torch.Tensor]
+ List of split responses (with gradients).
+ """
+ return [data[..., self.indices[i]:self.indices[i+1]] for i in range(len(self.lengths))]
+
+ def concat(self, responses: List[np.ndarray]) -> np.ndarray:
+ """Concatenate a list of responses.
+
+ Parameters
+ ----------
+ responses : List[np.ndarray]
+ List of responses.
+
+ Returns
+ -------
+ np.ndarray
+ Flattened responses.
+ """
+ return self.concat_torch([torch.from_numpy(res) for res in responses]).numpy(force=True)
+
+ def split(self, data: np.ndarray) -> List[np.ndarray]:
+ """Split a vector containing a set of responses.
+
+ Parameters
+ ----------
+ data : np.ndarray
+ Flattened data.
+
+ Returns
+ -------
+ List[np.ndarray]
+ List of split responses.
+ """
+ return [res.numpy(force=True) for res in self.split_torch(torch.from_numpy(data))]
+
+
+class ResponseComposition(Composition):
+ """Composition for transforming responses."""
+
+ def __init__(
+ self,
+ scalarise: bool,
+ stochastic: bool,
+ weights: List[float],
+ reductions: List[Reduction],
+ flatten_list: List[FlattenUtility],
+ ) -> None:
+ if len(flatten_list) != len(reductions):
+ raise ValueError("Mismatched number of reductions and responses.")
+ self.scalarise = scalarise
+ self.stochastic = stochastic
+ self.weights = weights
+ self.reductions = reductions
+ self.flatten_list = flatten_list
+ self.lenghts = [flatten.length() for flatten in self.flatten_list]
+ self.concat = ConcatUtility(self.lenghts)
+
+ def transform(self, params: np.ndarray, responses: List[List[OutputResult]]) -> ObjectiveResult:
+ """Transform a set of responses into a fixed-size ObjectiveResult for the optimiser.
+
+ Parameters
+ ----------
+ params : np.ndarray
+ Parameters for the given responses.
+ responses : List[List[OutputResult]]
+ List of responses.
+
+ Returns
+ -------
+ ObjectiveResult
+ Transformed responses.
+ """
+ # Sanitise input shape
+ if len(responses) != len(self.flatten_list):
+ raise ValueError("Mismatched number of objectives.")
+ # Build set of responses to concatenate
+ means = []
+ variances = []
+ for i, flatten in enumerate(self.flatten_list):
+ flat_responses = np.array([
+ flatten.flatten(response.get_time(), response.get_data())
+ for response in responses[i]
+ ])
+ # Build stochastic model for this set of responses
+ means.append(np.mean(flat_responses, axis=0))
+ variances.append(np.var(flat_responses, axis=0) / flat_responses.shape[0])
+ # Concatenate the transformed responses
+ return ObjectiveResult(
+ params,
+ self.concat.concat(means),
+ self.concat.concat(variances) if self.stochastic else None,
+ )
+
+ def composition(self, inner: np.ndarray, params: np.ndarray) -> np.ndarray:
+ """Compute the outer function of the composition.
+
+ Parameters
+ ----------
+ inner : np.ndarray
+ Return value from the inner function.
+ params : np.ndarray
+ Parameters for the given responses.
+
+ Returns
+ -------
+ np.ndarray
+ Composition result.
+ """
+ return self.composition_torch(
+ torch.from_numpy(inner),
+ torch.from_numpy(params),
+ ).numpy(force=True)
+
+ def composition_torch(self, inner: torch.Tensor, params: torch.Tensor) -> torch.Tensor:
+ """Compute the outer function of the composition with gradients.
+
+ Parameters
+ ----------
+ inner : torch.Tensor
+ Return value from the inner function.
+ params : torch.Tensor
+ Parameters for the given responses.
+
+ Returns
+ -------
+ torch.Tensor
+ Composition result.
+ """
+ # Split the inner responses
+ responses = self.concat.split_torch(inner)
+ # Unflatten each response
+ unflattened = [
+ flatten.unflatten_torch(response)
+ for response, flatten in zip(responses, self.flatten_list)
+ ]
+ # Evaluate and stack the objective for each response
+ objective = torch.stack([
+ reduction.reduce_torch(time, data, params)
+ for (time, data), reduction in zip(unflattened, self.reductions)
+ ], dim=-1)
+ # Apply the weights
+ objective = objective * torch.tensor(self.weights)
+ # If needed, scalarise the objectives
+ return torch.mean(objective, dim=-1) if self.scalarise else objective
diff --git a/piglot/utils/reductions.py b/piglot/utils/reductions.py
new file mode 100644
index 0000000..50836d0
--- /dev/null
+++ b/piglot/utils/reductions.py
@@ -0,0 +1,176 @@
+"""Module for defining reduction functions for responses."""
+from typing import Callable, Dict, Any, Union
+from abc import ABC, abstractmethod
+import numpy as np
+import torch
+from piglot.utils.assorted import read_custom_module
+
+
+class Reduction(ABC):
+ """Abstract class for defining reduction functions."""
+
+ @abstractmethod
+ def reduce_torch(
+ self,
+ time: torch.Tensor,
+ data: torch.Tensor,
+ params: torch.Tensor,
+ ) -> torch.Tensor:
+ """Reduce the input data to a single value (with gradients).
+
+ Parameters
+ ----------
+ time : torch.Tensor
+ Time points of the response.
+ data : torch.Tensor
+ Data points of the response.
+ params : torch.Tensor
+ Parameters for the given responses.
+
+ Returns
+ -------
+ torch.Tensor
+ Reduced value of the data.
+ """
+
+ def reduce(self, time: np.ndarray, data: np.ndarray, params: np.ndarray) -> np.ndarray:
+ """Reduce the input data to a single value.
+
+ Parameters
+ ----------
+ time : np.ndarray
+ Time points of the response.
+ data : np.ndarray
+ Data points of the response.
+ params : np.ndarray
+ Parameters for the given responses.
+
+ Returns
+ -------
+ np.ndarray
+ Reduced value of the data.
+ """
+ return self.reduce_torch(
+ torch.from_numpy(time),
+ torch.from_numpy(data),
+ torch.from_numpy(params),
+ ).numpy(force=True)
+
+
+class NegateReduction(Reduction):
+ """Negate the result of another reduction function."""
+
+ def __init__(self, reduction: Reduction) -> None:
+ self.reduction = reduction
+
+ def reduce_torch(
+ self,
+ time: torch.Tensor,
+ data: torch.Tensor,
+ params: torch.Tensor,
+ ) -> torch.Tensor:
+ """Reduce the input data to a single value.
+
+ Parameters
+ ----------
+ time : np.ndarray
+ Time points of the response.
+ data : np.ndarray
+ Data points of the response.
+ params : np.ndarray
+ Parameters for the given responses.
+
+ Returns
+ -------
+ np.ndarray
+ Reduced value of the data.
+ """
+ return -self.reduction.reduce_torch(time, data, params)
+
+
+class SimpleReduction(Reduction):
+ """Reduction function defined from a lambda function (without using the parameters)."""
+
+ def __init__(self, reduction: Callable[[torch.Tensor, torch.Tensor], torch.Tensor]) -> None:
+ self.reduction = reduction
+
+ def reduce_torch(
+ self,
+ time: torch.Tensor,
+ data: torch.Tensor,
+ params: torch.Tensor,
+ ) -> torch.Tensor:
+ """Reduce the input data to a single value.
+
+ Parameters
+ ----------
+ time : torch.Tensor
+ Time points of the response.
+ data : torch.Tensor
+ Data points of the response.
+ params : torch.Tensor
+ Parameters for the given responses.
+
+ Returns
+ -------
+ torch.Tensor
+ Reduced value of the data.
+ """
+ return self.reduction(time, data)
+
+
+AVAILABLE_REDUCTIONS: Dict[str, Reduction] = {
+ 'mean': SimpleReduction(lambda time, data: torch.mean(data, dim=-1)),
+ 'max': SimpleReduction(lambda time, data: torch.max(data, dim=-1).values),
+ 'min': SimpleReduction(lambda time, data: torch.min(data, dim=-1).values),
+ 'sum': SimpleReduction(lambda time, data: torch.sum(data, dim=-1)),
+ 'std': SimpleReduction(lambda time, data: torch.std(data, dim=-1)),
+ 'var': SimpleReduction(lambda time, data: torch.var(data, dim=-1)),
+ 'mse': SimpleReduction(lambda time, data: torch.mean(torch.square(data), dim=-1)),
+ 'mae': SimpleReduction(lambda time, data: torch.mean(torch.abs(data), dim=-1)),
+ 'last': SimpleReduction(lambda time, data: data[..., -1]),
+ 'first': SimpleReduction(lambda time, data: data[..., 0]),
+ 'max_abs': SimpleReduction(lambda time, data: torch.max(torch.abs(data), dim=-1).values),
+ 'min_abs': SimpleReduction(lambda time, data: torch.min(torch.abs(data), dim=-1).values),
+ 'integral': SimpleReduction(lambda time, data: torch.trapz(data, time, dim=-1)),
+ 'square_integral': SimpleReduction(
+ lambda time, data: torch.trapz(torch.square(data), time, dim=-1),
+ ),
+ 'abs_integral': SimpleReduction(
+ lambda time, data: torch.trapz(torch.abs(data), time, dim=-1),
+ ),
+}
+# TODO: Add test for non-existing 'script' reduction
+
+
+def read_reduction(config: Union[str, Dict[str, Any]]) -> Reduction:
+ """Read a reduction function from a configuration.
+
+ Parameters
+ ----------
+ config : Union[str, Dict[str, Any]]
+ Configuration of the reduction function.
+
+ Returns
+ -------
+ Reduction
+ Reduction function.
+ """
+ # Parse the reduction in the simple format
+ if isinstance(config, str):
+ name = config
+ if name == 'script':
+ raise ValueError('Need to pass the file path for the "script" reduction.')
+ if name not in AVAILABLE_REDUCTIONS:
+ raise ValueError(f'Reduction function "{name}" is not available.')
+ return AVAILABLE_REDUCTIONS[name]
+ # Detailed format
+ if 'name' not in config:
+ raise ValueError('Need to pass the name of the reduction function.')
+ name = config['name']
+ # Read script reduction
+ if name == 'script':
+ return read_custom_module(config, Reduction)()
+ if name not in AVAILABLE_REDUCTIONS:
+ raise ValueError(f'Reduction function "{name}" is not available.')
+ return AVAILABLE_REDUCTIONS[name]
diff --git a/test/examples/min_script.py b/test/examples/min_script.py
index b26170f..28abf81 100644
--- a/test/examples/min_script.py
+++ b/test/examples/min_script.py
@@ -1,24 +1,30 @@
-from __future__ import annotations
-import numpy as np
-from piglot.solver.solver import OutputResult
-from piglot.objectives.design import Quantity
+import torch
+from piglot.utils.reductions import Reduction
-class MinQuantity(Quantity):
-
+class MinQuantity(Reduction):
"""Minimum value of a response."""
- def compute(self, result: OutputResult) -> float:
- """Get the minimum of a given response.
+ def reduce_torch(
+ self,
+ time: torch.Tensor,
+ data: torch.Tensor,
+ params: torch.Tensor,
+ ) -> torch.Tensor:
+ """Reduce the input data to a single value (with gradients).
Parameters
----------
- result : OutputResult
- Output result to compute the quantity for.
+ time : torch.Tensor
+ Time points of the response.
+ data : torch.Tensor
+ Data points of the response.
+ params : torch.Tensor
+ Parameters for the given responses.
Returns
-------
- float
- Quantity value.
+ torch.Tensor
+ Reduced value of the data.
"""
- return np.min(result.get_data())
+ return torch.min(data, dim=-1).values
diff --git a/test/examples_assertions/mae_reduction.yaml b/test/examples_assertions/mae_reduction.yaml
deleted file mode 100644
index 5314d17..0000000
--- a/test/examples_assertions/mae_reduction.yaml
+++ /dev/null
@@ -1,34 +0,0 @@
-
-iters: 10
-
-optimiser:
- name: botorch
-
-
-parameters:
- a: [0, -4, 4]
-
-
-objective:
- name: fitting
- composite: True
- stochastic: True
- reduction: mae
- solver:
- name: curve
- cases:
- 'case_1':
- expression: * x ** 2
- parametric: x
- bounds: [-5, 5]
- points: 100
- 'case_2':
- expression: 1.1 * * x ** 2
- parametric: x
- bounds: [-5, 5]
- points: 100
-
-
- references:
- 'reference_curve.txt':
- prediction: ['case_1', 'case_2']
diff --git a/test/examples_plots/dummy_composite_stochastic_design.yaml b/test/examples_plots/dummy_composite_stochastic_design.yaml
index d944514..a2187df 100644
--- a/test/examples_plots/dummy_composite_stochastic_design.yaml
+++ b/test/examples_plots/dummy_composite_stochastic_design.yaml
@@ -33,4 +33,5 @@ objective:
quantity: max
prediction: ['case_1', 'case_2']
negate: False
+ n_points: 10
diff --git a/test/test_examples_assertions.py b/test/test_examples_assertions.py
index aaa017a..81d8287 100644
--- a/test/test_examples_assertions.py
+++ b/test/test_examples_assertions.py
@@ -57,8 +57,8 @@
"Missing test function",
),
'synthetic_unknown_composition.yaml': (
- RuntimeError,
- 'Unknown composition none.',
+ ValueError,
+ 'Reduction function "none" is not available.',
),
'missing_iters.yaml': (
RuntimeError,
@@ -112,10 +112,6 @@
ValueError,
"Reference 'reference_curve_2.txt' is not associated to any case.",
),
- 'mae_reduction.yaml': (
- ValueError,
- "Invalid reduction 'mae' for fitting objective.",
- ),
'design_missing_pred.yaml': (
ValueError,
"Missing prediction for design target 'maximum_force'.",
@@ -126,15 +122,15 @@
),
'design_missing_quantity_name.yaml': (
ValueError,
- "Missing name in quantity specification.",
+ "Need to pass the name of the reduction function.",
),
'design_missing_quantity_script.yaml': (
ValueError,
- "Missing script in quantity specification.",
+ "Missing 'script' field for reading the custom module script.",
),
'design_missing_quantity_class.yaml': (
ValueError,
- "Missing class in quantity specification.",
+ "Missing 'class' field for reading the custom module script.",
),
'design_missing_solver.yaml': (
ValueError,