Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinaHutter committed Nov 10, 2023
2 parents 4178cb3 + c09c805 commit 45b1b0a
Show file tree
Hide file tree
Showing 18 changed files with 407 additions and 28 deletions.
7 changes: 7 additions & 0 deletions openeo_processes_dask/process_implementations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
"Did not load machine learning processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, ml]`"
)

# Optionally pull in the experimental processes; they need extra dependencies,
# so a plain install must not fail just because they are absent.
try:
    from .experimental import *
except ImportError as e:
    # Fix: message previously read "Did not experimental processes" (missing "load"),
    # inconsistent with the ML warning above.
    logger.warning(
        "Did not load experimental processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, experimental]`"
    )

import rioxarray as rio # Required for the .rio accessor on xarrays.

import openeo_processes_dask.process_implementations.cubes._xr_interop
9 changes: 5 additions & 4 deletions openeo_processes_dask/process_implementations/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ def wrapper(
resolved_kwargs.pop(arg, None)

pretty_args = {k: repr(v)[:80] for k, v in resolved_kwargs.items()}
logger.info(f"Running process {f.__name__}")
logger.debug(
f"Running process {f.__name__} with resolved parameters: {pretty_args}"
)
if hasattr(f, "__name__"):
logger.info(f"Running process {f.__name__}")
logger.debug(
f"Running process {f.__name__} with resolved parameters: {pretty_args}"
)

return f(*resolved_args, **resolved_kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
from .general import *
from .indices import *
from .load import *
from .mask_polygon import *
from .merge import *
from .reduce import *
38 changes: 37 additions & 1 deletion openeo_processes_dask/process_implementations/cubes/_filter.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import json
import logging
import warnings
from typing import Callable

import dask.array as da
import geopandas as gpd
import numpy as np
import pyproj
import rasterio
import rioxarray
import shapely
import xarray as xr
from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval

from openeo_processes_dask.process_implementations.cubes.mask_polygon import (
mask_polygon,
)
from openeo_processes_dask.process_implementations.data_model import RasterCube
from openeo_processes_dask.process_implementations.exceptions import (
BandFilterParameterMissing,
Expand All @@ -16,9 +24,18 @@
TooManyDimensions,
)

DEFAULT_CRS = "EPSG:4326"


logger = logging.getLogger(__name__)

__all__ = ["filter_labels", "filter_temporal", "filter_bands", "filter_bbox"]
__all__ = [
"filter_labels",
"filter_temporal",
"filter_bands",
"filter_bbox",
"filter_spatial",
]


def filter_temporal(
Expand Down Expand Up @@ -102,6 +119,25 @@ def filter_bands(data: RasterCube, bands: list[str] = None) -> RasterCube:
return data


def filter_spatial(data: RasterCube, geometries) -> RasterCube:
    """Limit the data cube to the given GeoJSON geometries.

    The cube is first cropped to the bounding box of the geometries
    (``filter_bbox``) and the remaining pixels outside the polygons are
    then masked out (``mask_polygon``).

    Parameters
    ----------
    data : RasterCube
        The raster data cube to filter.
    geometries : dict
        A GeoJSON-like mapping of type ``"FeatureCollection"`` or
        ``"Polygon"``. Coordinates are assumed to be in EPSG:4326
        (``DEFAULT_CRS``).

    Returns
    -------
    RasterCube
        The cropped and masked data cube.

    Raises
    ------
    ValueError
        If ``geometries`` is neither a FeatureCollection nor a Polygon.
        (Previously this case fell through and crashed with an unrelated
        ``NameError`` because ``gdf`` was never assigned.)
    """
    if "type" in geometries and geometries["type"] == "FeatureCollection":
        gdf = gpd.GeoDataFrame.from_features(geometries, DEFAULT_CRS)
    elif "type" in geometries and geometries["type"] in ["Polygon"]:
        # NOTE(review): only the exterior ring (coordinates[0]) is used;
        # interior rings (holes) are ignored here — confirm intended.
        polygon = shapely.geometry.Polygon(geometries["coordinates"][0])
        gdf = gpd.GeoDataFrame(geometry=[polygon])
        gdf.crs = DEFAULT_CRS
    else:
        # Same message/behavior as mask_polygon for unsupported input.
        raise ValueError(
            "Unsupported or missing geometry type. Expected 'Polygon' or 'FeatureCollection'."
        )

    # total_bounds is (minx, miny, maxx, maxy).
    bbox = gdf.total_bounds
    spatial_extent = BoundingBox(
        west=bbox[0], east=bbox[2], south=bbox[1], north=bbox[3]
    )

    # Crop first (cheap), then mask pixels outside the polygons.
    data = filter_bbox(data, spatial_extent)
    data = mask_polygon(data, geometries)

    return data


def filter_bbox(data: RasterCube, extent: BoundingBox) -> RasterCube:
try:
input_crs = str(data.rio.crs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def drop_dimension(data: RasterCube, name: str) -> RasterCube:
raise DimensionLabelCountMismatch(
f"The number of dimension labels exceeds one, which requires a reducer. Dimension ({name}) has {len(data[name])} labels."
)
return data.drop_vars(name).squeeze()
return data.drop_vars(name).squeeze(name)


def create_raster_cube() -> RasterCube:
Expand Down
165 changes: 165 additions & 0 deletions openeo_processes_dask/process_implementations/cubes/mask_polygon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import json
import logging
from typing import Any, Union

import dask.array as da
import geopandas as gpd
import numpy as np
import rasterio
import rioxarray
import shapely
import xarray as xr
from xarray.core import dtypes

from openeo_processes_dask.process_implementations.data_model import (
RasterCube,
VectorCube,
)

DEFAULT_CRS = "EPSG:4326"


logger = logging.getLogger(__name__)

__all__ = [
"mask_polygon",
]


def mask_polygon(
    data: RasterCube,
    mask: Union[VectorCube, str],
    replacement: Any = dtypes.NA,
    inside: bool = True,
) -> RasterCube:
    """Replace pixels of ``data`` based on the given polygon mask.

    A boolean mask is rasterized from the GeoJSON-like ``mask`` (a
    ``Polygon`` or ``FeatureCollection`` mapping) on the cube's spatial
    grid; pixels where the mask is False are replaced by ``replacement``
    via ``data.where``.

    Parameters
    ----------
    data : RasterCube
        Input raster cube; must expose spatial dims via the ``.openeo``
        accessor and a CRS/transform via ``.rio``.
    mask : Union[VectorCube, str]
        GeoJSON-like mapping with ``"type"`` of ``"Polygon"`` or
        ``"FeatureCollection"``. Coordinates are assumed EPSG:4326
        (``DEFAULT_CRS``) and reprojected to the cube's CRS.
    replacement : Any
        Value written where the mask is False (default: NA).
    inside : bool
        Passed as ``invert`` to ``rasterio.features.geometry_mask``;
        with the default True, pixels inside the polygons are kept.

    Returns
    -------
    RasterCube
        Cube with masked pixels replaced.

    Raises
    ------
    Exception
        If the cube's CRS cannot be read.
    ValueError
        If ``mask`` has an unsupported geometry type.
    """
    # Dimension names from the openeo accessor; temporal/band dims are
    # optional and collapse to None when absent.
    y_dim = data.openeo.y_dim
    x_dim = data.openeo.x_dim
    t_dim = data.openeo.temporal_dims
    b_dim = data.openeo.band_dims

    if len(t_dim) == 0:
        t_dim = None
    else:
        t_dim = t_dim[0]
    if len(b_dim) == 0:
        b_dim = None
    else:
        b_dim = b_dim[0]

    y_dim_size = data.sizes[y_dim]
    x_dim_size = data.sizes[x_dim]

    # Reproject vector data to match the raster data cube.
    ## Get the CRS of data cube
    try:
        data_crs = str(data.rio.crs)
    except Exception as e:
        raise Exception(f"Not possible to estimate the input data projection! {e}")

    data = data.rio.set_crs(data_crs)

    ## Reproject vector data if the input vector data is Polygon or Multi Polygon
    # Round-trip through GeoJSON (to_json -> json.loads) so both branches
    # yield a plain dict in the cube's CRS.
    if "type" in mask and mask["type"] == "FeatureCollection":
        geometries = gpd.GeoDataFrame.from_features(mask, DEFAULT_CRS)
        geometries = geometries.to_crs(data_crs)
        geometries = geometries.to_json()
        geometries = json.loads(geometries)
    elif "type" in mask and mask["type"] in ["Polygon"]:
        # Only the exterior ring (coordinates[0]) is used; holes are ignored.
        polygon = shapely.geometry.Polygon(mask["coordinates"][0])
        geometries = gpd.GeoDataFrame(geometry=[polygon])
        geometries.crs = DEFAULT_CRS
        geometries = geometries.to_crs(data_crs)
        # NOTE(review): GeoDataFrame.to_json always emits a FeatureCollection,
        # so after this branch the Polygon-specific handling further below
        # appears unreachable — confirm.
        geometries = geometries.to_json()
        geometries = json.loads(geometries)
    else:
        raise ValueError(
            "Unsupported or missing geometry type. Expected 'Polygon' or 'FeatureCollection'."
        )

    data_dims = list(data.dims)

    # Get the Affine transformer
    transform = data.rio.transform()

    # Initialize an empty mask
    # Set the same chunk size as the input data
    # Only the first chunk size per dimension is kept — assumes uniform
    # chunking along each dim (TODO confirm for ragged chunking).
    data_chunks = {}
    chunks_shapes = data.chunks
    for i, d in enumerate(data_dims):
        data_chunks[d] = chunks_shapes[i][0]

    # Order the 2-D mask (x, y) or (y, x) to match the cube's dim order.
    # NOTE(review): x_dim/y_dim look like dim-name strings (used as
    # data.sizes keys above), so x_dim[0] indexes their FIRST CHARACTER
    # here — verify this lookup works for the dim names in use.
    # NOTE(review): dask's `chunks=` normally takes ints/tuples (or dicts
    # keyed by axis index), not dim-name keys — confirm these calls.
    if data_dims.index(x_dim[0]) < data_dims.index(y_dim[0]):
        final_mask = da.zeros(
            (x_dim_size, y_dim_size),
            chunks={x_dim: data_chunks[x_dim], y_dim: data_chunks[y_dim]},
            dtype=bool,
        )

        dask_out_shape = da.from_array(
            (x_dim_size, y_dim_size),
            chunks={x_dim: data_chunks[x_dim], y_dim: data_chunks[y_dim]},
        )
    else:
        final_mask = da.zeros(
            (y_dim_size, x_dim_size),
            chunks={y_dim: data_chunks[y_dim], x_dim: data_chunks[x_dim]},
            dtype=bool,
        )

        dask_out_shape = da.from_array(
            (y_dim_size, x_dim_size),
            chunks={y_dim: data_chunks[y_dim], x_dim: data_chunks[x_dim]},
        )

    # CHECK IF the input single polygon or multiple Polygons
    # Rasterize each polygon separately and OR the results together, so
    # a pixel is kept if it falls inside ANY of the features.
    if "type" in geometries and geometries["type"] == "FeatureCollection":
        for feature in geometries["features"]:
            polygon = shapely.geometry.Polygon(feature["geometry"]["coordinates"][0])

            # Create a GeoSeries from the geometry
            geo_series = gpd.GeoSeries(polygon)

            # Convert the GeoSeries to a GeometryArray
            geometry_array = geo_series.geometry.array

            # Note: `mask` (the parameter) is rebound here to the dask mask.
            mask = da.map_blocks(
                rasterio.features.geometry_mask,
                geometry_array,
                transform=transform,
                out_shape=dask_out_shape,
                dtype=bool,
                invert=inside,
            )
            final_mask |= mask

    elif "type" in geometries and geometries["type"] in ["Polygon"]:
        polygon = shapely.geometry.Polygon(geometries["coordinates"][0])
        geo_series = gpd.GeoSeries(polygon)

        # Convert the GeoSeries to a GeometryArray
        geometry_array = geo_series.geometry.array
        mask = da.map_blocks(
            rasterio.features.geometry_mask,
            geometry_array,
            transform=transform,
            out_shape=dask_out_shape,
            dtype=bool,
            invert=inside,
        )
        final_mask |= mask

    # Number of dims of the rasterized (2-D spatial) mask.
    masked_dims = len(final_mask.shape)

    # Find the cube axes the 2-D mask is missing (e.g. time/bands) so it
    # can be broadcast against the full cube below.
    diff_axes = []
    for axis in range(len(data_dims)):
        try:
            if final_mask.shape[axis] != data.shape[axis]:
                diff_axes.append(axis)
        # Bare except: IndexError when axis exceeds the mask's 2 dims.
        # NOTE(review): a matching-by-coincidence size (e.g. time length
        # equal to y size) would be silently skipped here — confirm.
        except:
            if len(diff_axes) < (len(data_dims) - 2):
                diff_axes.append(axis)

    # Broadcast the spatial mask over the non-spatial axes and apply it.
    final_mask = np.expand_dims(final_mask, axis=diff_axes)
    filtered_ds = data.where(final_mask, other=replacement)

    return filtered_ds
17 changes: 8 additions & 9 deletions openeo_processes_dask/process_implementations/cubes/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,26 @@ def merge_cubes(
previous_dim_order = list(cube1.dims) + [
dim for dim in cube2.dims if dim not in cube1.dims
]

band_dim1 = cube1.openeo.band_dims[0]
band_dim2 = cube2.openeo.band_dims[0]
if len(cube1.openeo.band_dims) > 0 or len(cube2.openeo.band_dims) > 0:
# Same reordering issue mentioned above
previous_band_order = list(
cube1[cube1.openeo.band_dims[0]].values
) + [
previous_band_order = list(cube1[band_dim1].values) + [
band
for band in list(cube2[cube2.openeo.band_dims[0]].values)
if band not in list(cube1[cube1.openeo.band_dims[0]].values)
if band not in list(cube1[band_dim1].values)
]
cube1 = cube1.to_dataset(cube1.openeo.band_dims[0])
cube2 = cube2.to_dataset(cube2.openeo.band_dims[0])
cube1 = cube1.to_dataset(band_dim1)
cube2 = cube2.to_dataset(band_dim2)

# compat="override" to deal with potentially conflicting coords
# see https://github.com/Open-EO/openeo-processes-dask/pull/148 for context
merged_cube = xr.combine_by_coords(
[cube1, cube2], combine_attrs="drop_conflicts", compat="override"
)
if isinstance(merged_cube, xr.Dataset):
merged_cube = merged_cube.to_array(dim="bands")
merged_cube = merged_cube.reindex({"bands": previous_band_order})
merged_cube = merged_cube.to_array(dim=band_dim1)
merged_cube = merged_cube.reindex({band_dim1: previous_band_order})

merged_cube = merged_cube.transpose(*previous_dim_order)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ def reduce_spatial(
spatial_dims = data.openeo.spatial_dims if data.openeo.spatial_dims else None
return data.reduce(
reducer,
dimension=spatial_dims,
dim=spatial_dims,
keep_attrs=True,
context=context,
positional_parameters=positional_parameters,
named_parameters=named_parameters,
)
6 changes: 3 additions & 3 deletions openeo_processes_dask/process_implementations/math.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,21 @@ def median(data, ignore_nodata=True, axis=None, keepdims=False):
return np.median(data, axis=axis, keepdims=keepdims)


def mean(data, ignore_nodata=False, axis=None, keepdims=False):
def mean(data, ignore_nodata=True, axis=None, keepdims=False):
if ignore_nodata:
return np.nanmean(data, axis=axis, keepdims=keepdims)
else:
return np.mean(data, axis=axis, keepdims=keepdims)


def sd(data, ignore_nodata=False, axis=None, keepdims=False):
def sd(data, ignore_nodata=True, axis=None, keepdims=False):
if ignore_nodata:
return np.nanstd(data, axis=axis, ddof=1, keepdims=keepdims)
else:
return np.std(data, axis=axis, ddof=1, keepdims=keepdims)


def variance(data, ignore_nodata=False, axis=None, keepdims=False):
def variance(data, ignore_nodata=True, axis=None, keepdims=False):
if ignore_nodata:
return np.nanvar(data, axis=axis, ddof=1, keepdims=keepdims)
else:
Expand Down
Loading

0 comments on commit 45b1b0a

Please sign in to comment.