From f851cc9f4205931275e0f181e693492959165484 Mon Sep 17 00:00:00 2001 From: terrtia Date: Tue, 9 Jan 2024 11:19:01 +0100 Subject: [PATCH] fix: [queue] save last timout in cache --- bin/core/Sync_module.py | 10 ++++++++-- bin/lib/ail_queues.py | 7 +++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/bin/core/Sync_module.py b/bin/core/Sync_module.py index 065be8f3..6dc89672 100755 --- a/bin/core/Sync_module.py +++ b/bin/core/Sync_module.py @@ -22,7 +22,7 @@ # Import Project packages ################################## from core import ail_2_ail -from lib.ail_queues import get_processed_end_obj, timeout_processed_objs +from lib.ail_queues import get_processed_end_obj, timeout_processed_objs, get_last_queue_timeout from lib.exceptions import ModuleQueueError from lib.objects import ail_objects from modules.abstract_module import AbstractModule @@ -41,6 +41,7 @@ def __init__(self, queue=False): # FIXME MODIFY/ADD QUEUE self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict() self.last_refresh = time.time() + self.last_refresh_queues = time.time() print(self.dict_sync_queues) @@ -83,7 +84,12 @@ def run(self): while self.proceed: # Timeout queues - timeout_processed_objs() + # timeout_processed_objs() + if self.last_refresh_queues < time.time(): + timeout_processed_objs() + self.last_refresh_queues = time.time() + 120 + self.redis_logger.debug('Timeout queues') + # print('Timeout queues') # Get one message (paste) from the QueueIn (copy of Redis_Global publish) global_id = get_processed_end_obj() diff --git a/bin/lib/ail_queues.py b/bin/lib/ail_queues.py index b4218a07..451fcc47 100755 --- a/bin/lib/ail_queues.py +++ b/bin/lib/ail_queues.py @@ -250,6 +250,12 @@ def rename_processed_obj(new_id, old_id): r_obj_process.srem(f'objs:process', old_id) add_processed_obj(new_id, x_hash, module=module) +def get_last_queue_timeout(): + epoch_update = r_obj_process.get('queue:obj:timeout:last') + if not epoch_update: + epoch_update = 0 + return float(epoch_update) + def timeout_process_obj(obj_global_id): for q in get_processed_obj_queues(obj_global_id): queue, x_hash = q.split(':', 1) @@ -272,6 +278,7 @@ def timeout_processed_objs(): for obj_type in ail_core.get_obj_queued(): for obj_global_id in r_obj_process.zrangebyscore(f'objs:process:{obj_type}', 0, time_limit): timeout_process_obj(obj_global_id) + r_obj_process.set('queue:obj:timeout:last', time.time()) def delete_processed_obj(obj_global_id): for q in get_processed_obj_queues(obj_global_id):