Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: fit_curve and predict_curve #57

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions openeo_processes_dask/process_implementations/ml/curve_fitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from typing import Callable, Optional, Union

import dask
import dask.array as da
import numpy as np
import xarray as xr
from numpy.typing import ArrayLike
from scipy import optimize

from openeo_processes_dask.exceptions import (
DimensionNotAvailable,
OpenEOException,
QuantilesParameterConflict,
QuantilesParameterMissing,
)
from openeo_processes_dask.process_implementations.cubes.utils import (
_has_dask,
_is_dask_array,
)
from openeo_processes_dask.process_implementations.data_model import RasterCube

__all__ = ["fit_curve", "predict_curve"]


def fit_curve(
data: RasterCube,
parameters: Union[RasterCube, ArrayLike],
function: Callable,
dimension: str,
):
data = data.fillna(0) # zero values (masked) are not considered
if dimension in ["time", "t", "times"]:
dates = data[dimension].values
timestep = [
((x - np.datetime64("1970-01-01")) / np.timedelta64(1, "s")) for x in dates
]
step = np.array(timestep)
data[dimension] = step
else:
step = dimension

if isinstance(parameters, RasterCube):
apply_f = lambda x, y, p: optimize.curve_fit(
function, x[np.nonzero(y)], y[np.nonzero(y)], p
)[0]
in_dims = [[dimension], [dimension], ["params"]]
add_arg = [step, data, parameters]
output_size = len(parameters["params"])
else:
apply_f = lambda x, y: optimize.curve_fit(
function, x[np.nonzero(y)], y[np.nonzero(y)], parameters
)[0]
in_dims = [[dimension], [dimension]]
add_arg = [step, data]
output_size = len(parameters)
values = xr.apply_ufunc(
apply_f,
*add_arg,
vectorize=True,
input_core_dims=in_dims,
output_core_dims=[["params"]],
dask="parallelized",
output_dtypes=[np.float32],
dask_gufunc_kwargs={
"allow_rechunk": True,
"output_sizes": {"params": output_size},
}
)
values["params"] = list(range(len(values["params"])))
values.attrs = data.attrs
return values


def predict_curve(
data: RasterCube,
parameters: Union[RasterCube, ArrayLike],
function: Callable,
dimension: str,
labels: Optional[ArrayLike] = None,
):
data = data.fillna(0)
if (np.array([labels])).shape[-1] > 1:
test = [labels]
else:
test = labels
if dimension in [
"time",
"t",
"times",
]: # time dimension must be converted into values
dates = data[dimension].values
if test is None:
timestep = [
(
(np.datetime64(x) - np.datetime64("1970-01-01"))
/ np.timedelta64(1, "s")
)
for x in dates
]
labels = np.array(timestep)
else:
coords = labels
labels = [
(
(np.datetime64(x) - np.datetime64("1970-01-01"))
/ np.timedelta64(1, "s")
)
for x in labels
]
labels = np.array(labels)
else:
if test is None:
labels = data[dimension].values
else:
coords = labels
values = xr.apply_ufunc(
lambda a: function(labels, *a),
parameters,
vectorize=True,
input_core_dims=[["params"]],
output_core_dims=[[dimension]],
dask="parallelized",
output_dtypes=[np.float32],
dask_gufunc_kwargs={
"allow_rechunk": True,
"output_sizes": {dimension: len(labels)},
},
)
if test is None:
values = values.transpose(*data.dims)
values[dimension] = data[dimension]
predicted = data.where(data != 0, values)
else:
predicted = values.transpose(*data.dims)
predicted[dimension] = coords
predicted.attrs = data.attrs
return predicted
56 changes: 56 additions & 0 deletions tests/test_curve_fitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from functools import partial

import numpy as np
import pytest
import xarray as xr
from openeo_pg_parser_networkx.pg_schema import ParameterReference

from openeo_processes_dask.process_implementations.ml.curve_fitting import (
fit_curve,
predict_curve,
)
from tests.general_checks import general_output_checks
from tests.mockdata import create_fake_rastercube


@pytest.mark.parametrize("size", [(30, 30, 20, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_curve(temporal_interval, bounding_box, random_raster_data, process_registry):
input_cube = create_fake_rastercube(
data=random_raster_data,
spatial_extent=bounding_box,
temporal_extent=temporal_interval,
bands=["B02", "B03", "B04", "B08"],
backend="dask",
)

_process = partial(
process_registry["multiply"],
x=ParameterReference(from_parameter="data"),
y=ParameterReference(from_parameter="parameters"),
)

fitted_cube = fit_curve(
data=input_cube, parameters=[1], function=_process, dimension="t"
)

general_output_checks(
input_cube=input_cube,
output_cube=fitted_cube,
verify_attrs=False,
verify_crs=True,
)
assert fitted_cube.dims == ("x", "y", "bands", "params")
assert fitted_cube["params"].values == 0

predicted_cube = predict_curve(
data=input_cube, parameters=fitted_cube, function=_process, dimension="t"
)
general_output_checks(
input_cube=input_cube,
output_cube=predicted_cube,
verify_attrs=False,
verify_crs=True,
)
assert predicted_cube.dims == ("x", "y", "t", "bands")
assert (predicted_cube["t"].values == input_cube["t"].values).all()