From 81f09d6e6db618c146280ce39b4dfee52d9e2877 Mon Sep 17 00:00:00 2001 From: Victor Verhaert Date: Fri, 29 Nov 2024 16:25:41 +0100 Subject: [PATCH] moved lock in job manager to class attribute --- .gitignore | 3 ++- pyproject.toml | 2 +- src/openeo_gfmap/manager/job_manager.py | 15 ++++++--------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 6d012c8..5846c85 100644 --- a/.gitignore +++ b/.gitignore @@ -162,4 +162,5 @@ cython_debug/ tests/test_openeo_gfmap/results/ notebook/ -data/ \ No newline at end of file +data/ +tests/tests_integration/results/* diff --git a/pyproject.toml b/pyproject.toml index dd8c933..a9bbdb7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ dependencies = [ "geojson>=3.0.0", "geopandas", "h5netcdf>=1.2.0", - "openeo", + "openeo<=0.35", "onnxruntime", "cftime", "pyarrow", diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py index 986bbf4..05bc2c0 100644 --- a/src/openeo_gfmap/manager/job_manager.py +++ b/src/openeo_gfmap/manager/job_manager.py @@ -17,9 +17,6 @@ from openeo_gfmap.manager import _log from openeo_gfmap.stac import constants -# Lock to use when writing to the STAC collection -_stac_lock = Lock() - def retry_on_exception(max_retries: int, delay_s: int = 180): """Decorator to retry a function if an exception occurs. @@ -132,6 +129,8 @@ def __init__( self._catalogue_cache = output_dir / "catalogue_cache.bin" self.stac = stac + if stac is not None: + self.lock = Lock() self.stac_enabled = stac_enabled self.collection_id = collection_id self.collection_description = collection_description @@ -258,7 +257,7 @@ def _resume_postjob_actions(self, df: pd.DataFrame): "Resuming postprocessing of job %s, queueing on_job_finished...", row.id, ) - future = self._executor.submit(self.on_job_done, job, row, _stac_lock) + future = self._executor.submit(self.on_job_done, job, row) future.add_done_callback( partial( done_callback, @@ -327,7 +326,7 @@ def _update_statuses(self, df: pd.DataFrame): "Job %s finished successfully, queueing on_job_done...", job.job_id ) job_status = "postprocessing" - future = self._executor.submit(self.on_job_done, job, row, _stac_lock) + future = self._executor.submit(self.on_job_done, job, row) # Future will setup the status to finished when the job is done future.add_done_callback( partial( @@ -416,9 +415,7 @@ def on_job_error(self, job: BatchJob, row: pd.Series): ) @retry_on_exception(max_retries=2, delay_s=30) - def on_job_done( - self, job: BatchJob, row: pd.Series, lock: Lock - ): # pylint: disable=arguments-differ + def on_job_done(self, job: BatchJob, row: pd.Series): """Method called when a job finishes successfully. It will first download the results of the job and then call the `post_job_action` method. """ @@ -491,7 +488,7 @@ def on_job_done( _log.info("Adding %s items to the STAC collection...", len(job_items)) if self.stac_enabled: - with lock: + with self.lock: self._update_stac(job.job_id, job_items) _log.info("Job %s and post job action finished successfully.", job.job_id)