Skip to content

Commit

Permalink
Merge pull request #9 from MeteoSwiss/fix/remove_multiprocessing
Browse files Browse the repository at this point in the history
fix: remove parralelization with multiprocessing
  • Loading branch information
matschaer authored Jan 15, 2024
2 parents e8d7d13 + ec9c8a1 commit ce4493c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 32 deletions.
3 changes: 2 additions & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
History
=======

0.3.0 (2023-12-20)
0.3.0 (2024-01-15)
------------------

* Move from DataArray to Dataset for DEM object to allow transferring global attributes.
* Add units as variable attributes.
* Output slope in units of [degree] instead of [m / pixel].
* Fix bug in slope calculation.
* Remove parallelization of scales with multiprocessing for valley and ridge

0.2.1 (2022-10-19)
------------------
Expand Down
37 changes: 6 additions & 31 deletions topo_descriptors/topo.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import dask.array as da
from numba import njit, prange
from scipy import ndimage, signal
from multiprocessing import Pool, cpu_count

import topo_descriptors.helpers as hlp
from topo_descriptors import CFG
Expand Down Expand Up @@ -371,44 +370,20 @@ def compute_valley_ridge(

scales_pxl, _ = hlp.scale_to_pixel(scales, dem_ds)
sigmas = hlp.get_sigmas(smth_factors, scales_pxl)
dem_val = hlp.get_da(dem_ds).values
units = "1"

pool = Pool(processes=min(len(scales_pxl), cpu_count()))
for idx, scale_pxl in enumerate(scales_pxl):
logger.info(
f"Computing scale {scales[idx]} meters with smoothing factor"
f" {smth_factors[idx]} ..."
)
names = _valley_ridge_names(scales[idx], mode, smth_factors[idx])
pool.apply_async(
_valley_ridge_wrap,
args=(
dem_ds,
scale_pxl,
mode,
flat_list,
sigmas[idx],
names,
ind_nans,
crop,
outdir,
),
)

pool.close()
pool.join()


def _valley_ridge_wrap(
dem_ds, size, mode, flat_list, sigma, names, ind_nans, crop, outdir
):
"""Wrapper to valley_ridge and hlp.to_netcdf functions to ease the parallelization
of the different scales"""
arrays = valley_ridge(dem_val, scale_pxl, mode, flat_list, sigmas[idx])

arrays = valley_ridge(hlp.get_da(dem_ds).values, size, mode, flat_list, sigma)
units = "1"
for array, name in zip(arrays, names):
array[ind_nans] = np.nan
hlp.to_netcdf(array, dem_ds, name, crop, outdir, units)
for array, name in zip(arrays, names):
array[ind_nans] = np.nan
hlp.to_netcdf(array, dem_ds, name, crop, outdir, units)


@hlp.timer
Expand Down

0 comments on commit ce4493c

Please sign in to comment.