From 665a877f1e478a433395efce82dd25d64c5c17f7 Mon Sep 17 00:00:00 2001 From: Mohammad Alasawedah <61426508+masawdah@users.noreply.github.com> Date: Wed, 22 Nov 2023 15:09:38 +0100 Subject: [PATCH 01/48] Aggregate spatial (#194) * spatial filter * spatial filter * improvement * name dimesnions * remove compute * handle CRS * support feature collection * clean up * updated filter * pytest filter_spatial * reformatted files * pre-check * pre-check * remove parcels * mask_polygon * spatial_aggregation * spatial_aggregation * spatial_aggregate * update * update * spatial aggregation * spatial_aggregation pytest * aggregate_spatial * dependencies * Update aggregate.py Code restructure * clean whitespace * update * clean up * time dimension name * add test without time dim * pre-commit * pre-commit --------- Co-authored-by: ValentinaHutter --- .../cubes/aggregate.py | 152 +++++++++++++++++- .../process_implementations/data_model.py | 2 +- openeo_processes_dask/specs/openeo-processes | 2 +- pyproject.toml | 3 +- tests/test_aggregate.py | 45 ++++++ 5 files changed, 197 insertions(+), 7 deletions(-) diff --git a/openeo_processes_dask/process_implementations/cubes/aggregate.py b/openeo_processes_dask/process_implementations/cubes/aggregate.py index cf70f21a..a0c3a2af 100644 --- a/openeo_processes_dask/process_implementations/cubes/aggregate.py +++ b/openeo_processes_dask/process_implementations/cubes/aggregate.py @@ -1,6 +1,16 @@ +import gc +import logging from typing import Callable, Optional, Union +import dask.array as da +import geopandas as gpd +import numpy as np +import pandas as pd import rasterio +import shapely +import xarray as xr +import xvec +from joblib import Parallel, delayed from openeo_pg_parser_networkx.pg_schema import TemporalInterval, TemporalIntervals from openeo_processes_dask.process_implementations.data_model import ( @@ -12,8 +22,13 @@ TooManyDimensions, ) +DEFAULT_CRS = "EPSG:4326" + + __all__ = ["aggregate_temporal", "aggregate_temporal_period", "aggregate_spatial"] +logger = logging.getLogger(__name__) + def geometry_mask(geoms, geobox, all_touched=False, invert=False): return rasterio.features.geometry_mask( @@ -109,11 +124,140 @@ def aggregate_temporal_period( ) +def _aggregate_geometry( + data: RasterCube, + geom, + transform, + reducer: Callable, +): + data_dims = list(data.dims) + y_dim = data.openeo.y_dim + x_dim = data.openeo.x_dim + t_dim = data.openeo.temporal_dims + t_dim = None if len(t_dim) == 0 else t_dim[0] + b_dim = data.openeo.band_dims + b_dim = None if len(b_dim) == 0 else b_dim[0] + + y_dim_size = data.sizes[y_dim] + x_dim_size = data.sizes[x_dim] + + # Create a GeoSeries from the geometry + geo_series = gpd.GeoSeries(geom) + + # Convert the GeoSeries to a GeometryArray + geometry_array = geo_series.geometry.array + + mask = rasterio.features.geometry_mask( + geometry_array, out_shape=(y_dim_size, x_dim_size), transform=transform + ) + + if t_dim is not None: + mask = np.expand_dims(mask, axis=data_dims.index(t_dim)) + if b_dim is not None: + mask = np.expand_dims(mask, axis=data_dims.index(b_dim)) + + masked_data = data * mask + del mask, data + gc.collect() + + positional_parameters = {"data": 0} + + stat_within_polygon = masked_data.reduce( + reducer, + axis=(data_dims.index(y_dim), data_dims.index(x_dim)), + keep_attrs=True, + ignore_nodata=True, + positional_parameters=positional_parameters, + ) + result = stat_within_polygon.values + + del masked_data, stat_within_polygon + gc.collect() + + return result.T + + def aggregate_spatial( data: RasterCube, - geometries: VectorCube, + geometries, reducer: Callable, - target_dimension: str = "result", - **kwargs, + chunk_size: int = 2, ) -> VectorCube: - raise NotImplementedError + t_dim = data.openeo.temporal_dims + t_dim = None if len(t_dim) == 0 else t_dim[0] + b_dim = data.openeo.band_dims + b_dim = None if len(b_dim) == 0 else b_dim[0] + + if "type" in geometries and geometries["type"] == "FeatureCollection": + gdf = gpd.GeoDataFrame.from_features(geometries, DEFAULT_CRS) + elif "type" in geometries and geometries["type"] in ["Polygon"]: + polygon = shapely.geometry.Polygon(geometries["coordinates"][0]) + gdf = gpd.GeoDataFrame(geometry=[polygon]) + gdf.crs = DEFAULT_CRS + + transform = data.rio.transform() + geometries = gdf.geometry.values + + geometry_chunks = [ + geometries[i : i + chunk_size] for i in range(0, len(geometries), chunk_size) + ] + + computed_results = [] + logger.info(f"Running aggregate_spatial process") + try: + for i, chunk in enumerate(geometry_chunks): + # Create a list of delayed objects for the current chunk + chunk_results = Parallel(n_jobs=-1)( + delayed(_aggregate_geometry)( + data, geom, transform=transform, reducer=reducer + ) + for geom in chunk + ) + computed_results.extend(chunk_results) + except: + logger.debug(f"Running process failed at {(i+1) *2} geometry") + + logger.info(f"Finish aggregate_spatial process for {len(geometries)}") + + final_results = np.stack(computed_results) + del chunk_results, geometry_chunks, computed_results + gc.collect() + + df = pd.DataFrame() + keys_items = {} + + for idx, b in enumerate(data[b_dim].values): + columns = [] + if t_dim: + for t in range(len(data[t_dim])): + columns.append(f"{b}_time{t+1}") + aggregated_data = final_results[:, idx, :] + else: + columns.append(f"{b}") + aggregated_data = final_results[:, idx] + + keys_items[b] = columns + + # Create a new DataFrame with the current data and columns + band_df = pd.DataFrame(aggregated_data, columns=columns) + # Concatenate the new DataFrame with the existing DataFrame + df = pd.concat([df, band_df], axis=1) + + df = gpd.GeoDataFrame(df, geometry=gdf.geometry) + + data_vars = {} + for key in keys_items.keys(): + data_vars[key] = (["geometry", t_dim], df[keys_items[key]]) + + ## Create VectorCube + if t_dim: + times = list(data[t_dim].values) + vec_cube = xr.Dataset( + data_vars=data_vars, coords={"geometry": df.geometry, t_dim: times} + ).xvec.set_geom_indexes("geometry", crs=df.crs) + else: + vec_cube = xr.Dataset( + data_vars=data_vars, coords=dict(geometry=df.geometry) + ).xvec.set_geom_indexes("geometry", crs=df.crs) + + return vec_cube diff --git a/openeo_processes_dask/process_implementations/data_model.py b/openeo_processes_dask/process_implementations/data_model.py index 5fa28167..4aa1cb21 100644 --- a/openeo_processes_dask/process_implementations/data_model.py +++ b/openeo_processes_dask/process_implementations/data_model.py @@ -7,4 +7,4 @@ import xarray as xr RasterCube = xr.DataArray -VectorCube = Union[gpd.GeoDataFrame, dask_geopandas.GeoDataFrame] +VectorCube = Union[gpd.GeoDataFrame, dask_geopandas.GeoDataFrame, xr.Dataset] diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 86110fac..6885090f 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 86110facbca0e321afd0ac155b753864ecc7778a +Subproject commit 6885090f736092353a0558b14c1aedee03ce87c6 diff --git a/pyproject.toml b/pyproject.toml index 9f58b1ef..51d79eb7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ stac_validator = { version = ">=3.3.1", optional = true } stackstac = { version = ">=0.4.3", optional = true } pystac_client = { version = ">=0.6.1", optional = true } planetary_computer = { version = ">=0.5.1", optional = true } +xvec = { version = ">=0.1.0", optional = true } [tool.poetry.group.dev.dependencies] pytest = "^7.2.0" @@ -48,7 +49,7 @@ pre-commit = "^2.20.0" pytest-cov = "^4.0.0" [tool.poetry.extras] -implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator"] +implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator", "xvec"] ml = ["xgboost"] [tool.pytest.ini_options] diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index df0e5f6b..1ee894ad 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -5,8 +5,10 @@ from openeo_pg_parser_networkx.pg_schema import ParameterReference, TemporalInterval from openeo_processes_dask.process_implementations.cubes.aggregate import ( + aggregate_spatial, aggregate_temporal_period, ) +from openeo_processes_dask.process_implementations.cubes.reduce import reduce_dimension from openeo_processes_dask.process_implementations.math import mean from tests.general_checks import assert_numpy_equals_dask_numpy, general_output_checks from tests.mockdata import create_fake_rastercube @@ -90,3 +92,46 @@ def test_aggregate_temporal_period_numpy_equals_dask( assert_numpy_equals_dask_numpy( numpy_cube=numpy_cube, dask_cube=dask_cube, func=func ) + + +@pytest.mark.parametrize("size", [(30, 30, 30, 3)]) +@pytest.mark.parametrize("dtype", [np.int8]) +def test_aggregate_spatial( + random_raster_data, + bounding_box, + temporal_interval, + polygon_geometry_small, + process_registry, +): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04"], + backend="dask", + ) + + reducer = partial( + process_registry["mean"].implementation, + data=ParameterReference(from_parameter="data"), + ) + + output_cube = aggregate_spatial( + data=input_cube, geometries=polygon_geometry_small, reducer=reducer + ) + + assert len(output_cube.dims) < len(input_cube.dims) + + _process = partial( + process_registry["median"].implementation, + ignore_nodata=True, + data=ParameterReference(from_parameter="data"), + ) + + reduced_cube = reduce_dimension(data=input_cube, reducer=_process, dimension="t") + + output_cube = aggregate_spatial( + data=reduced_cube, geometries=polygon_geometry_small, reducer=reducer + ) + + assert len(output_cube.dims) < len(reduced_cube.dims) From 6d71a7f9360f0ee591c25b71718a256c0c99f1de Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Wed, 22 Nov 2023 15:17:33 +0100 Subject: [PATCH 02/48] 2023.11.2 for openeo-processes-dask (#196) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 51d79eb7..1cd418e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2023.11.1" +version = "2023.11.2" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 9df35805be6b0325fe315006cf8be934f84fbf94 Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Thu, 23 Nov 2023 09:36:41 +0100 Subject: [PATCH 03/48] updated spec (#197) * updated spec * bump-2023.11.3 --- openeo_processes_dask/specs/openeo-processes | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 6885090f..86110fac 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 6885090f736092353a0558b14c1aedee03ce87c6 +Subproject commit 86110facbca0e321afd0ac155b753864ecc7778a diff --git a/pyproject.toml b/pyproject.toml index 1cd418e0..181571e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2023.11.2" +version = "2023.11.3" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 6c1e2c0705db30a35bbd24dbc3af4cfe92fa515d Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Mon, 27 Nov 2023 12:54:56 +0100 Subject: [PATCH 04/48] Allow xgboost 2.0 and higher (#198) allow xgboost 2.0 and higher --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 181571e0..1ba5e688 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2023.11.3" +version = "2023.11.4" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] @@ -29,7 +29,7 @@ xarray = { version = ">=2022.11.0", optional = true } dask = {extras = ["array"], version = ">=2023.4.0", optional = true} rasterio = { version = "^1.3.4", optional = true } dask-geopandas = { version = ">=0.2.0,<1", optional = true } -xgboost = { version = "^1.5.1", optional = true } +xgboost = { version = ">=1.5.1", optional = true } rioxarray = { version = ">=0.12.0,<1", optional = true } openeo-pg-parser-networkx = { version = ">=2023.5.1", optional = true } odc-geo = { version = ">=0.4.1,<1", optional = true } From eaf2917fe44c23b157673c279406bcdd4eca66d0 Mon Sep 17 00:00:00 2001 From: Sean <63349506+SerRichard@users.noreply.github.com> Date: Mon, 27 Nov 2023 18:01:13 +0100 Subject: [PATCH 05/48] update sub module (#199) * update sub module to include vessel detection --------- Co-authored-by: sean --- openeo_processes_dask/specs/openeo-processes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 86110fac..7044771f 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 86110facbca0e321afd0ac155b753864ecc7778a +Subproject commit 7044771f54525f6b613a8fc065f5f7d688eeca35 From 0fba2ab6134cf417c1db358b7736912d37f170ba Mon Sep 17 00:00:00 2001 From: Sean <63349506+SerRichard@users.noreply.github.com> Date: Mon, 27 Nov 2023 18:02:32 +0100 Subject: [PATCH 06/48] Update pyproject.toml (#200) Update version number --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1ba5e688..44f8f150 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2023.11.4" +version = "2023.11.5" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 8d9e3fb009676a39c3ce5009bc1c09909e5bbfb3 Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Tue, 28 Nov 2023 14:33:11 +0100 Subject: [PATCH 07/48] Apply dim (#201) * update params for apply dim * update params for apply dim --- openeo_processes_dask/process_implementations/core.py | 2 +- openeo_processes_dask/process_implementations/cubes/apply.py | 1 + pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/core.py b/openeo_processes_dask/process_implementations/core.py index cefaf919..e3fa7ce7 100644 --- a/openeo_processes_dask/process_implementations/core.py +++ b/openeo_processes_dask/process_implementations/core.py @@ -68,7 +68,7 @@ def wrapper( else: resolved_kwargs[k] = arg - special_args = ["axis", "keepdims", "source_transposed_axis"] + special_args = ["axis", "keepdims", "source_transposed_axis", "context"] # Remove 'axis' and keepdims parameter if not expected in function signature. for arg in special_args: if arg not in inspect.signature(f).parameters: diff --git a/openeo_processes_dask/process_implementations/cubes/apply.py b/openeo_processes_dask/process_implementations/cubes/apply.py index 381769f7..6737c1fe 100644 --- a/openeo_processes_dask/process_implementations/cubes/apply.py +++ b/openeo_processes_dask/process_implementations/cubes/apply.py @@ -67,6 +67,7 @@ def apply_dimension( "axis": reordered_data.get_axis_num(dimension), "keepdims": True, "source_transposed_axis": data.get_axis_num(dimension), + "context": context, }, exclude_dims={dimension}, ) diff --git a/pyproject.toml b/pyproject.toml index 44f8f150..8725d394 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2023.11.5" +version = "2023.11.6" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 4a011c44757e10cf697e0c671819a8d51a0e2458 Mon Sep 17 00:00:00 2001 From: Michele Claus <31700619+clausmichele@users.noreply.github.com> Date: Tue, 2 Jan 2024 16:22:16 +0100 Subject: [PATCH 08/48] feat: add mask and fix resample_cube_spatial (#165) --- .../process_implementations/__init__.py | 12 +- .../process_implementations/cubes/__init__.py | 2 + .../process_implementations/cubes/mask.py | 120 ++++++++++++++++++ .../process_implementations/cubes/resample.py | 75 ++++++++++- .../process_implementations/exceptions.py | 8 ++ .../experimental/__init__.py | 1 - .../experimental/resample.py | 67 ---------- tests/test_mask.py | 62 ++++++++- tests/test_resample.py | 49 +++++++ 9 files changed, 320 insertions(+), 76 deletions(-) create mode 100644 openeo_processes_dask/process_implementations/cubes/mask.py delete mode 100644 openeo_processes_dask/process_implementations/experimental/resample.py diff --git a/openeo_processes_dask/process_implementations/__init__.py b/openeo_processes_dask/process_implementations/__init__.py index 875ee6d5..aa7c3d15 100644 --- a/openeo_processes_dask/process_implementations/__init__.py +++ b/openeo_processes_dask/process_implementations/__init__.py @@ -15,12 +15,12 @@ "Did not load machine learning processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, ml]`" ) -try: - from .experimental import * -except ImportError as e: - logger.warning( - "Did not experimental processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, experimental]`" - ) +# try: +# from .experimental import * +# except ImportError as e: +# logger.warning( +# "Did not experimental processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, experimental]`" +# ) import rioxarray as rio # Required for the .rio accessor on xarrays. diff --git a/openeo_processes_dask/process_implementations/cubes/__init__.py b/openeo_processes_dask/process_implementations/cubes/__init__.py index d18a1475..11dce1b5 100644 --- a/openeo_processes_dask/process_implementations/cubes/__init__.py +++ b/openeo_processes_dask/process_implementations/cubes/__init__.py @@ -4,6 +4,8 @@ from .general import * from .indices import * from .load import * +from .mask import * from .mask_polygon import * from .merge import * from .reduce import * +from .resample import * diff --git a/openeo_processes_dask/process_implementations/cubes/mask.py b/openeo_processes_dask/process_implementations/cubes/mask.py new file mode 100644 index 00000000..96cc09b0 --- /dev/null +++ b/openeo_processes_dask/process_implementations/cubes/mask.py @@ -0,0 +1,120 @@ +import logging +from typing import Callable + +import numpy as np + +from openeo_processes_dask.process_implementations.cubes.resample import ( + resample_cube_spatial, +) +from openeo_processes_dask.process_implementations.cubes.utils import notnull +from openeo_processes_dask.process_implementations.data_model import RasterCube +from openeo_processes_dask.process_implementations.exceptions import ( + DimensionLabelCountMismatch, + DimensionMismatch, + LabelMismatch, +) +from openeo_processes_dask.process_implementations.logic import _not + +logger = logging.getLogger(__name__) + +__all__ = ["mask"] + + +def mask(data: RasterCube, mask: RasterCube, replacement=None) -> RasterCube: + if replacement is None: + replacement = np.nan + + data_band_dims = data.openeo.band_dims + mask_band_dims = mask.openeo.band_dims + # Check if temporal dimensions are present and check the names + data_temporal_dims = data.openeo.temporal_dims + mask_temporal_dims = mask.openeo.temporal_dims + + check_temporal_labels = True + if not set(data_temporal_dims) == set(mask_temporal_dims): + check_temporal_labels = False + # To continue with a valid case, mask shouldn't have a temporal dimension, so that the mask will be applied to all the temporal labels + if len(mask_temporal_dims) != 0: + raise DimensionMismatch( + f"data and mask temporal dimensions do no match: data has temporal dimensions ({data_temporal_dims}) and mask {mask_temporal_dims}." + ) + if check_temporal_labels: + # Check if temporal labels correspond + for n in data_temporal_dims: + data_temporal_labels = data[n].values + mask_temporal_labels = mask[n].values + data_n_labels = len(data_temporal_labels) + mask_n_labels = len(mask_temporal_labels) + + if not data_n_labels == mask_n_labels: + raise DimensionLabelCountMismatch( + f"data and mask temporal dimensions do no match: data has {data_n_labels} temporal dimensions labels and mask {mask_n_labels}." + ) + elif not all(data_temporal_labels == mask_temporal_labels): + raise LabelMismatch( + f"data and mask temporal dimension labels don't match for dimension {n}." + ) + + # From the process definition: https://processes.openeo.org/#mask + # The data cubes have to be compatible except that the horizontal spatial dimensions (axes x and y) will be aligned implicitly by resample_cube_spatial. + apply_resample_cube_spatial = False + + # Check if spatial dimensions have the same name + data_spatial_dims = data.openeo.spatial_dims + mask_spatial_dims = mask.openeo.spatial_dims + if not set(data_spatial_dims) == set(mask_spatial_dims): + raise DimensionMismatch( + f"data and mask spatial dimensions do no match: data has spatial dimensions ({data_spatial_dims}) and mask {mask_spatial_dims}" + ) + + # Check if spatial labels correspond + for n in data_spatial_dims: + data_spatial_labels = data[n].values + mask_spatial_labels = mask[n].values + data_n_labels = len(data_spatial_labels) + mask_n_labels = len(mask_spatial_labels) + + if not data_n_labels == mask_n_labels: + apply_resample_cube_spatial = True + logger.info( + f"data and mask spatial dimension labels don't match: data has ({data_n_labels}) labels and mask has {mask_n_labels} for dimension {n}." + ) + elif not all(data_spatial_labels == mask_spatial_labels): + apply_resample_cube_spatial = True + logger.info( + f"data and mask spatial dimension labels don't match for dimension {n}, i.e. the coordinate values are different." + ) + + if apply_resample_cube_spatial: + logger.info(f"mask is aligned to data using resample_cube_spatial.") + mask = resample_cube_spatial(data=mask, target=data) + + original_dim_order = data.dims + # If bands dimension in data but not in mask, ensure that it comes first and all the other dimensions at the end + if len(data_band_dims) != 0 and len(mask_band_dims) == 0: + required_dim_order = ( + data_band_dims[0] if len(data_band_dims) > 0 else (), + data_temporal_dims[0] if len(data_temporal_dims) > 0 else (), + data.openeo.y_dim, + data.openeo.x_dim, + ) + data = data.transpose(*required_dim_order, missing_dims="ignore") + mask = mask.transpose(*required_dim_order, missing_dims="ignore") + + elif len(data_temporal_dims) != 0 and len(mask_temporal_dims) == 0: + required_dim_order = ( + data_temporal_dims[0] if len(data_temporal_dims) > 0 else (), + data_band_dims[0] if len(data_band_dims) > 0 else (), + data.openeo.y_dim, + data.openeo.x_dim, + ) + data = data.transpose(*required_dim_order, missing_dims="ignore") + mask = mask.transpose(*required_dim_order, missing_dims="ignore") + + data = data.where(_not(mask), replacement) + + if len(data_band_dims) != 0 and len(mask_band_dims) == 0: + # Order axes back to how they were before + data = data.transpose(*original_dim_order) + + return data diff --git a/openeo_processes_dask/process_implementations/cubes/resample.py b/openeo_processes_dask/process_implementations/cubes/resample.py index 01d4bb29..1211cd97 100644 --- a/openeo_processes_dask/process_implementations/cubes/resample.py +++ b/openeo_processes_dask/process_implementations/cubes/resample.py @@ -2,14 +2,20 @@ from typing import Optional, Union import odc.geo.xr +import rioxarray # needs to be imported to set .rio accessor on xarray objects. from odc.geo.geobox import resolution_from_affine from pyproj.crs import CRS, CRSError from openeo_processes_dask.process_implementations.data_model import RasterCube -from openeo_processes_dask.process_implementations.exceptions import OpenEOException +from openeo_processes_dask.process_implementations.exceptions import ( + DimensionMissing, + OpenEOException, +) logger = logging.getLogger(__name__) +__all__ = ["resample_spatial", "resample_cube_spatial"] + resample_methods_list = [ "near", "bilinear", @@ -78,3 +84,70 @@ def resample_spatial( reprojected.attrs["crs"] = data_cp.rio.crs return reprojected + + +def resample_cube_spatial( + data: RasterCube, target: RasterCube, method="near", options=None +) -> RasterCube: + methods_list = [ + "near", + "bilinear", + "cubic", + "cubicspline", + "lanczos", + "average", + "mode", + "max", + "min", + "med", + "q1", + "q3", + ] + + if ( + data.openeo.y_dim is None + or data.openeo.x_dim is None + or target.openeo.y_dim is None + or target.openeo.x_dim is None + ): + raise DimensionMissing( + f"Spatial dimension missing from data or target. Available dimensions for data: {data.dims} for target: {target.dims}" + ) + + # ODC reproject requires y to be before x + required_dim_order = (..., data.openeo.y_dim, data.openeo.x_dim) + + data_reordered = data.transpose(*required_dim_order, missing_dims="ignore") + target_reordered = target.transpose(*required_dim_order, missing_dims="ignore") + + if method == "near": + method = "nearest" + + elif method not in methods_list: + raise Exception( + f'Selected resampling method "{method}" is not available! Please select one of ' + f"[{', '.join(methods_list)}]" + ) + + resampled_data = data_reordered.odc.reproject( + target_reordered.odc.geobox, resampling=method + ) + + resampled_data.rio.write_crs(target_reordered.rio.crs, inplace=True) + + try: + # odc.reproject renames the coordinates according to the geobox, this undoes that. + resampled_data = resampled_data.rename( + {"longitude": data.openeo.x_dim, "latitude": data.openeo.y_dim} + ) + except ValueError: + pass + + # Order axes back to how they were before + resampled_data = resampled_data.transpose(*data.dims) + + # Ensure that attrs except crs are copied over + for k, v in data.attrs.items(): + if k.lower() != "crs": + resampled_data.attrs[k] = v + return resampled_data diff --git a/openeo_processes_dask/process_implementations/exceptions.py b/openeo_processes_dask/process_implementations/exceptions.py index 6d1cf128..a61da854 100644 --- a/openeo_processes_dask/process_implementations/exceptions.py +++ b/openeo_processes_dask/process_implementations/exceptions.py @@ -80,3 +80,11 @@ class RedBandAmbiguous(OpenEOException): class BandExists(OpenEOException): pass + + +class DimensionMismatch(OpenEOException): + pass + + +class LabelMismatch(OpenEOException): + pass diff --git a/openeo_processes_dask/process_implementations/experimental/__init__.py b/openeo_processes_dask/process_implementations/experimental/__init__.py index efabc3ec..e69de29b 100644 --- a/openeo_processes_dask/process_implementations/experimental/__init__.py +++ b/openeo_processes_dask/process_implementations/experimental/__init__.py @@ -1 +0,0 @@ -from .resample import * diff --git a/openeo_processes_dask/process_implementations/experimental/resample.py b/openeo_processes_dask/process_implementations/experimental/resample.py deleted file mode 100644 index e349f23e..00000000 --- a/openeo_processes_dask/process_implementations/experimental/resample.py +++ /dev/null @@ -1,67 +0,0 @@ -import rioxarray # needs to be imported to set .rio accessor on xarray objects. - -from openeo_processes_dask.process_implementations.data_model import RasterCube - -__all__ = ["resample_cube_spatial"] - - -def resample_cube_spatial( - data: RasterCube, target: RasterCube, method="near", options=None -) -> RasterCube: - methods_list = [ - "near", - "bilinear", - "cubic", - "cubicspline", - "lanczos", - "average", - "mode", - "max", - "min", - "med", - "q1", - "q3", - ] - - # ODC reproject requires y to be before x - required_dim_order = ( - data.openeo.band_dims - + data.openeo.temporal_dims - + tuple(data.openeo.y_dim) - + tuple(data.openeo.x_dim) - ) - - data_reordered = data.transpose(*required_dim_order, missing_dims="ignore") - target_reordered = target.transpose(*required_dim_order, missing_dims="ignore") - - if method == "near": - method = "nearest" - - elif method not in methods_list: - raise Exception( - f'Selected resampling method "{method}" is not available! Please select one of ' - f"[{', '.join(methods_list)}]" - ) - - resampled_data = data_reordered.odc.reproject( - target_reordered.odc.geobox, resampling=method - ) - - resampled_data.rio.write_crs(target_reordered.rio.crs, inplace=True) - - try: - # odc.reproject renames the coordinates according to the geobox, this undoes that. - resampled_data = resampled_data.rename( - {"longitude": data.openeo.x_dim, "latitude": data.openeo.y_dim} - ) - except ValueError: - pass - - # Order axes back to how they were before - resampled_data = resampled_data.transpose(*data.dims) - - # Ensure that attrs except crs are copied over - for k, v in data.attrs.items(): - if k.lower() != "crs": - resampled_data.attrs[k] = v - return resampled_data diff --git a/tests/test_mask.py b/tests/test_mask.py index c5c6148e..efda7f85 100644 --- a/tests/test_mask.py +++ b/tests/test_mask.py @@ -1,10 +1,17 @@ +from functools import partial + import numpy as np import pytest -from openeo_pg_parser_networkx.pg_schema import TemporalInterval +from openeo_pg_parser_networkx.pg_schema import ParameterReference, TemporalInterval +from openeo_processes_dask.process_implementations.cubes.mask import mask from openeo_processes_dask.process_implementations.cubes.mask_polygon import ( mask_polygon, ) +from openeo_processes_dask.process_implementations.cubes.reduce import ( + reduce_dimension, + reduce_spatial, +) from tests.mockdata import create_fake_rastercube @@ -29,3 +36,56 @@ def test_mask_polygon( assert np.isnan(output_cube).sum() > np.isnan(input_cube).sum() assert len(output_cube.y) == len(input_cube.y) assert len(output_cube.x) == len(input_cube.x) + + +@pytest.mark.parametrize("size", [(30, 30, 20, 2)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_mask( + temporal_interval, + bounding_box, + random_raster_data, + process_registry, +): + """Test to ensure resolution gets changed correctly.""" + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03"], + backend="dask", + ) + + mask_cube = input_cube > 0 + output_cube = mask(data=input_cube, mask=mask_cube) + + assert np.isnan(output_cube).sum() > np.isnan(input_cube).sum() + assert len(output_cube.y) == len(input_cube.y) + assert len(output_cube.x) == len(input_cube.x) + + _process = partial( + process_registry["max"].implementation, + ignore_nodata=True, + data=ParameterReference(from_parameter="data"), + ) + + mask_cube_no_x = reduce_dimension(data=mask_cube, dimension="x", reducer=_process) + with pytest.raises(Exception): + output_cube = mask(data=input_cube, mask=mask_cube_no_x) + + # Mask should work without bands + mask_cube_no_bands = reduce_dimension( + data=mask_cube, dimension="bands", reducer=_process + ) + output_cube = mask(data=input_cube, mask=mask_cube_no_bands) + + # Mask should work without time + mask_cube_no_time = reduce_dimension( + data=mask_cube, dimension="t", reducer=_process + ) + output_cube = mask(data=input_cube, mask=mask_cube_no_time) + + # Mask should work without time and bands + mask_cube_no_time_bands = reduce_dimension( + data=mask_cube_no_bands, dimension="t", reducer=_process + ) + output_cube = mask(data=input_cube, mask=mask_cube_no_time_bands) diff --git a/tests/test_resample.py b/tests/test_resample.py index 44632495..1a70e671 100644 --- a/tests/test_resample.py +++ b/tests/test_resample.py @@ -4,6 +4,7 @@ from pyproj.crs import CRS from openeo_processes_dask.process_implementations.cubes.resample import ( + resample_cube_spatial, resample_spatial, ) from tests.general_checks import general_output_checks @@ -63,3 +64,51 @@ def test_resample_spatial( assert min(output_cube.y) >= -90 assert max(output_cube.y) <= 90 + + +@pytest.mark.parametrize( + "output_crs", + [ + 3587, + "32633", + "+proj=aeqd +lat_0=53 +lon_0=24 +x_0=5837287.81977 +y_0=2121415.69617 +datum=WGS84 +units=m +no_defs", + "4326", + ], +) +@pytest.mark.parametrize("output_res", [5, 30, 60]) +@pytest.mark.parametrize("size", [(30, 30, 20, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_resample_cube_spatial( + output_crs, output_res, temporal_interval, bounding_box, random_raster_data +): + """Test to ensure resolution gets changed correctly.""" + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + + resampled_cube = resample_spatial( + data=input_cube, projection=output_crs, resolution=output_res + ) + + with pytest.raises(Exception): + output_cube = resample_cube_spatial( + data=input_cube, target=resampled_cube, method="bad" + ) + + output_cube = resample_cube_spatial( + data=input_cube, target=resampled_cube, method="average" + ) + + general_output_checks( + input_cube=input_cube, + output_cube=output_cube, + expected_dims=input_cube.dims, + verify_attrs=False, + verify_crs=False, + ) + + assert output_cube.odc.spatial_dims == ("y", "x") From 03037f9d8568e3ab76c2ce240380c1d01570fcb9 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Wed, 3 Jan 2024 13:20:07 +0100 Subject: [PATCH 09/48] Fix: dimension_labels return list, datetimes are ISO formatted --- .../process_implementations/cubes/general.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/cubes/general.py b/openeo_processes_dask/process_implementations/cubes/general.py index 14639250..66aac394 100644 --- a/openeo_processes_dask/process_implementations/cubes/general.py +++ b/openeo_processes_dask/process_implementations/cubes/general.py @@ -1,6 +1,8 @@ from typing import Optional +import numpy as np import xarray as xr +from numpy.typing import ArrayLike from openeo_pg_parser_networkx.pg_schema import * from openeo_processes_dask.process_implementations.data_model import RasterCube @@ -28,12 +30,17 @@ def create_raster_cube() -> RasterCube: return xr.DataArray() -def dimension_labels(data: RasterCube, dimension: str) -> RasterCube: +def dimension_labels(data: RasterCube, dimension: str) -> ArrayLike: if dimension not in data.dims: raise DimensionNotAvailable( f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" ) - return data.coords[dimension] + + coords = data.coords[dimension] + if np.issubdtype(coords.dtype, np.datetime64): + return np.datetime_as_string(coords, timezone="UTC") + else: + return np.array(data.coords[dimension]) def add_dimension( From 3ffefe7f9f19285233bf34478710dc19cfb6a8ce Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Mon, 8 Jan 2024 10:06:00 +0100 Subject: [PATCH 10/48] first/last: Convert to numpy if list is provided (#212) Convert to numpy if list is provided --- openeo_processes_dask/process_implementations/arrays.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openeo_processes_dask/process_implementations/arrays.py b/openeo_processes_dask/process_implementations/arrays.py index 3f8876e4..66014f54 100644 --- a/openeo_processes_dask/process_implementations/arrays.py +++ b/openeo_processes_dask/process_implementations/arrays.py @@ -191,6 +191,8 @@ def first( ignore_nodata: Optional[bool] = True, axis: Optional[str] = None, ): + if isinstance(data, list): + data = np.asarray(data) if len(data) == 0: return np.nan if axis is None: From ac505095af85eb7258fa124588e63559583842b8 Mon Sep 17 00:00:00 2001 From: Vincent Verelst <30296933+VincentVerelst@users.noreply.github.com> Date: Wed, 17 Jan 2024 15:40:59 +0100 Subject: [PATCH 11/48] Apply kernel (#186) * add apply_kernel * apply_kernel adjustments * added apply_kernel tests * .gitmodules * Added KernelDimensionsUneven exception for when one or more of the kernel dimensions is even. * formatting --------- Co-authored-by: Gerald Walter Irsiegler --- .../process_implementations/cubes/apply.py | 54 ++++++++++++++++++- .../process_implementations/exceptions.py | 4 ++ pyproject.toml | 1 + tests/test_apply.py | 28 ++++++++++ 4 files changed, 85 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/cubes/apply.py b/openeo_processes_dask/process_implementations/cubes/apply.py index 6737c1fe..f18db9f8 100644 --- a/openeo_processes_dask/process_implementations/cubes/apply.py +++ b/openeo_processes_dask/process_implementations/cubes/apply.py @@ -1,14 +1,16 @@ -from typing import Callable, Optional +from typing import Callable, Optional, Union import numpy as np +import scipy.ndimage import xarray as xr from openeo_processes_dask.process_implementations.data_model import RasterCube from openeo_processes_dask.process_implementations.exceptions import ( DimensionNotAvailable, + KernelDimensionsUneven, ) -__all__ = ["apply", "apply_dimension"] +__all__ = ["apply", "apply_dimension", "apply_kernel"] def apply( @@ -83,3 +85,51 @@ def apply_dimension( reordered_result.openeo.add_dim_type(name=target_dimension, type="other") return reordered_result + + +def apply_kernel( + data: RasterCube, + kernel: np.ndarray, + factor: Optional[float] = 1, + border: Union[float, str, None] = 0, + replace_invalid: Optional[float] = 0, +) -> RasterCube: + kernel = np.asarray(kernel) + if any(dim % 2 == 0 for dim in kernel.shape): + raise KernelDimensionsUneven( + "Each dimension of the kernel must have an uneven number of elements." + ) + + def convolve(data, kernel, mode="constant", cval=0, fill_value=0): + dims = ("y", "x") + convolved = lambda data: scipy.ndimage.convolve( + data, kernel, mode=mode, cval=cval + ) + + data_masked = data.fillna(fill_value) + + return xr.apply_ufunc( + convolved, + data_masked, + vectorize=True, + dask="parallelized", + input_core_dims=[dims], + output_core_dims=[dims], + output_dtypes=[data.dtype], + dask_gufunc_kwargs={"allow_rechunk": True}, + ).transpose(*data.dims) + + openeo_scipy_modes = { + "replicate": "nearest", + "reflect": "reflect", + "reflect_pixel": "mirror", + "wrap": "wrap", + } + if isinstance(border, int) or isinstance(border, float): + mode = "constant" + cval = border + else: + mode = openeo_scipy_modes[border] + cval = 0 + + return convolve(data, kernel, mode, cval, replace_invalid) * factor diff --git a/openeo_processes_dask/process_implementations/exceptions.py b/openeo_processes_dask/process_implementations/exceptions.py index a61da854..b13aac5e 100644 --- a/openeo_processes_dask/process_implementations/exceptions.py +++ b/openeo_processes_dask/process_implementations/exceptions.py @@ -88,3 +88,7 @@ class DimensionMismatch(OpenEOException): class LabelMismatch(OpenEOException): pass + + +class KernelDimensionsUneven(OpenEOException): + pass diff --git a/pyproject.toml b/pyproject.toml index 8725d394..19758826 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ stac_validator = { version = ">=3.3.1", optional = true } stackstac = { version = ">=0.4.3", optional = true } pystac_client = { version = ">=0.6.1", optional = true } planetary_computer = { version = ">=0.5.1", optional = true } +scipy = "^1.11.3" xvec = { version = ">=0.1.0", optional = true } [tool.poetry.group.dev.dependencies] diff --git a/tests/test_apply.py b/tests/test_apply.py index de567d2f..fb396b4c 100644 --- a/tests/test_apply.py +++ b/tests/test_apply.py @@ -9,6 +9,7 @@ from openeo_processes_dask.process_implementations.cubes.apply import ( apply, apply_dimension, + apply_kernel, ) from tests.general_checks import assert_numpy_equals_dask_numpy, general_output_checks from tests.mockdata import create_fake_rastercube @@ -235,3 +236,30 @@ def test_apply_dimension_ordering_processes( np.testing.assert_array_equal( output_cube_sort.data, rearrange_by_expected_order.data ) + + +@pytest.mark.parametrize("size", [(6, 5, 4, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_apply_kernel(temporal_interval, bounding_box, random_raster_data): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + + # Following kernel should leave cube unchanged + kernel = np.asarray([[0, 0, 0], [0, 1, 0], [0, 0, 0]]) + + output_cube = apply_kernel(data=input_cube, kernel=kernel) + + general_output_checks( + input_cube=input_cube, + output_cube=output_cube, + verify_attrs=True, + verify_crs=True, + expected_results=input_cube, + ) + + xr.testing.assert_equal(output_cube, input_cube) From ca3226a009fc8023959e85cd25d61b53f70bbaf8 Mon Sep 17 00:00:00 2001 From: Gerald Walter Irsiegler Date: Mon, 22 Jan 2024 15:00:56 +0100 Subject: [PATCH 12/48] Fix: Apply dim exceptions (#218) * Fix: dimension names wrong if dimension is only 1 long, crs not being written in edge case. * Bump Version, add joblib dependency * remove joblib dep * Fix: test cases --------- Co-authored-by: Gerald Walter Irsiegler --- .../process_implementations/cubes/apply.py | 10 ++++++++-- pyproject.toml | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/openeo_processes_dask/process_implementations/cubes/apply.py b/openeo_processes_dask/process_implementations/cubes/apply.py index f18db9f8..aa425687 100644 --- a/openeo_processes_dask/process_implementations/cubes/apply.py +++ b/openeo_processes_dask/process_implementations/cubes/apply.py @@ -78,8 +78,14 @@ def apply_dimension( {dimension: target_dimension} ) - if len(data[dimension]) == len(reordered_result[target_dimension]): - reordered_result.rio.write_crs(data.rio.crs, inplace=True) + if len(reordered_result[target_dimension]) == 1: + reordered_result[target_dimension] = ["0"] + + if data.rio.crs is not None: + try: + reordered_result.rio.write_crs(data.rio.crs, inplace=True) + except ValueError: + pass if is_new_dim_added: reordered_result.openeo.add_dim_type(name=target_dimension, type="other") diff --git a/pyproject.toml b/pyproject.toml index 19758826..544f20b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2023.11.6" +version = "2024.1.1" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 75472eaafa0a44aa7f3b0204f4cc93aeb46bbd69 Mon Sep 17 00:00:00 2001 From: Sean <63349506+SerRichard@users.noreply.github.com> Date: Fri, 2 Feb 2024 14:42:49 +0100 Subject: [PATCH 13/48] Bug in mask polygon (#220) should not index a string Co-authored-by: sean --- .../process_implementations/cubes/mask_polygon.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/cubes/mask_polygon.py b/openeo_processes_dask/process_implementations/cubes/mask_polygon.py index 5731b45a..f9cc5a38 100644 --- a/openeo_processes_dask/process_implementations/cubes/mask_polygon.py +++ b/openeo_processes_dask/process_implementations/cubes/mask_polygon.py @@ -88,7 +88,7 @@ def mask_polygon( for i, d in enumerate(data_dims): data_chunks[d] = chunks_shapes[i][0] - if data_dims.index(x_dim[0]) < data_dims.index(y_dim[0]): + if data_dims.index(x_dim) < data_dims.index(y_dim): final_mask = da.zeros( (x_dim_size, y_dim_size), chunks={x_dim: data_chunks[x_dim], y_dim: data_chunks[y_dim]}, diff --git a/pyproject.toml b/pyproject.toml index 544f20b0..3df078ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2024.1.1" +version = "2024.1.2" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 3a023349082fb83128c7b967207242b4d4e9de56 Mon Sep 17 00:00:00 2001 From: Sean <63349506+SerRichard@users.noreply.github.com> Date: Fri, 2 Feb 2024 16:10:33 +0100 Subject: [PATCH 14/48] bump submodule to include new process specs (#221) * bump submodule to include new process specs * pre commit hooks --------- Co-authored-by: sean --- openeo_processes_dask/specs/openeo-processes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 7044771f..16a0ec6f 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 7044771f54525f6b613a8fc065f5f7d688eeca35 +Subproject commit 16a0ec6f3a256ffd5044cc6d3c712ce9fd92ff73 From 0ff4688d1b9d5c2b66906b2cd6804a4e4f868bb0 Mon Sep 17 00:00:00 2001 From: Gerald Walter Irsiegler Date: Mon, 5 Feb 2024 17:23:52 +0100 Subject: [PATCH 15/48] Update: add mask to specs (#223) * add mask to specs * bump ver --------- Co-authored-by: Gerald Walter Irsiegler --- openeo_processes_dask/specs/openeo-processes | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 16a0ec6f..03d1b2aa 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 16a0ec6f3a256ffd5044cc6d3c712ce9fd92ff73 +Subproject commit 03d1b2aa8f98c8868f736025c2bfab91ef58c8ee diff --git a/pyproject.toml b/pyproject.toml index 3df078ea..ae19a666 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2024.1.2" +version = "2024.1.3" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 05e51b229cf04235d072488cbd6e4cae5e286798 Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Wed, 7 Feb 2024 14:32:56 +0100 Subject: [PATCH 16/48] Update to 2024.2.1 openeo-processes (#224) * update to 2024.2.1 openeo-processes * bump to 2024.2.1 --- openeo_processes_dask/specs/openeo-processes | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 03d1b2aa..12444d8b 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 03d1b2aa8f98c8868f736025c2bfab91ef58c8ee +Subproject commit 12444d8bbc1a983e6b8df0ba956d057dc2d2224e diff --git a/pyproject.toml b/pyproject.toml index ae19a666..bec6ac97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2024.1.3" +version = "2024.2.1" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From e8197d69d2ba887b9ec4453cb3c8d7e62a570266 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Mon, 12 Feb 2024 09:40:54 +0100 Subject: [PATCH 17/48] Tests for constants pi and e (#209) --- tests/test_math.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_math.py b/tests/test_math.py index a9cffceb..edb3455c 100644 --- a/tests/test_math.py +++ b/tests/test_math.py @@ -38,6 +38,14 @@ def test_sum(): assert np.isnan(_sum([1, np.nan], ignore_nodata=False)) +def test_e(): + assert e() == pytest.approx(2.71828182846) + + +def test_pi(): + assert pi() == pytest.approx(3.14159265359) + + @pytest.mark.parametrize( "array,expected,ignore_nodata,axis", [ From 785710e61978466a2bca2483aa6769198e612414 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Mon, 12 Feb 2024 09:41:10 +0100 Subject: [PATCH 18/48] clip: throw exception is min > max (#208) * clip: throw exception is min > max * add test * run pre-commit hook --------- Co-authored-by: ValentinaHutter --- openeo_processes_dask/process_implementations/exceptions.py | 4 ++++ openeo_processes_dask/process_implementations/math.py | 5 +++++ tests/test_math.py | 3 +++ 3 files changed, 12 insertions(+) diff --git a/openeo_processes_dask/process_implementations/exceptions.py b/openeo_processes_dask/process_implementations/exceptions.py index b13aac5e..b7057a82 100644 --- a/openeo_processes_dask/process_implementations/exceptions.py +++ b/openeo_processes_dask/process_implementations/exceptions.py @@ -92,3 +92,7 @@ class LabelMismatch(OpenEOException): class KernelDimensionsUneven(OpenEOException): pass + + +class MinMaxSwapped(OpenEOException): + pass diff --git a/openeo_processes_dask/process_implementations/math.py b/openeo_processes_dask/process_implementations/math.py index 1bc927f7..931e6746 100644 --- a/openeo_processes_dask/process_implementations/math.py +++ b/openeo_processes_dask/process_implementations/math.py @@ -7,6 +7,7 @@ _is_dask_array, ) from openeo_processes_dask.process_implementations.exceptions import ( + MinMaxSwapped, OpenEOException, QuantilesParameterConflict, QuantilesParameterMissing, @@ -263,6 +264,10 @@ def extrema(data, ignore_nodata=True, axis=None, keepdims=False): def clip(x, min, max): + if min > max: + raise MinMaxSwapped( + "The minimum value should be lower than or equal to the maximum value." + ) # Cannot use positional arguments to pass min and max into np.clip, this will not work with dask. return np.clip(x, min, max) diff --git a/tests/test_math.py b/tests/test_math.py index edb3455c..5cfd1046 100644 --- a/tests/test_math.py +++ b/tests/test_math.py @@ -2,6 +2,7 @@ import numpy as np import pytest +from openeo_processes_dask.process_implementations.exceptions import MinMaxSwapped from openeo_processes_dask.process_implementations.math import * @@ -94,3 +95,5 @@ def test_clip(): result_dask = clip(dask_array, min=0, max=2) assert np.array_equal(result_np, result_dask.compute(), equal_nan=True) assert np.array_equal(result_np, np.array([2, 0])) + with pytest.raises(MinMaxSwapped): + clip(dask_array, 10, 0) From ea6f6d89536ab192c1e6e8c98ce2ccde1c26cf12 Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Mon, 12 Feb 2024 09:41:29 +0100 Subject: [PATCH 19/48] Clip correctly in linear_scale_range when inputMax < inputMin (#206) * Clip correctly in linear_scale_range when inputMax < inputMin * add tests --------- Co-authored-by: ValentinaHutter --- openeo_processes_dask/process_implementations/math.py | 5 ++++- tests/test_math.py | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/openeo_processes_dask/process_implementations/math.py b/openeo_processes_dask/process_implementations/math.py index 931e6746..b6f99290 100644 --- a/openeo_processes_dask/process_implementations/math.py +++ b/openeo_processes_dask/process_implementations/math.py @@ -229,7 +229,10 @@ def arctan2(y, x): def linear_scale_range(x, inputMin, inputMax, outputMin=0.0, outputMax=1.0): - x = clip(x, inputMin, inputMax) + if inputMax < inputMin: + x = clip(x, inputMax, inputMin) + else: + x = clip(x, inputMin, inputMax) lsr = ((x - inputMin) / (inputMax - inputMin)) * (outputMax - outputMin) + outputMin return lsr diff --git a/tests/test_math.py b/tests/test_math.py index 5cfd1046..361582ee 100644 --- a/tests/test_math.py +++ b/tests/test_math.py @@ -87,6 +87,16 @@ def test_normalized_difference(x, y, expected): assert np.array_equal(result_np, result_dask.compute(), equal_nan=True) +def test_linear_scale_range(): + array = np.array([5, 0]) + + dask_array = da.from_array(array) + result_dask = linear_scale_range(dask_array, inputMin=0, inputMax=5) + assert np.array_equal(np.array([1, 0]), result_dask.compute(), equal_nan=True) + result_dask = linear_scale_range(dask_array, inputMin=5, inputMax=0) + assert np.array_equal(np.array([0, 1]), result_dask.compute(), equal_nan=True) + + def test_clip(): array = np.array([5, 0]) result_np = clip(array, min=0, max=2) From b163a813c94ff86505d72f84894a90cc47aad6df Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Mon, 12 Feb 2024 09:52:52 +0100 Subject: [PATCH 20/48] extrema: skipna => ignore_nodata, ensure dtype is available (#202) * extrema: skipna => ignore_nodata * Convert to np.array so that .dtype is available below * add tests * run pre-commit hook --------- Co-authored-by: ValentinaHutter --- openeo_processes_dask/process_implementations/math.py | 6 ++++-- tests/test_math.py | 11 +++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/math.py b/openeo_processes_dask/process_implementations/math.py index b6f99290..55e2d7c2 100644 --- a/openeo_processes_dask/process_implementations/math.py +++ b/openeo_processes_dask/process_implementations/math.py @@ -259,9 +259,11 @@ def power(base, p): def extrema(data, ignore_nodata=True, axis=None, keepdims=False): + if isinstance(data, list): + data = np.array(data) # TODO: Could be sped up by only iterating over array once - minimum = _min(data, skipna=ignore_nodata, axis=axis, keepdims=keepdims) - maximum = _max(data, skipna=ignore_nodata, axis=axis, keepdims=keepdims) + minimum = _min(data, ignore_nodata=ignore_nodata, axis=axis, keepdims=keepdims) + maximum = _max(data, ignore_nodata=ignore_nodata, axis=axis, keepdims=keepdims) array = dask.delayed(np.array)([minimum, maximum]) return da.from_delayed(array, (2,), dtype=data.dtype) diff --git a/tests/test_math.py b/tests/test_math.py index 361582ee..9eea0d1b 100644 --- a/tests/test_math.py +++ b/tests/test_math.py @@ -107,3 +107,14 @@ def test_clip(): assert np.array_equal(result_np, np.array([2, 0])) with pytest.raises(MinMaxSwapped): clip(dask_array, 10, 0) + + +def test_extrema(): + array_list = [0, 5, 10] + result_np = np.array([0, 10]) + + result = extrema(array_list) + assert np.array_equal(result_np, result.compute()) + dask_array = da.from_array(np.array(array_list)) + result = extrema(dask_array, ignore_nodata=True, axis=0, keepdims=False) + assert np.array_equal(result_np, result.compute()) From 17cd080e91d9a49931f6719cb89403972a989eac Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Mon, 12 Feb 2024 12:07:53 +0100 Subject: [PATCH 21/48] Add array_append (#210) * Add array_append * add dask test * remove np.array(1) * update type check * run pre-commit hook --------- Co-authored-by: ValentinaHutter --- .../process_implementations/arrays.py | 15 +++++++++++++++ tests/test_arrays.py | 18 ++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/openeo_processes_dask/process_implementations/arrays.py b/openeo_processes_dask/process_implementations/arrays.py index 66014f54..d86860ea 100644 --- a/openeo_processes_dask/process_implementations/arrays.py +++ b/openeo_processes_dask/process_implementations/arrays.py @@ -25,6 +25,7 @@ "array_create", "array_modify", "array_concat", + "array_append", "array_contains", "array_find", "array_labels", @@ -126,6 +127,20 @@ def array_concat(array1: ArrayLike, array2: ArrayLike) -> ArrayLike: return concat +def array_append(data: ArrayLike, value: Any, label: Optional[Any] = None) -> ArrayLike: + if label is not None: + raise NotImplementedError("labelled arrays are currently not implemented.") + + if ( + not isinstance(value, list) + and not isinstance(value, np.ndarray) + and not isinstance(value, da.core.Array) + ): + value = [value] + + return array_concat(data, value) + + def array_contains(data: ArrayLike, value: Any, axis=None) -> bool: # TODO: Contrary to the process spec, our implementation does interpret temporal strings before checking them here # This is somewhat implicit in how we currently parse parameters, so cannot be easily changed. diff --git a/tests/test_arrays.py b/tests/test_arrays.py index e052547f..5c9eea64 100644 --- a/tests/test_arrays.py +++ b/tests/test_arrays.py @@ -137,6 +137,24 @@ def test_array_concat(array1, array2, expected): np.testing.assert_array_equal(dask_result, np.array(expected), strict=True) +@pytest.mark.parametrize( + "data, value, expected", + [ + ([2, 3], 4, [2, 3, 4]), + (["a", "b"], 1, ["a", "b", 1]), + ], +) +def test_array_append(data, value, expected): + np.testing.assert_array_equal(array_append(data, value), expected, strict=True) + np.testing.assert_array_equal( + array_append(np.array(data), np.array([value])), expected, strict=True + ) + dask_result = array_append( + da.from_array(np.array(data)), da.from_array(np.array([value])) + ) + np.testing.assert_array_equal(dask_result, np.array(expected), strict=True) + + @pytest.mark.parametrize( "data, value, expected", [ From ee54156197f7b223db3beb8e62475062bb689230 Mon Sep 17 00:00:00 2001 From: Mohammad Alasawedah <61426508+masawdah@users.noreply.github.com> Date: Mon, 12 Feb 2024 15:35:19 +0100 Subject: [PATCH 22/48] Spatila agg xvec (#217) * spatial_agg xvec * callable reducer * jobloib * clean uo * add values test * run pre-commit hook * add geom url test * add geom polygon test * run pre-commit hook --------- Co-authored-by: ValentinaHutter --- .../cubes/aggregate.py | 177 ++++-------------- pyproject.toml | 3 +- tests/test_aggregate.py | 26 +++ 3 files changed, 65 insertions(+), 141 deletions(-) diff --git a/openeo_processes_dask/process_implementations/cubes/aggregate.py b/openeo_processes_dask/process_implementations/cubes/aggregate.py index a0c3a2af..5bc28b4f 100644 --- a/openeo_processes_dask/process_implementations/cubes/aggregate.py +++ b/openeo_processes_dask/process_implementations/cubes/aggregate.py @@ -22,24 +22,11 @@ TooManyDimensions, ) -DEFAULT_CRS = "EPSG:4326" - - __all__ = ["aggregate_temporal", "aggregate_temporal_period", "aggregate_spatial"] logger = logging.getLogger(__name__) -def geometry_mask(geoms, geobox, all_touched=False, invert=False): - return rasterio.features.geometry_mask( - [geom.to_crs(geobox.crs) for geom in geoms], - out_shape=geobox.shape, - transform=geobox.affine, - all_touched=all_touched, - invert=invert, - ) - - def aggregate_temporal( data: RasterCube, intervals: Union[TemporalIntervals, list[TemporalInterval], list[Optional[str]]], @@ -124,140 +111,50 @@ def aggregate_temporal_period( ) -def _aggregate_geometry( - data: RasterCube, - geom, - transform, - reducer: Callable, -): - data_dims = list(data.dims) - y_dim = data.openeo.y_dim - x_dim = data.openeo.x_dim - t_dim = data.openeo.temporal_dims - t_dim = None if len(t_dim) == 0 else t_dim[0] - b_dim = data.openeo.band_dims - b_dim = None if len(b_dim) == 0 else b_dim[0] - - y_dim_size = data.sizes[y_dim] - x_dim_size = data.sizes[x_dim] - - # Create a GeoSeries from the geometry - geo_series = gpd.GeoSeries(geom) - - # Convert the GeoSeries to a GeometryArray - geometry_array = geo_series.geometry.array - - mask = rasterio.features.geometry_mask( - geometry_array, out_shape=(y_dim_size, x_dim_size), transform=transform - ) - - if t_dim is not None: - mask = np.expand_dims(mask, axis=data_dims.index(t_dim)) - if b_dim is not None: - mask = np.expand_dims(mask, axis=data_dims.index(b_dim)) - - masked_data = data * mask - del mask, data - gc.collect() - - positional_parameters = {"data": 0} - - stat_within_polygon = masked_data.reduce( - reducer, - axis=(data_dims.index(y_dim), data_dims.index(x_dim)), - keep_attrs=True, - ignore_nodata=True, - positional_parameters=positional_parameters, - ) - result = stat_within_polygon.values - - del masked_data, stat_within_polygon - gc.collect() - - return result.T - - def aggregate_spatial( data: RasterCube, geometries, reducer: Callable, chunk_size: int = 2, ) -> VectorCube: - t_dim = data.openeo.temporal_dims - t_dim = None if len(t_dim) == 0 else t_dim[0] - b_dim = data.openeo.band_dims - b_dim = None if len(b_dim) == 0 else b_dim[0] - - if "type" in geometries and geometries["type"] == "FeatureCollection": - gdf = gpd.GeoDataFrame.from_features(geometries, DEFAULT_CRS) - elif "type" in geometries and geometries["type"] in ["Polygon"]: - polygon = shapely.geometry.Polygon(geometries["coordinates"][0]) - gdf = gpd.GeoDataFrame(geometry=[polygon]) - gdf.crs = DEFAULT_CRS - - transform = data.rio.transform() - geometries = gdf.geometry.values - - geometry_chunks = [ - geometries[i : i + chunk_size] for i in range(0, len(geometries), chunk_size) - ] - - computed_results = [] - logger.info(f"Running aggregate_spatial process") - try: - for i, chunk in enumerate(geometry_chunks): - # Create a list of delayed objects for the current chunk - chunk_results = Parallel(n_jobs=-1)( - delayed(_aggregate_geometry)( - data, geom, transform=transform, reducer=reducer - ) - for geom in chunk + x_dim = data.openeo.x_dim + y_dim = data.openeo.y_dim + DEFAULT_CRS = "EPSG:4326" + + if isinstance(geometries, str): + # Allow importing geometries from url (e.g. github raw) + import json + from urllib.request import urlopen + + response = urlopen(geometries) + geometries = json.loads(response.read()) + if isinstance(geometries, dict): + # Get crs from geometries + if "features" in geometries: + for feature in geometries["features"]: + if "properties" not in feature: + feature["properties"] = {} + elif feature["properties"] is None: + feature["properties"] = {} + DEFAULT_CRS = ( + geometries.get("crs", {}).get("properties", {}).get("name", DEFAULT_CRS) ) - computed_results.extend(chunk_results) - except: - logger.debug(f"Running process failed at {(i+1) *2} geometry") - - logger.info(f"Finish aggregate_spatial process for {len(geometries)}") - - final_results = np.stack(computed_results) - del chunk_results, geometry_chunks, computed_results - gc.collect() + if "type" in geometries and geometries["type"] == "FeatureCollection": + gdf = gpd.GeoDataFrame.from_features(geometries, crs=DEFAULT_CRS) + elif "type" in geometries and geometries["type"] in ["Polygon"]: + polygon = shapely.geometry.Polygon(geometries["coordinates"][0]) + gdf = gpd.GeoDataFrame(geometry=[polygon]) + gdf.crs = DEFAULT_CRS - df = pd.DataFrame() - keys_items = {} - - for idx, b in enumerate(data[b_dim].values): - columns = [] - if t_dim: - for t in range(len(data[t_dim])): - columns.append(f"{b}_time{t+1}") - aggregated_data = final_results[:, idx, :] - else: - columns.append(f"{b}") - aggregated_data = final_results[:, idx] - - keys_items[b] = columns - - # Create a new DataFrame with the current data and columns - band_df = pd.DataFrame(aggregated_data, columns=columns) - # Concatenate the new DataFrame with the existing DataFrame - df = pd.concat([df, band_df], axis=1) - - df = gpd.GeoDataFrame(df, geometry=gdf.geometry) - - data_vars = {} - for key in keys_items.keys(): - data_vars[key] = (["geometry", t_dim], df[keys_items[key]]) - - ## Create VectorCube - if t_dim: - times = list(data[t_dim].values) - vec_cube = xr.Dataset( - data_vars=data_vars, coords={"geometry": df.geometry, t_dim: times} - ).xvec.set_geom_indexes("geometry", crs=df.crs) - else: - vec_cube = xr.Dataset( - data_vars=data_vars, coords=dict(geometry=df.geometry) - ).xvec.set_geom_indexes("geometry", crs=df.crs) + geometries = gdf.geometry.values + positional_parameters = {"data": 0} + vec_cube = data.xvec.zonal_stats( + geometries, + x_coords=x_dim, + y_coords=y_dim, + method="iterate", + stats=reducer, + positional_parameters=positional_parameters, + ) return vec_cube diff --git a/pyproject.toml b/pyproject.toml index bec6ac97..d87bed30 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ pystac_client = { version = ">=0.6.1", optional = true } planetary_computer = { version = ">=0.5.1", optional = true } scipy = "^1.11.3" xvec = { version = ">=0.1.0", optional = true } +joblib = { version = ">=1.3.2", optional = true } [tool.poetry.group.dev.dependencies] pytest = "^7.2.0" @@ -50,7 +51,7 @@ pre-commit = "^2.20.0" pytest-cov = "^4.0.0" [tool.poetry.extras] -implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator", "xvec"] +implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator", "xvec", "joblib"] ml = ["xgboost"] [tool.pytest.ini_options] diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index 1ee894ad..b1d7c8d2 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -1,5 +1,6 @@ from functools import partial +import geopandas as gpd import numpy as np import pytest from openeo_pg_parser_networkx.pg_schema import ParameterReference, TemporalInterval @@ -135,3 +136,28 @@ def test_aggregate_spatial( ) assert len(output_cube.dims) < len(reduced_cube.dims) + + gdf = gpd.GeoDataFrame.from_features(polygon_geometry_small, crs="EPSG:4326") + xmin, ymin, xmax, ymax = gdf.total_bounds + + expected_values = ( + reduced_cube.sel(x=slice(xmin, xmax), y=slice(ymin, ymax)) + .mean(["x", "y"]) + .values + ) + + assert (output_cube.values == expected_values).all() + + geometry_url = "https://raw.githubusercontent.com/ValentinaHutter/polygons/master/polygons_small.json" + output_cube = aggregate_spatial( + data=reduced_cube, geometries=geometry_url, reducer=reducer + ) + + assert len(output_cube.geometry) == 38 + + geometry = {"type": "Polygon", "coordinates": [[[0, 0], [0, 1], [1, 1], [1, 0]]]} + output_cube = aggregate_spatial( + data=reduced_cube, geometries=geometry, reducer=reducer + ) + + assert np.isnan(output_cube.values).all() From 50786cc3fe7e2dbcf3882b8e6b8a0dcf6b0e3c56 Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Mon, 12 Feb 2024 16:03:28 +0100 Subject: [PATCH 23/48] update version (#225) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d87bed30..002a3a6a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2024.2.1" +version = "2024.2.2" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 365ae22df919892da81abf8c03db731c624586aa Mon Sep 17 00:00:00 2001 From: Michele Claus <31700619+clausmichele@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:23:00 +0100 Subject: [PATCH 24/48] labelled arrays (#155) * Draft for labelled arrays * Logic update * Added label test --------- Co-authored-by: Sean <63349506+SerRichard@users.noreply.github.com> --- .../process_implementations/arrays.py | 15 ++++++++++++--- .../process_implementations/core.py | 8 +++++++- .../process_implementations/cubes/reduce.py | 7 ++++--- tests/test_arrays.py | 18 ++++++++++++++++++ 4 files changed, 41 insertions(+), 7 deletions(-) diff --git a/openeo_processes_dask/process_implementations/arrays.py b/openeo_processes_dask/process_implementations/arrays.py index d86860ea..ebbaeaf8 100644 --- a/openeo_processes_dask/process_implementations/arrays.py +++ b/openeo_processes_dask/process_implementations/arrays.py @@ -7,6 +7,7 @@ import pandas as pd import xarray as xr from numpy.typing import ArrayLike +from openeo_pg_parser_networkx.pg_schema import DateTime from xarray.core.duck_array_ops import isnull, notnull from openeo_processes_dask.process_implementations.cubes.utils import _is_dask_array @@ -43,6 +44,8 @@ def array_element( label: Optional[str] = None, return_nodata: Optional[bool] = False, axis=None, + context=None, + dim_labels=None, ): if index is None and label is None: raise ArrayElementParameterMissing( @@ -55,14 +58,20 @@ def array_element( ) if label is not None: - raise NotImplementedError( - "labelled arrays are currently not implemented. Please use index instead." - ) + if isinstance(label, DateTime): + label = label.to_numpy() + (index,) = np.where(dim_labels == label) + if len(index) == 0: + index = None + else: + index = index[0] try: if index is not None: element = np.take(data, index, axis=axis) return element + else: + raise IndexError except IndexError: if return_nodata: logger.warning( diff --git a/openeo_processes_dask/process_implementations/core.py b/openeo_processes_dask/process_implementations/core.py index e3fa7ce7..bc87ee22 100644 --- a/openeo_processes_dask/process_implementations/core.py +++ b/openeo_processes_dask/process_implementations/core.py @@ -68,7 +68,13 @@ def wrapper( else: resolved_kwargs[k] = arg - special_args = ["axis", "keepdims", "source_transposed_axis", "context"] + special_args = [ + "axis", + "keepdims", + "source_transposed_axis", + "context", + "dim_labels", + ] # Remove 'axis' and keepdims parameter if not expected in function signature. for arg in special_args: if arg not in inspect.signature(f).parameters: diff --git a/openeo_processes_dask/process_implementations/cubes/reduce.py b/openeo_processes_dask/process_implementations/cubes/reduce.py index 5641edc4..7702ea06 100644 --- a/openeo_processes_dask/process_implementations/cubes/reduce.py +++ b/openeo_processes_dask/process_implementations/cubes/reduce.py @@ -21,15 +21,16 @@ def reduce_dimension( f"Provided dimension ({dimension}) not found in data.dims: {data.dims}" ) - positional_parameters = {"data": 0} - named_parameters = {"context": context} + dim_labels = data[dimension].values + positional_parameters = {"data": 0} reduced_data = data.reduce( reducer, dim=dimension, keep_attrs=True, positional_parameters=positional_parameters, - named_parameters=named_parameters, + context=context, + dim_labels=dim_labels, ) # Preset diff --git a/tests/test_arrays.py b/tests/test_arrays.py index 5c9eea64..3d760214 100644 --- a/tests/test_arrays.py +++ b/tests/test_arrays.py @@ -48,6 +48,24 @@ def test_array_element( xr.testing.assert_equal(output_cube, input_cube.isel({"bands": 1}, drop=True)) + # Use a label + _process = partial( + process_registry["array_element"].implementation, + label="B02", + data=ParameterReference(from_parameter="data"), + ) + + output_cube = reduce_dimension(data=input_cube, reducer=_process, dimension="bands") + + general_output_checks( + input_cube=input_cube, + output_cube=output_cube, + verify_attrs=False, + verify_crs=True, + ) + + xr.testing.assert_equal(output_cube, input_cube.loc[{"bands": "B02"}].drop("bands")) + # When the index is out of range, we expect an ArrayElementNotAvailable exception to be thrown _process_not_available = partial( process_registry["array_element"].implementation, From 503d879574548ef14a12152b7afbef791eae2afa Mon Sep 17 00:00:00 2001 From: Sean <63349506+SerRichard@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:23:38 +0100 Subject: [PATCH 25/48] Bug in apply kernel (#231) * bump submodule to include new process specs * pre commit hooks * fix hard coded spatial dimension reference --------- Co-authored-by: sean --- openeo_processes_dask/process_implementations/cubes/apply.py | 2 +- openeo_processes_dask/specs/openeo-processes | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/openeo_processes_dask/process_implementations/cubes/apply.py b/openeo_processes_dask/process_implementations/cubes/apply.py index aa425687..1c59706b 100644 --- a/openeo_processes_dask/process_implementations/cubes/apply.py +++ b/openeo_processes_dask/process_implementations/cubes/apply.py @@ -107,7 +107,7 @@ def apply_kernel( ) def convolve(data, kernel, mode="constant", cval=0, fill_value=0): - dims = ("y", "x") + dims = data.openeo.spatial_dims convolved = lambda data: scipy.ndimage.convolve( data, kernel, mode=mode, cval=cval ) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 12444d8b..16a0ec6f 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 12444d8bbc1a983e6b8df0ba956d057dc2d2224e +Subproject commit 16a0ec6f3a256ffd5044cc6d3c712ce9fd92ff73 diff --git a/pyproject.toml b/pyproject.toml index 002a3a6a..c75d954a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2024.2.2" +version = "2024.2.3" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From 9fcafef12f8c6cc6da302a5f02a870b8fa12c278 Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:57:30 +0100 Subject: [PATCH 26/48] Update specs to use datacube instead of rastercube (#232) * update specs * version 2024.2.4 --- openeo_processes_dask/specs/openeo-processes | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/specs/openeo-processes b/openeo_processes_dask/specs/openeo-processes index 16a0ec6f..ea3d202a 160000 --- a/openeo_processes_dask/specs/openeo-processes +++ b/openeo_processes_dask/specs/openeo-processes @@ -1 +1 @@ -Subproject commit 16a0ec6f3a256ffd5044cc6d3c712ce9fd92ff73 +Subproject commit ea3d202a43e966c8524b524efc1a283e3b15a2df diff --git a/pyproject.toml b/pyproject.toml index c75d954a..ac3c38ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2024.2.3" +version = "2024.2.4" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From ed369062f498ac2d27f86204d022891e8059d4ae Mon Sep 17 00:00:00 2001 From: Michele Claus <31700619+clausmichele@users.noreply.github.com> Date: Thu, 29 Feb 2024 11:42:01 +0100 Subject: [PATCH 27/48] Fix resample spatial (#233) fix ressample_spatial dims --- .pre-commit-config.yaml | 2 ++ .../process_implementations/cubes/resample.py | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7b9a79b8..52e68ec4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,7 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks +default_language_version: + python: python3 repos: - repo: https://github.com/asottile/pyupgrade rev: v3.10.1 diff --git a/openeo_processes_dask/process_implementations/cubes/resample.py b/openeo_processes_dask/process_implementations/cubes/resample.py index 1211cd97..d6037e3f 100644 --- a/openeo_processes_dask/process_implementations/cubes/resample.py +++ b/openeo_processes_dask/process_implementations/cubes/resample.py @@ -75,11 +75,11 @@ def resample_spatial( how=projection, resolution=resolution, resampling=method ) - if "longitude" in reprojected.dims and "x" in data.dims: - reprojected = reprojected.rename({"longitude": "x"}) + if reprojected.openeo.x_dim != data.openeo.x_dim: + reprojected = reprojected.rename({reprojected.openeo.x_dim: data.openeo.x_dim}) - if "latitude" in reprojected.dims and "y" in data.dims: - reprojected = reprojected.rename({"latitude": "y"}) + if reprojected.openeo.y_dim != data.openeo.y_dim: + reprojected = reprojected.rename({reprojected.openeo.y_dim: data.openeo.y_dim}) reprojected.attrs["crs"] = data_cp.rio.crs From 218d6a6275f17fb6cf096ef63e7d82e8076d2168 Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Thu, 29 Feb 2024 11:46:46 +0100 Subject: [PATCH 28/48] Update pyproject.toml (#234) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ac3c38ff..6c37ca34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openeo-processes-dask" -version = "2024.2.4" +version = "2024.2.5" description = "Python implementations of many OpenEO processes, dask-friendly by default." authors = ["Lukas Weidenholzer ", "Sean Hoyal ", "Valentina Hutter "] maintainers = ["EODC Staff "] From cc68a07d691e071461526b2bf1224daa5ac228ad Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Mon, 11 Mar 2024 14:42:07 +0100 Subject: [PATCH 29/48] add docs for aggregate_spatial (#230) * add docs for aggregate_spatial * Update aggregate-large-spatial-extents.md --- docs/scalability/README.md | 3 + .../aggregate-large-spatial-extents.md | 70 ++++++++++++++++++ docs/scalability/data.png | Bin 0 -> 44664 bytes docs/scalability/data_crash.png | Bin 0 -> 9958 bytes 4 files changed, 73 insertions(+) create mode 100644 docs/scalability/README.md create mode 100644 docs/scalability/aggregate-large-spatial-extents.md create mode 100644 docs/scalability/data.png create mode 100644 docs/scalability/data_crash.png diff --git a/docs/scalability/README.md b/docs/scalability/README.md new file mode 100644 index 00000000..754ec889 --- /dev/null +++ b/docs/scalability/README.md @@ -0,0 +1,3 @@ +# Document issues with scalability + +Edge-cases, which cannot be handled within `openeo-processes-dask`, but want to be documented. \ No newline at end of file diff --git a/docs/scalability/aggregate-large-spatial-extents.md b/docs/scalability/aggregate-large-spatial-extents.md new file mode 100644 index 00000000..cbf65416 --- /dev/null +++ b/docs/scalability/aggregate-large-spatial-extents.md @@ -0,0 +1,70 @@ +# Aggregate data over large spatial extents + +Date: 2024-02-26 + +## Context + +https://github.com/Open-EO/openeo-processes-dask/issues/124 + +In a recent use case, the `aggregate_spatial` process was applied to both Sentinel 1 and Sentinel 2 data to generate `vector-cubes` with polygons from [here](https://github.com/openEOPlatform/SRR3_notebooks/blob/main/notebooks/resources/UC8/vector_data/target_canopy_cover_60m_WGS84/target_canopy_cover_WGS84_60m.geojson). As `process graphs` are executed node after node, we would first use `load_collection` to load the data over the total bounds of the polygons and hand the data to `aggregate_spatial` afterwards. +With the total bounds of all polygons being set to `{'west': 3, 'east': 18, 'south': 43, 'north': 51}`, loading the data led to the following situations: + +- We were able to load the data, for short temporal extents: +
+ Datacube +
Figure 1: Lazy dask array for Sentinel 1 datacube. Note the size of data: 3.73 TiB
+
+ +- When increasing the temporal interval, the amount of data became too much for dask and the corresponding error was raised. +
+ Errormessage +
+ +It is not trivial to solve this in dask, as +- this means that one chunk cannot handle the amount of data, that is supposed to be in the chunk. +- setting a smaller chunk size, means more and more chunks are generated and the dask task graph can easily become too large itself. +- increasing the dask memory might solve the problem for one specific dataset, but it might occur again, if a different dataset with a higher spatial or temporal resolution is used afterwards. + +This means, that running through the `process graph` raises an error, before `aggregate_spatial` can be executed. + +## Possible solutions + +There might be more than one solution to this. + +Here is a first attempt on how we were trying to solve this. + +In `openeo-processes-dask`, there is a general implementation of `aggregate_spatial` available, that can be run, as long as the input data can be read by dask and the error described above does not occur. If the error is raised, the execution of the processes or the process implementations might need adaptions. + +### Handle data loading within aggregate_spatial + +Here is a pseudo-code for another `aggregate_spatial` implementation: + + +