Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add fit_curve and predict_curve #139

Merged
merged 23 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .curve_fitting import *
from .random_forest import *
110 changes: 110 additions & 0 deletions openeo_processes_dask/process_implementations/ml/curve_fitting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from typing import Callable, Optional

import numpy as np
import pandas as pd
import xarray as xr
from numpy.typing import ArrayLike

from openeo_processes_dask.process_implementations.cubes import apply_dimension
from openeo_processes_dask.process_implementations.data_model import RasterCube
from openeo_processes_dask.process_implementations.exceptions import (
DimensionNotAvailable,
)

__all__ = ["fit_curve", "predict_curve"]


def fit_curve(data: RasterCube, parameters: list, function: Callable, dimension: str):
    """Fit a model curve to each pixel series along ``dimension``.

    Parameters
    ----------
    data : RasterCube
        The datacube to fit the curve to.
    parameters : list
        Initial guesses for the function parameters, one entry per parameter.
    function : Callable
        The model function, invoked through the ``@process`` argument-piping
        wrapper with the coordinate values (``x``) and the parameters.
    dimension : str
        The dimension along which to fit (typically the temporal dimension).

    Returns
    -------
    RasterCube
        The input cube with ``dimension`` replaced by a ``param`` dimension
        holding the fitted parameter values.

    Raises
    ------
    DimensionNotAvailable
        If ``dimension`` is not present in ``data.dims``.
    """
    if dimension not in data.dims:
        raise DimensionNotAvailable(
            f"Provided dimension ({dimension}) not found in data.dims: {data.dims}"
        )

    dims_before = list(data.dims)

    # In the spec, parameters is a list, but xr.curvefit requires names for them,
    # so we do this to generate names locally.
    parameters = {f"param_{i}": v for i, v in enumerate(parameters)}

    # The dimension along which to fit cannot be chunked!
    rechunked_data = data.chunk({dimension: -1})

    # This is necessary to pipe the arguments correctly through @process.
    def wrapper(f):
        def _wrap(*args, **kwargs):
            return f(
                *args,
                **kwargs,
                positional_parameters={"x": 0, "parameters": slice(1, None)},
            )

        return _wrap

    expected_dims_after = list(dims_before)
    expected_dims_after[dims_before.index(dimension)] = "param"

    # .curvefit returns some extra information (covariances) that isn't required
    # by the OpenEO process, so we simply drop it here.
    fit_result = (
        rechunked_data.curvefit(
            dimension,
            wrapper(function),
            p0=parameters,
            param_names=list(parameters.keys()),
        )
        .drop_dims(["cov_i", "cov_j"])
        .to_array()
        # Only remove the artificial "variable" dim added by .to_array().
        # A bare .squeeze() would also drop any genuine size-1 dimension
        # (e.g. a single band), making the transpose below fail.
        .squeeze("variable", drop=True)
        .transpose(*expected_dims_after)
    )

    return fit_result
LukeWeidenwalker marked this conversation as resolved.
Show resolved Hide resolved


def predict_curve(
    parameters: RasterCube,
    function: Callable,
    dimension: str,
    labels: Optional[ArrayLike] = None,
):
    """Predict values along ``dimension`` from previously fitted parameters.

    Parameters
    ----------
    parameters : RasterCube
        Cube of fitted parameters with a ``param`` dimension, as produced by
        ``fit_curve``.
    function : Callable
        The model function, invoked through the ``@process`` argument-piping
        wrapper with the fitted parameters and the labels as ``x``.
    dimension : str
        Name of the output dimension that replaces ``param`` (typically the
        temporal dimension).
    labels : ArrayLike
        The coordinate labels to predict for. Despite the ``Optional``
        signature this argument is required.

    Returns
    -------
    RasterCube
        Cube of predictions with ``param`` replaced by ``dimension``.

    Raises
    ------
    ValueError
        If ``labels`` is not provided.
    """
    if labels is None:
        # The parameter is optional in the signature, but every code path below
        # dereferences it (.dtype, len(), .data) — fail early with a clear
        # message instead of an AttributeError.
        raise ValueError("'labels' must be provided to predict_curve.")

    labels_were_datetime = False
    dims_before = list(parameters.dims)

    if np.issubdtype(labels.dtype, np.datetime64):
        # The curve function operates on numeric x values, so datetime labels
        # are converted to integers here and restored after prediction.
        labels = labels.astype(int)
        labels_were_datetime = True

    # This is necessary to pipe the arguments correctly through @process
    def wrapper(f):
        def _wrap(*args, **kwargs):
            return f(
                *args,
                positional_parameters={"parameters": 0},
                named_parameters={"x": labels},
                **kwargs,
            )

        return _wrap

    expected_dims_after = list(dims_before)
    expected_dims_after[dims_before.index("param")] = dimension

    predictions = xr.apply_ufunc(
        wrapper(function),
        parameters,
        vectorize=True,
        input_core_dims=[["param"]],
        output_core_dims=[[dimension]],
        dask="parallelized",
        output_dtypes=[np.float64],
        dask_gufunc_kwargs={
            # Each output point only depends on the parameters, so rechunking
            # along the new dimension is safe.
            "allow_rechunk": True,
            "output_sizes": {dimension: len(labels)},
        },
    ).transpose(*expected_dims_after)

    # NOTE(review): assumes `labels` is an xr.DataArray (`.data` yields the
    # underlying array); a plain ndarray's `.data` is a memoryview — confirm
    # the expected caller input.
    predictions = predictions.assign_coords({dimension: labels.data})

    if labels_were_datetime:
        predictions[dimension] = pd.DatetimeIndex(predictions[dimension].values)

    return predictions
62 changes: 61 additions & 1 deletion tests/test_ml.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
from functools import partial

import dask
import geopandas as gpd
import numpy as np
import pytest
import xgboost as xgb
from openeo_pg_parser_networkx.pg_schema import ParameterReference

from openeo_processes_dask.process_implementations.ml import fit_regr_random_forest
from openeo_processes_dask.process_implementations.core import process
from openeo_processes_dask.process_implementations.cubes.apply import apply_dimension
from openeo_processes_dask.process_implementations.cubes.general import dimension_labels
from openeo_processes_dask.process_implementations.ml import (
fit_curve,
fit_regr_random_forest,
predict_curve,
)
from tests.mockdata import create_fake_rastercube


def test_fit_regr_random_forest(vector_data_cube, dask_client):
Expand Down Expand Up @@ -32,3 +46,49 @@ def test_fit_regr_random_forest_inline_geojson(
)

assert isinstance(model, xgb.core.Booster)


@pytest.mark.parametrize("size", [(6, 5, 4, 3)])
@pytest.mark.parametrize("dtype", [np.float64])
def test_curve_fitting(temporal_interval, bounding_box, random_raster_data):
    # End-to-end test: fit a curve along the temporal dimension of a fake
    # raster cube, then predict it back for the same timestep labels.
    origin_cube = create_fake_rastercube(
        data=random_raster_data,
        spatial_extent=bounding_box,
        temporal_extent=temporal_interval,
        bands=["B02", "B03", "B04"],
        backend="dask",
    )

    # First-order harmonic model; 31557600 is the number of seconds in a
    # (Julian) year, so t0 maps x (seconds) onto a yearly cycle in radians.
    @process
    def fitFunction(x, parameters):
        t0 = 2 * np.pi / 31557600 * x
        return parameters[0] + parameters[1] * np.cos(t0) + parameters[2] * np.sin(t0)

    # Bind the process-graph parameter references the @process decorator
    # resolves at call time.
    _process = partial(
        fitFunction,
        x=ParameterReference(from_parameter="x"),
        parameters=ParameterReference(from_parameter="parameters"),
    )

    # Initial guesses: constant 1, no cosine/sine contribution.
    parameters = [1, 0, 0]
    result = fit_curve(
        origin_cube, parameters=parameters, function=_process, dimension="t"
    )
    assert len(result.param) == 3
    # The result should stay lazy (dask-backed), not be computed eagerly.
    assert isinstance(result.data, dask.array.Array)

    # The temporal dimension is replaced by "param"; all other dims keep
    # their original sizes.
    assert len(result.coords["bands"]) == len(origin_cube.coords["bands"])
    assert len(result.coords["x"]) == len(origin_cube.coords["x"])
    assert len(result.coords["y"]) == len(origin_cube.coords["y"])
    assert len(result.coords["param"]) == len(parameters)

    # Predict back onto the original timesteps; "param" must be consumed.
    labels = dimension_labels(origin_cube, origin_cube.openeo.temporal_dims[0])
    predictions = predict_curve(
        result,
        _process,
        origin_cube.openeo.temporal_dims[0],
        labels=labels,
    ).compute()

    assert len(predictions.coords[origin_cube.openeo.temporal_dims[0]]) == len(labels)
    assert "param" not in predictions.dims