diff --git a/openeo_processes_dask/process_implementations/ml/__init__.py b/openeo_processes_dask/process_implementations/ml/__init__.py index 093eb051..9ce3ce0e 100644 --- a/openeo_processes_dask/process_implementations/ml/__init__.py +++ b/openeo_processes_dask/process_implementations/ml/__init__.py @@ -1 +1,2 @@ +from .curve_fitting import * from .random_forest import * diff --git a/openeo_processes_dask/process_implementations/ml/curve_fitting.py b/openeo_processes_dask/process_implementations/ml/curve_fitting.py new file mode 100644 index 00000000..425ce030 --- /dev/null +++ b/openeo_processes_dask/process_implementations/ml/curve_fitting.py @@ -0,0 +1,125 @@ +from typing import Callable, Optional + +import numpy as np +import pandas as pd +import xarray as xr +from numpy.typing import ArrayLike + +from openeo_processes_dask.process_implementations.cubes import apply_dimension +from openeo_processes_dask.process_implementations.data_model import RasterCube +from openeo_processes_dask.process_implementations.exceptions import ( + DimensionNotAvailable, +) + +__all__ = ["fit_curve", "predict_curve"] + + +def fit_curve( + data: RasterCube, + parameters: list, + function: Callable, + dimension: str, + ignore_nodata: bool = True, +): + if dimension not in data.dims: + raise DimensionNotAvailable( + f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" + ) + + dims_before = list(data.dims) + + # In the spec, parameters is a list, but xr.curvefit requires names for them, + # so we do this to generate names locally + parameters = {f"param_{i}": v for i, v in enumerate(parameters)} + + # The dimension along which to fit the curves cannot be chunked! + rechunked_data = data.chunk({dimension: -1}) + + def wrapper(f): + def _wrap(*args, **kwargs): + return f( + *args, + **kwargs, + positional_parameters={"x": 0, "parameters": slice(1, None)}, + ) + + return _wrap + + expected_dims_after = list(dims_before) + expected_dims_after[dims_before.index(dimension)] = "param" + + # .curvefit returns some extra information that isn't required by the OpenEO process + # so we simply drop these here. + fit_result = ( + rechunked_data.curvefit( + dimension, + wrapper(function), + p0=parameters, + param_names=list(parameters.keys()), + skipna=ignore_nodata, + ) + .drop_dims(["cov_i", "cov_j"]) + .to_array() + .squeeze() + .transpose(*expected_dims_after) + ) + + fit_result.attrs = data.attrs + + return fit_result + + +def predict_curve( + parameters: RasterCube, + function: Callable, + dimension: str, + labels: ArrayLike, +): + labels_were_datetime = False + dims_before = list(parameters.dims) + + try: + # Try parsing as datetime first + labels = np.asarray(labels, dtype=np.datetime64) + except ValueError: + labels = np.asarray(labels) + + if np.issubdtype(labels.dtype, np.datetime64): + labels = labels.astype(int) + labels_were_datetime = True + + # This is necessary to pipe the arguments correctly through @process + def wrapper(f): + def _wrap(*args, **kwargs): + return f( + *args, + positional_parameters={"parameters": 0}, + named_parameters={"x": labels}, + **kwargs, + ) + + return _wrap + + expected_dims_after = list(dims_before) + expected_dims_after[dims_before.index("param")] = dimension + + predictions = xr.apply_ufunc( + wrapper(function), + parameters, + vectorize=True, + input_core_dims=[["param"]], + output_core_dims=[[dimension]], + dask="parallelized", + output_dtypes=[np.float64], + dask_gufunc_kwargs={ + "allow_rechunk": True, + "output_sizes": {dimension: len(labels)}, + }, + ).transpose(*expected_dims_after) + + predictions = predictions.assign_coords({dimension: labels.data}) + + if labels_were_datetime: + predictions[dimension] = pd.DatetimeIndex(predictions[dimension].values) + + return predictions diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 51cdcd8b..ad4be393 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 51cdcd8b8bb79cf5caf89222b9c2ac8a4effb89b +Subproject commit ad4be3937b799a42c291fa3ad1d87572f4bcd026 diff --git a/tests/test_ml.py b/tests/test_ml.py index 58e90ca2..1325bace 100644 --- a/tests/test_ml.py +++ b/tests/test_ml.py @@ -1,7 +1,21 @@ +from functools import partial + +import dask import geopandas as gpd +import numpy as np +import pytest import xgboost as xgb +from openeo_pg_parser_networkx.pg_schema import ParameterReference -from openeo_processes_dask.process_implementations.ml import fit_regr_random_forest +from openeo_processes_dask.process_implementations.core import process +from openeo_processes_dask.process_implementations.cubes.apply import apply_dimension +from openeo_processes_dask.process_implementations.cubes.general import dimension_labels +from openeo_processes_dask.process_implementations.ml import ( + fit_curve, + fit_regr_random_forest, + predict_curve, +) +from tests.mockdata import create_fake_rastercube def test_fit_regr_random_forest(vector_data_cube, dask_client): @@ -32,3 +46,49 @@ def test_fit_regr_random_forest_inline_geojson( ) assert isinstance(model, xgb.core.Booster) + + +@pytest.mark.parametrize("size", [(6, 5, 4, 3)]) +@pytest.mark.parametrize("dtype", [np.float64]) +def test_curve_fitting(temporal_interval, bounding_box, random_raster_data): + origin_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04"], + backend="dask", + ) + + @process + def fitFunction(x, parameters): + t0 = 2 * np.pi / 31557600 * x + return parameters[0] + parameters[1] * np.cos(t0) + parameters[2] * np.sin(t0) + + _process = partial( + fitFunction, + x=ParameterReference(from_parameter="x"), + parameters=ParameterReference(from_parameter="parameters"), + ) + + parameters = [1, 0, 0] + result = fit_curve( + origin_cube, parameters=parameters, function=_process, dimension="t" + ) + assert len(result.param) == 3 + assert isinstance(result.data, dask.array.Array) + + assert len(result.coords["bands"]) == len(origin_cube.coords["bands"]) + assert len(result.coords["x"]) == len(origin_cube.coords["x"]) + assert len(result.coords["y"]) == len(origin_cube.coords["y"]) + assert len(result.coords["param"]) == len(parameters) + + labels = dimension_labels(origin_cube, origin_cube.openeo.temporal_dims[0]) + predictions = predict_curve( + result, + _process, + origin_cube.openeo.temporal_dims[0], + labels=labels, + ).compute() + + assert len(predictions.coords[origin_cube.openeo.temporal_dims[0]]) == len(labels) + assert "param" not in predictions.dims