From 8e82b6152f84b4eaefe105c2d50741ddaa623616 Mon Sep 17 00:00:00 2001 From: ValentinaHutter Date: Tue, 31 Jan 2023 14:09:32 +0100 Subject: [PATCH] add fit_curve, predict_curve and basic tests --- .../ml/curve_fitting.py | 137 ++++++++++++++++++ tests/test_curve_fitting.py | 56 +++++++ 2 files changed, 193 insertions(+) create mode 100644 openeo_processes_dask/process_implementations/ml/curve_fitting.py create mode 100644 tests/test_curve_fitting.py diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py new file mode 100644 index 00000000..ebebdf40 --- /dev/null +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -0,0 +1,137 @@ +from typing import Callable, Optional, Union + +import dask +import dask.array as da +import numpy as np +import xarray as xr +from numpy.typing import ArrayLike +from scipy import optimize + +from openeo_processes_dask.exceptions import ( + DimensionNotAvailable, + OpenEOException, + QuantilesParameterConflict, + QuantilesParameterMissing, +) +from openeo_processes_dask.process_implementations.cubes.utils import ( + _has_dask, + _is_dask_array, +) +from openeo_processes_dask.process_implementations.data_model import RasterCube + +__all__ = ["fit_curve", "predict_curve"] + + +def fit_curve( + data: RasterCube, + parameters: Union[RasterCube, ArrayLike], + function: Callable, + dimension: str, +): + data = data.fillna(0) # zero values (masked) are not considered + if dimension in ["time", "t", "times"]: + dates = data[dimension].values + timestep = [ + ((x - np.datetime64("1970-01-01")) / np.timedelta64(1, "s")) for x in dates + ] + step = np.array(timestep) + data[dimension] = step + else: + step = dimension + + if isinstance(parameters, RasterCube): + apply_f = lambda x, y, p: optimize.curve_fit( + function, x[np.nonzero(y)], y[np.nonzero(y)], p + )[0] + in_dims = [[dimension], [dimension], ["params"]] + add_arg = [step, data, parameters] + output_size = len(parameters["params"]) + else: + apply_f = lambda x, y: optimize.curve_fit( + function, x[np.nonzero(y)], y[np.nonzero(y)], parameters + )[0] + in_dims = [[dimension], [dimension]] + add_arg = [step, data] + output_size = len(parameters) + values = xr.apply_ufunc( + apply_f, + *add_arg, + vectorize=True, + input_core_dims=in_dims, + output_core_dims=[["params"]], + dask="parallelized", + output_dtypes=[np.float32], + dask_gufunc_kwargs={ + "allow_rechunk": True, + "output_sizes": {"params": output_size}, + } + ) + values["params"] = list(range(len(values["params"]))) + values.attrs = data.attrs + return values + + +def predict_curve( + data: RasterCube, + parameters: Union[RasterCube, ArrayLike], + function: Callable, + dimension: str, + labels: Optional[ArrayLike] = None, +): + data = data.fillna(0) + if (np.array([labels])).shape[-1] > 1: + test = [labels] + else: + test = labels + if dimension in [ + "time", + "t", + "times", + ]: # time dimension must be converted into values + dates = data[dimension].values + if test is None: + timestep = [ + ( + (np.datetime64(x) - np.datetime64("1970-01-01")) + / np.timedelta64(1, "s") + ) + for x in dates + ] + labels = np.array(timestep) + else: + coords = labels + labels = [ + ( + (np.datetime64(x) - np.datetime64("1970-01-01")) + / np.timedelta64(1, "s") + ) + for x in labels + ] + labels = np.array(labels) + else: + if test is None: + labels = data[dimension].values + else: + coords = labels + values = xr.apply_ufunc( + lambda a: function(labels, *a), + parameters, + vectorize=True, + input_core_dims=[["params"]], + output_core_dims=[[dimension]], + dask="parallelized", + output_dtypes=[np.float32], + dask_gufunc_kwargs={ + "allow_rechunk": True, + "output_sizes": {dimension: len(labels)}, + }, + ) + if test is None: + values = values.transpose(*data.dims) + values[dimension] = data[dimension] + predicted = data.where(data != 0, values) + else: + predicted = values.transpose(*data.dims) + predicted[dimension] = coords + predicted.attrs = data.attrs + return predicted diff --git a/tests/test_curve_fitting.py b/tests/test_curve_fitting.py new file mode 100644 index 00000000..497b642a --- /dev/null +++ b/tests/test_curve_fitting.py @@ -0,0 +1,56 @@ +from functools import partial + +import numpy as np +import pytest +import xarray as xr +from openeo_pg_parser_networkx.pg_schema import ParameterReference + +from openeo_processes_dask.process_implementations.ml.curve_fitting import ( + fit_curve, + predict_curve, +) +from tests.general_checks import general_output_checks +from tests.mockdata import create_fake_rastercube + + +@pytest.mark.parametrize("size", [(30, 30, 20, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_curve(temporal_interval, bounding_box, random_raster_data, process_registry): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + + _process = partial( + process_registry["multiply"], + x=ParameterReference(from_parameter="data"), + y=ParameterReference(from_parameter="parameters"), + ) + + fitted_cube = fit_curve( + data=input_cube, parameters=[1], function=_process, dimension="t" + ) + + general_output_checks( + input_cube=input_cube, + output_cube=fitted_cube, + verify_attrs=False, + verify_crs=True, + ) + assert fitted_cube.dims == ("x", "y", "bands", "params") + assert fitted_cube["params"].values == 0 + + predicted_cube = predict_curve( + data=input_cube, parameters=fitted_cube, function=_process, dimension="t" + ) + general_output_checks( + input_cube=input_cube, + output_cube=predicted_cube, + verify_attrs=False, + verify_crs=True, + ) + assert predicted_cube.dims == ("x", "y", "t", "bands") + assert (predicted_cube["t"].values == input_cube["t"].values).all()