Merge pull request #3233 from uktrade/feat/more-locking-around-tasks
feat: more locking around background tasks
michalc authored Jul 20, 2024
2 parents bf562df + 24b742c commit 407587f
Showing 2 changed files with 54 additions and 0 deletions.
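
Every change in this commit applies the same pattern: the body of a periodic Celery task moves into a plain do_* function, and the task itself becomes a thin wrapper that runs the body under a named Redis lock. blocking_timeout=0 makes a second worker give up immediately rather than wait for the lock, so overlapping schedules cannot run two copies of a task at once, and timeout=3600 expires the lock after an hour so a crashed worker cannot hold it forever. Below is a minimal sketch of the shared shape, assuming a django-redis cache backend whose lock() returns a redis-py Lock (its context manager raises LockError when the lock cannot be acquired); the run_exclusively helper is hypothetical, since the commit inlines the try/except in each task:

    import logging

    import redis.exceptions
    from django.core.cache import cache  # assumed django-redis backend exposing .lock()

    logger = logging.getLogger(__name__)


    def run_exclusively(lock_name, func):
        # blocking_timeout=0: give up at once if another worker already holds the lock.
        # timeout=3600: the lock auto-expires after an hour if its holder dies.
        try:
            with cache.lock(lock_name, blocking_timeout=0, timeout=3600):
                func()
        except redis.exceptions.LockError as e:
            logger.warning("Failed to acquire lock for %s: %s", lock_name, e)

Note that the two files catch the same exception under different names: applications/utils.py references redis.exceptions.LockError in full, while datasets/utils.py uses a bare LockError, presumably already imported from redis.exceptions.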
dataworkspace/dataworkspace/apps/applications/utils.py (26 additions, 0 deletions)

@@ -355,6 +355,14 @@ def application_instance_max_cpu(application_instance):
 @celery_app.task()
 @close_all_connections_if_not_in_atomic_block
 def kill_idle_fargate():
+    try:
+        with cache.lock("do_kill_idle_fargate", blocking_timeout=0, timeout=3600):
+            do_kill_idle_fargate()
+    except redis.exceptions.LockError as e:
+        logger.warning("Failed to acquire lock for do_kill_idle_fargate: %s", e)
+
+
+def do_kill_idle_fargate():
     logger.info("kill_idle_fargate: Start")

     two_hours_ago = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=-2)
@@ -397,6 +405,14 @@ def kill_idle_fargate():
 @celery_app.task()
 @close_all_connections_if_not_in_atomic_block
 def populate_created_stopped_fargate():
+    try:
+        with cache.lock("populate_created_stopped_fargate", blocking_timeout=0, timeout=3600):
+            do_populate_created_stopped_fargate()
+    except redis.exceptions.LockError as e:
+        logger.warning("Failed to acquire lock for populate_created_stopped_fargate: %s", e)
+
+
+def do_populate_created_stopped_fargate():
     logger.info("populate_created_stopped_fargate: Start")

     # This is used to populate spawner_created_at and spawner_stopped_at for
@@ -1505,6 +1521,16 @@ def long_running_query_alert():
 @celery_app.task()
 @close_all_connections_if_not_in_atomic_block
 def push_tool_monitoring_dashboard_datasets():
+    try:
+        with cache.lock(
+            "push_tool_monitoring_dashboard_datasets", blocking_timeout=0, timeout=3600
+        ):
+            do_push_tool_monitoring_dashboard_datasets()
+    except redis.exceptions.LockError as e:
+        logger.warning("Failed to acquire lock for push_tool_monitoring_dashboard_datasets: %s", e)
+
+
+def do_push_tool_monitoring_dashboard_datasets():
     geckboard_api_key = os.environ["GECKOBOARD_API_KEY"]
     cluster = os.environ["APPLICATION_SPAWNER_OPTIONS__FARGATE__VISUALISATION__CLUSTER_NAME"]
     task_role_prefix = os.environ["APPLICATION_TEMPLATES__1__SPAWNER_OPTIONS__ROLE_PREFIX"]
dataworkspace/dataworkspace/apps/datasets/utils.py (28 additions, 0 deletions)

@@ -409,6 +409,18 @@ def store_sql_query(visualisation_link, data_set_id, table_id, sql_query):
 @celery_app.task()
 @close_all_connections_if_not_in_atomic_block
 def link_superset_visualisations_to_related_datasets():
+    try:
+        with cache.lock(
+            "link_superset_visualisations_to_related_datasets", blocking_timeout=0, timeout=3600
+        ):
+            do_link_superset_visualisations_to_related_datasets()
+    except LockError as e:
+        logger.warning(
+            "Failed to acquire lock for link_superset_visualisations_to_related_datasets: %s", e
+        )
+
+
+def do_link_superset_visualisations_to_related_datasets():
     api_url = os.environ["SUPERSET_ROOT"] + "/api/v1/%s"

     login_response = requests.post(
@@ -729,6 +741,14 @@ def get_change_item(related_object, change_id):
 @celery_app.task()
 @close_all_connections_if_not_in_atomic_block
 def update_metadata_with_source_table_id():
+    try:
+        with cache.lock("update_metadata_with_source_table_id", blocking_timeout=0, timeout=3600):
+            do_update_metadata_with_source_table_id()
+    except LockError as e:
+        logger.warning("Failed to acquire lock for update_metadata_with_source_table_id: %s", e)
+
+
+def do_update_metadata_with_source_table_id():
     database_name = list(settings.DATABASES_DATA.items())[0][0]
     with connections[database_name].cursor() as cursor:
         cursor.execute(
@@ -857,6 +877,14 @@ def do_store_custom_dataset_query_metadata():
 @celery_app.task()
 @close_all_connections_if_not_in_atomic_block
 def store_reference_dataset_metadata():
+    try:
+        with cache.lock("store_reference_dataset_metadata", blocking_timeout=0, timeout=3600):
+            do_store_reference_dataset_metadata()
+    except LockError as e:
+        logger.warning("Failed to acquire lock for store_reference_dataset_metadata: %s", e)
+
+
+def do_store_reference_dataset_metadata():
     for reference_dataset in ReferenceDataset.objects.live().filter(published=True):
         logger.info(
             "Checking for metadata update for reference dataset '%s'", reference_dataset.name