Commit

fix: [queue] save last timeout in cache
Terrtia committed Jan 9, 2024
1 parent bd2ca4b commit f851cc9
Showing 2 changed files with 15 additions and 2 deletions.
10 changes: 8 additions & 2 deletions bin/core/Sync_module.py
@@ -22,7 +22,7 @@
 # Import Project packages
 ##################################
 from core import ail_2_ail
-from lib.ail_queues import get_processed_end_obj, timeout_processed_objs
+from lib.ail_queues import get_processed_end_obj, timeout_processed_objs, get_last_queue_timeout
 from lib.exceptions import ModuleQueueError
 from lib.objects import ail_objects
 from modules.abstract_module import AbstractModule
@@ -41,6 +41,7 @@ def __init__(self, queue=False): # FIXME MODIFY/ADD QUEUE
 
         self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict()
         self.last_refresh = time.time()
+        self.last_refresh_queues = time.time()
 
         print(self.dict_sync_queues)
 
@@ -83,7 +84,12 @@ def run(self):
         while self.proceed:
 
             # Timeout queues
-            timeout_processed_objs()
+            # timeout_processed_objs()
+            if self.last_refresh_queues < time.time():
+                timeout_processed_objs()
+                self.last_refresh_queues = time.time() + 120
+                self.redis_logger.debug('Timeout queues')
+                # print('Timeout queues')
 
             # Get one message (paste) from the QueueIn (copy of Redis_Global publish)
             global_id = get_processed_end_obj()
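Note on the scheduling logic above: despite its name, self.last_refresh_queues holds the earliest time of the next sweep, so the first loop iteration sweeps immediately and later iterations sweep at most once every 120 seconds. A minimal standalone sketch of the same throttling pattern, with do_timeout_sweep() and a short sleep standing in for timeout_processed_objs() and the module's message handling:

    import time

    REFRESH_DELAY = 120  # seconds between sweeps, matching the value in the diff

    def do_timeout_sweep():
        # stand-in for timeout_processed_objs()
        print('Timeout queues')

    next_sweep = time.time()  # corresponds to self.last_refresh_queues
    while True:
        if next_sweep < time.time():
            do_timeout_sweep()
            next_sweep = time.time() + REFRESH_DELAY
        time.sleep(1)  # stand-in for processing one queued object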
7 changes: 7 additions & 0 deletions bin/lib/ail_queues.py
@@ -250,6 +250,12 @@ def rename_processed_obj(new_id, old_id):
     r_obj_process.srem(f'objs:process', old_id)
     add_processed_obj(new_id, x_hash, module=module)
 
+def get_last_queue_timeout():
+    epoch_update = r_obj_process.get('queue:obj:timeout:last')
+    if not epoch_update:
+        epoch_update = 0
+    return float(epoch_update)
+
 def timeout_process_obj(obj_global_id):
     for q in get_processed_obj_queues(obj_global_id):
         queue, x_hash = q.split(':', 1)
@@ -272,6 +278,7 @@ def timeout_processed_objs():
     for obj_type in ail_core.get_obj_queued():
         for obj_global_id in r_obj_process.zrangebyscore(f'objs:process:{obj_type}', 0, time_limit):
             timeout_process_obj(obj_global_id)
+    r_obj_process.set('queue:obj:timeout:last', time.time())
 
 def delete_processed_obj(obj_global_id):
     for q in get_processed_obj_queues(obj_global_id):
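For reference, the new cache key is a simple read/write pair: timeout_processed_objs() now records the epoch of its last sweep under queue:obj:timeout:last, and get_last_queue_timeout() reads it back, returning 0.0 when the key has never been set. A minimal standalone sketch of that round trip, assuming a generic redis.Redis client in place of r_obj_process:

    import time
    import redis

    # assumed local Redis instance; r_obj_process in AIL points at its own DB/port
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    def get_last_queue_timeout():
        epoch_update = r.get('queue:obj:timeout:last')
        if not epoch_update:
            epoch_update = 0
        return float(epoch_update)

    def record_last_queue_timeout():
        # what timeout_processed_objs() now does at the end of a sweep
        r.set('queue:obj:timeout:last', time.time())

    record_last_queue_timeout()
    print(get_last_queue_timeout())  # epoch of the last sweep, 0.0 if never run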
