Skip to content

Commit

Permalink
Moved the STAC lock in the job manager from a module-level global to an instance attribute
Browse files: browse the repository at this point in the history
  • Loading branch information
VictorVerhaert committed Nov 29, 2024
1 parent 185dbec commit 81f09d6
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,5 @@ cython_debug/

tests/test_openeo_gfmap/results/
notebook/
data/
data/
tests/tests_integration/results/*
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies = [
"geojson>=3.0.0",
"geopandas",
"h5netcdf>=1.2.0",
"openeo",
"openeo<=0.35",
"onnxruntime",
"cftime",
"pyarrow",
Expand Down
15 changes: 6 additions & 9 deletions src/openeo_gfmap/manager/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
from openeo_gfmap.manager import _log
from openeo_gfmap.stac import constants

# Lock to use when writing to the STAC collection
_stac_lock = Lock()


def retry_on_exception(max_retries: int, delay_s: int = 180):
"""Decorator to retry a function if an exception occurs.
Expand Down Expand Up @@ -132,6 +129,8 @@ def __init__(
self._catalogue_cache = output_dir / "catalogue_cache.bin"

self.stac = stac
if stac is not None:
self.lock = Lock()
self.stac_enabled = stac_enabled
self.collection_id = collection_id
self.collection_description = collection_description
Expand Down Expand Up @@ -258,7 +257,7 @@ def _resume_postjob_actions(self, df: pd.DataFrame):
"Resuming postprocessing of job %s, queueing on_job_finished...",
row.id,
)
future = self._executor.submit(self.on_job_done, job, row, _stac_lock)
future = self._executor.submit(self.on_job_done, job, row)
future.add_done_callback(
partial(
done_callback,
Expand Down Expand Up @@ -327,7 +326,7 @@ def _update_statuses(self, df: pd.DataFrame):
"Job %s finished successfully, queueing on_job_done...", job.job_id
)
job_status = "postprocessing"
future = self._executor.submit(self.on_job_done, job, row, _stac_lock)
future = self._executor.submit(self.on_job_done, job, row)
# Future will setup the status to finished when the job is done
future.add_done_callback(
partial(
Expand Down Expand Up @@ -416,9 +415,7 @@ def on_job_error(self, job: BatchJob, row: pd.Series):
)

@retry_on_exception(max_retries=2, delay_s=30)
def on_job_done(
self, job: BatchJob, row: pd.Series, lock: Lock
): # pylint: disable=arguments-differ
def on_job_done(self, job: BatchJob, row: pd.Series):
"""Method called when a job finishes successfully. It will first download the results of
the job and then call the `post_job_action` method.
"""
Expand Down Expand Up @@ -491,7 +488,7 @@ def on_job_done(
_log.info("Adding %s items to the STAC collection...", len(job_items))

if self.stac_enabled:
with lock:
with self.lock:
self._update_stac(job.job_id, job_items)

_log.info("Job %s and post job action finished successfully.", job.job_id)
Expand Down

0 comments on commit 81f09d6

Please sign in to comment.