Skip to content

Commit

Permalink
Setup test fixtures and add example test for `aggregate_temporal_period` (#3)
Browse files Browse the repository at this point in the history

* add test for aggregate temporal

* remove poetry-lock hook

* Update test suite with more flexible fixtures

* Add extensive test for aggregate_temporal_period

* fix typing in aggregate_temporal_period

* remove obsolete alias

* add test for apply

* Add test for reduce_dimension
  • Loading branch information
LukeWeidenwalker authored Nov 30, 2022
1 parent 166d49d commit 9f7a958
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 32 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ repos:
rev: 1.2.2
hooks:
- id: poetry-check
- id: poetry-lock

- repo: https://github.com/pycqa/isort
rev: 5.10.1
Expand Down
2 changes: 1 addition & 1 deletion openeo_processes_dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def wrapper(*args, parameters: Optional[dict[str]] = None, **kwargs):
}

# Add aliases
aliases = {"read_vector": "load_vector_cube"}
aliases = {}

for alias, process_name in aliases.items():
process_registry[alias] = process_registry[process_name]
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def aggregate_temporal_period(
if period in periods_to_frequency.keys():
frequency = periods_to_frequency[period]

data = data.resample(t=frequency)
return reducer(data=data, **kwargs)
resampled_data = data.resample(t=frequency)
return reducer(data=resampled_data, **kwargs)


def aggregate_spatial(
Expand Down
28 changes: 24 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
import logging

import numpy as np
import pytest

from tests.mockdata import generate_fake_rastercube
from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval

logger = logging.getLogger(__name__)


@pytest.fixture
def mock_rastercube_factory():
return generate_fake_rastercube
def random_data(size, dtype, seed=42):
    """Return a reproducible array of random integers in [-100, 100), cast to *dtype*.

    Args:
        size: Shape of the array to generate.
        dtype: numpy dtype the integer samples are cast to.
        seed: Seed for the random generator; fixed by default so tests are deterministic.
    """
    generator = np.random.default_rng(seed)
    samples = generator.integers(-100, 100, size=size)
    return samples.astype(dtype)


@pytest.fixture
def bounding_box(west=10.45, east=10.5, south=46.1, north=46.2, crs="EPSG:4326"):
    """Fixture: a small geographic BoundingBox, EPSG:4326 by default.

    The defaults describe a tiny extent; override via the keyword defaults
    when a different region is needed.
    """
    return BoundingBox.parse_obj(
        {"west": west, "east": east, "south": south, "north": north, "crs": crs}
    )


@pytest.fixture
def temporal_interval(interval=None):
    """Fixture: a one-month TemporalInterval (2018-05-01 to 2018-06-01) by default.

    Uses a None sentinel instead of a mutable list as default argument
    (shared mutable defaults are a classic Python pitfall).
    """
    if interval is None:
        interval = ["2018-05-01", "2018-06-01"]
    return TemporalInterval.parse_obj(interval)
44 changes: 44 additions & 0 deletions tests/general_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Checks here are inspired by makepath/xarray-spatial/tests/general_checks.py

import dask.array as da
import numpy as np

from openeo_processes_dask.process_implementations.data_model import RasterCube


def general_output_checks(
    input_cube: RasterCube,
    output_cube: RasterCube,
    expected_results=None,
    verify_crs: bool = False,
    verify_attrs: bool = False,
    rtol=1e-06,
):
    """Run common sanity checks on a process output cube against its input cube.

    Always asserts that the backing array type (numpy vs. dask) is preserved.
    Optionally compares CRS, attrs, and the materialised values against
    *expected_results* (with `rtol` tolerance, NaNs considered equal).
    """
    # The process must not silently change the array backend.
    assert isinstance(output_cube.data, type(input_cube.data))

    if verify_crs:
        assert input_cube.rio.crs == output_cube.rio.crs
    if verify_attrs:
        assert input_cube.attrs == output_cube.attrs

    if expected_results is None:
        return

    raw = output_cube.data
    if isinstance(raw, np.ndarray):
        actual = raw
    elif isinstance(raw, da.Array):
        # Dask arrays are lazy; materialise before comparing values.
        actual = raw.compute()
    else:
        raise TypeError(f"Unsupported array type: {type(output_cube.data)}")

    np.testing.assert_allclose(actual, expected_results, equal_nan=True, rtol=rtol)


def assert_numpy_equals_dask_numpy(numpy_cube, dask_cube, func):
    """Apply *func* to a numpy-backed and a dask-backed cube and assert the
    materialised results are numerically identical (NaNs treated as equal)."""
    result_np = func(numpy_cube)
    result_dask = func(dask_cube)
    # Sanity-check the dask result first (array backend preserved etc.).
    general_output_checks(dask_cube, result_dask)
    np.testing.assert_allclose(
        result_np.data, result_dask.data.compute(), equal_nan=True
    )
44 changes: 20 additions & 24 deletions tests/mockdata.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import logging

import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr
from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval

logger = logging.getLogger(__name__)
rng = np.random.default_rng()


def generate_fake_rastercube(
seed,
def create_fake_rastercube(
data,
spatial_extent: BoundingBox,
temporal_extent: TemporalInterval,
bands: list,
n_spatial_coords: int,
n_timesteps: int,
backend="numpy",
chunks=("auto", "auto", "auto", -1),
):

# Calculate the desired resolution based on how many samples we desire on the longest axis.
Expand All @@ -27,36 +25,34 @@ def generate_fake_rastercube(
spatial_extent.south, spatial_extent.north
)

step_size = max(len_x, len_y) / n_spatial_coords

x = np.arange(
x_coords = np.arange(
min(spatial_extent.west, spatial_extent.east),
max(spatial_extent.west, spatial_extent.east),
step=step_size,
step=len_x / data.shape[0],
)
y = np.arange(
y_coords = np.arange(
min(spatial_extent.south, spatial_extent.north),
max(spatial_extent.south, spatial_extent.north),
step=step_size,
step=len_y / data.shape[1],
)
t = pd.date_range(
t_coords = pd.date_range(
start=np.datetime64(temporal_extent.__root__[0].__root__),
end=np.datetime64(temporal_extent.__root__[1].__root__),
periods=n_timesteps,
periods=data.shape[2],
).values

coords = {"x": x, "y": y, "t": t, "bands": bands}

# This is to enable simulating fake data from different collections.
# The [:9] part is necessary because Dask.random.seed can only accept 32-bit values
da.random.seed(int(str(abs(hash(seed)))[:9]))
_data = da.random.random(tuple([len(v) for _, v in coords.items()]))
coords = {"x": x_coords, "y": y_coords, "t": t_coords, "bands": bands}

data = xr.DataArray(
data=_data,
raster_cube = xr.DataArray(
data=data,
coords=coords,
attrs={"crs": spatial_extent.crs},
)
data.rio.write_crs(spatial_extent.crs, inplace=True)
raster_cube.rio.write_crs(spatial_extent.crs, inplace=True)

if "dask" in backend:
import dask.array as da

raster_cube.data = da.from_array(raster_cube.data, chunks=chunks)

return data.chunk("auto", "auto", "auto", -1)
return raster_cube
76 changes: 76 additions & 0 deletions tests/test_aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from functools import partial

import numpy as np
import pytest
from openeo_pg_parser_networkx.pg_schema import TemporalInterval

from openeo_processes_dask.process_implementations.cubes.aggregate import (
aggregate_temporal_period,
)
from openeo_processes_dask.process_implementations.math import mean
from tests.general_checks import assert_numpy_equals_dask_numpy, general_output_checks
from tests.mockdata import create_fake_rastercube


@pytest.mark.parametrize("size", [(6, 5, 4, 4)])
@pytest.mark.parametrize("dtype", [np.float64])
@pytest.mark.parametrize(
    "temporal_extent,period,expected",
    [
        (["2018-05-01", "2018-05-02"], "hour", 25),
        (["2018-05-01", "2018-06-01"], "day", 32),
        (["2018-05-01", "2018-06-01"], "week", 5),
        (["2018-05-01", "2018-06-01"], "month", 2),
        (["2018-01-01", "2018-12-31"], "season", 5),
        (["2018-01-01", "2018-12-31"], "year", 1),
    ],
)
def test_aggregate_temporal_period(
    temporal_extent, period, expected, bounding_box, random_data
):
    """Aggregate a fake raster cube over several period granularities and check
    that the time dimension collapses to the expected number of coordinates."""
    cube = create_fake_rastercube(
        data=random_data,
        spatial_extent=bounding_box,
        temporal_extent=TemporalInterval.parse_obj(temporal_extent),
        bands=["B02", "B03", "B04", "B08"],
    )

    result = aggregate_temporal_period(data=cube, period=period, reducer=mean)

    general_output_checks(
        input_cube=cube,
        output_cube=result,
        verify_attrs=True,
        verify_crs=True,
    )

    # One time coordinate per aggregation period, and it stays a datetime.
    assert len(result.t) == expected
    assert isinstance(result.t.values[0], np.datetime64)


@pytest.mark.parametrize("size", [(6, 5, 4, 4)])
@pytest.mark.parametrize("dtype", [np.int32, np.int64, np.float32, np.float64])
def test_aggregate_temporal_period_numpy_equals_dask(
    random_data, bounding_box, temporal_interval
):
    """Hourly aggregation must give identical values on numpy- and dask-backed cubes."""
    shared_kwargs = dict(
        data=random_data,
        spatial_extent=bounding_box,
        temporal_extent=temporal_interval,
        bands=["B02", "B03", "B04", "B08"],
    )
    numpy_cube = create_fake_rastercube(backend="numpy", **shared_kwargs)
    dask_cube = create_fake_rastercube(backend="dask", **shared_kwargs)

    aggregate_hourly = partial(aggregate_temporal_period, reducer=mean, period="hour")
    assert_numpy_equals_dask_numpy(
        numpy_cube=numpy_cube, dask_cube=dask_cube, func=aggregate_hourly
    )
35 changes: 35 additions & 0 deletions tests/test_apply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from functools import partial

import numpy as np
import pytest
from openeo_pg_parser_networkx.pg_schema import ParameterReference

from openeo_processes_dask.core import process_registry
from openeo_processes_dask.process_implementations.cubes.apply import apply
from openeo_processes_dask.process_implementations.math import add
from tests.general_checks import assert_numpy_equals_dask_numpy, general_output_checks
from tests.mockdata import create_fake_rastercube


@pytest.mark.parametrize("size", [(6, 5, 4, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_apply(temporal_interval, bounding_box, random_data):
    """apply with a bound `add` process should preserve crs and attrs."""
    cube = create_fake_rastercube(
        data=random_data,
        spatial_extent=bounding_box,
        temporal_extent=temporal_interval,
        bands=["B02", "B03", "B04", "B08"],
    )

    # Bind y=1 so the process becomes x -> x + 1; x is filled in per element.
    add_one = partial(
        process_registry["add"], y=1, x=ParameterReference(from_parameter="x")
    )

    result = apply(data=cube, process=add_one)

    general_output_checks(
        input_cube=cube,
        output_cube=result,
        verify_attrs=True,
        verify_crs=True,
    )
34 changes: 34 additions & 0 deletions tests/test_reduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from functools import partial

import numpy as np
import pytest
from openeo_pg_parser_networkx.pg_schema import ParameterReference

from openeo_processes_dask.core import process_registry
from openeo_processes_dask.process_implementations.cubes.reduce import reduce_dimension
from tests.general_checks import general_output_checks
from tests.mockdata import create_fake_rastercube


@pytest.mark.parametrize("size", [(6, 5, 4, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_reduce_dimension(temporal_interval, bounding_box, random_data):
    """reduce_dimension with `mean` over 't' should preserve the crs."""
    cube = create_fake_rastercube(
        data=random_data,
        spatial_extent=bounding_box,
        temporal_extent=temporal_interval,
        bands=["B02", "B03", "B04", "B08"],
    )

    # NOTE(review): y=1 looks unused by a `mean` reducer — confirm it is needed.
    reducer = partial(
        process_registry["mean"], y=1, data=ParameterReference(from_parameter="data")
    )

    result = reduce_dimension(data=cube, reducer=reducer, dimension="t")

    general_output_checks(
        input_cube=cube,
        output_cube=result,
        verify_attrs=False,
        verify_crs=True,
    )

0 comments on commit 9f7a958

Please sign in to comment.