From e46bce4529fbdca34a4190c18c7219e937e2b697 Mon Sep 17 00:00:00 2001 From: hlynursmari1 Date: Thu, 14 Mar 2019 17:19:07 +0000 Subject: [PATCH] v2.3.10 --- CHANGELOG.md | 7 +++++++ ricloud/__init__.py | 2 +- ricloud/api.py | 16 ++++++++-------- ricloud/asmaster_listener.py | 29 +++++++++++++++++++++-------- ricloud/handlers.py | 8 ++++++-- ricloud/helpers.py | 17 ++++++++++++++--- ricloud/ricloud.ini | 4 ++-- ricloud/utils.py | 16 +++++++++++++--- setup.cfg | 2 +- 9 files changed, 73 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08e4fd0..bbf6910 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +**2.3.10** - *released 2019-03-14* + +* Use a shared `requests.Session` to improve connection efficiency in API calls. +* Clear the critical path in the *asmaster* file handler by deferring the file save to the result callback. This helps the listener thread consume the stream faster by a few orders of magnitude. +* Add minor logging around pending result handling. +* Add some extra profiling for *asmaster* handlers. + **2.3.9** - *released 2018-12-11* * Improve *asmaster* listener to allow simpler configuration of custom result handlers. 
diff --git a/ricloud/__init__.py b/ricloud/__init__.py index b9fdf8c..65efe2e 100644 --- a/ricloud/__init__.py +++ b/ricloud/__init__.py @@ -1 +1 @@ -__version__ = '2.3.9' +__version__ = '2.3.10' diff --git a/ricloud/api.py b/ricloud/api.py index 0427c91..5fec380 100755 --- a/ricloud/api.py +++ b/ricloud/api.py @@ -28,6 +28,8 @@ def __init__(self): self._pending_tasks = {} self.services = {} + self.session = requests.Session() + @property def pending_tasks(self): return self._pending_tasks @@ -165,24 +167,22 @@ def _parse_response(response, post_request=False): return data - @staticmethod - def _perform_get_request(url, headers=None): - response = requests.get( + def _perform_get_request(self, url, headers=None): + response = self.session.get( url, headers=headers ) - return Api._parse_response(response) + return self._parse_response(response) - @staticmethod - def _perform_post_request(url, data, headers=None): - response = requests.post( + def _perform_post_request(self, url, data, headers=None): + response = self.session.post( url, data=data, headers=headers ) - return Api._parse_response(response, post_request=True) + return self._parse_response(response, post_request=True) class Task(object): diff --git a/ricloud/asmaster_listener.py b/ricloud/asmaster_listener.py index 976e7e0..afac73e 100644 --- a/ricloud/asmaster_listener.py +++ b/ricloud/asmaster_listener.py @@ -66,6 +66,7 @@ class AsmasterHandler(RiCloudHandler): TABLE = None QUERY_TEMPLATE = '' + @utils.profile('asmaster handler took', to_log=True) def on_complete_message(self, header, stream): task = AsmasterTask(header.get('task_id', 'system'), callback=self.generate_callback()) task.headers = header @@ -83,6 +84,8 @@ class AsmasterSystemHandler(AsmasterHandler): """ def generate_callback(self): + + @utils.profile('system callback took', to_log=True) def callback(task): query = self.QUERY_TEMPLATE.format(table=self.TABLE) @@ -115,6 +118,8 @@ class AsmasterFeedHandler(AsmasterHandler): """ def 
generate_callback(self): + + @utils.profile('feed callback took', to_log=True) def callback(task): query = self.QUERY_TEMPLATE.format(table=self.TABLE) @@ -150,20 +155,28 @@ class AsmasterDownloadFileHandler(AsmasterHandler): ) """ + @utils.profile('file handler took', to_log=True) def on_complete_message(self, header, stream): - task = AsmasterTask(header.get('task_id'), callback=self.generate_callback()) + task = AsmasterTask(header.get('task_id', 'system'), callback=self.generate_callback()) task.headers = header - - target_path = self.get_target_path(task.headers) - - file_path = utils.save_file_stream_to_target_path(stream, target_path) - - task.result = file_path + task.result = stream self.api.append_consumed_task(task) + # Tell the `handle` function not to close the stream file. + return True + def generate_callback(self): + + @utils.profile('file callback took', to_log=True) def callback(task): + target_path = self.get_target_path(task.headers) + + file_path = utils.save_file_stream_to_target_path(task.result, target_path) + + # Close the temp file here as we did not let `handle` do so above. + task.result.close() + file_id = task.headers['file_id'] if len(file_id) > 4096: @@ -177,7 +190,7 @@ def callback(task): "device_id": task.headers.get('device_id', None), "device_tag": task.headers.get('device_tag', None), "headers": json.dumps(task.headers), - "location": task.result, + "location": file_path, "file_id": file_id, } diff --git a/ricloud/handlers.py b/ricloud/handlers.py index 6074294..665b70d 100644 --- a/ricloud/handlers.py +++ b/ricloud/handlers.py @@ -45,8 +45,12 @@ def handle(self, header, payload): if header['chunk'] == header['total_chunks']: # Message should be complete. stream.seek(0) - self.on_complete_message(header, stream) - stream.close() + + keep_stream = self.on_complete_message(header, stream) + + # The handler's `on_complete_message` has taken responsibility for closing the stream. 
+ if not keep_stream: + stream.close() self.temporary_files.delete(header['task_id']) # Remove the stream handle. diff --git a/ricloud/helpers.py b/ricloud/helpers.py index 379131d..34504af 100644 --- a/ricloud/helpers.py +++ b/ricloud/helpers.py @@ -60,6 +60,9 @@ def __init__(self): self._last_check = time.time() + def __len__(self): + return len(self.handles) + def get(self, handle_id): if handle_id not in self.handles: self.handles[handle_id] = { @@ -77,7 +80,13 @@ def check_expiries(self): if (self._last_check + self.CHECK_INTERVAL) > time_now: return - logger.info('Checking temporary file expiries.') + if self.handles: + open_handles = [ + handle_id for handle_id, handle in self.handles.iteritems() + if not handle['handle'].closed + ] + + logger.info('Open temp file IDs: %s', ', '.join(open_handles)) self._last_check = time_now @@ -88,10 +97,12 @@ def check_expiries(self): else: break # Iterate until the first handle that is not expired. - logger.info('Found %d expired files to remove.', len(to_delete)) + if to_delete: + logger.info('Found %d expired files to remove.', len(to_delete)) for handle_id in to_delete: - self.handles[handle_id]['handle'].close() # Nothing should be expecting this, so close it. + if not self.handles[handle_id]['handle'].closed: + self.handles[handle_id]['handle'].close() # Nothing should be expecting this, so close it. 
del self.handles[handle_id] def _is_expired(self, timestamp): diff --git a/ricloud/ricloud.ini b/ricloud/ricloud.ini index c878df2..34f978e 100644 --- a/ricloud/ricloud.ini +++ b/ricloud/ricloud.ini @@ -50,5 +50,5 @@ level = WARNING object_store_greenlets = 50 [tempfiles] -timeout = 21600 -timeout_check_interval = 600 +timeout = 600 +timeout_check_interval = 60 diff --git a/ricloud/utils.py b/ricloud/utils.py index 19c394b..70ad5ab 100644 --- a/ricloud/utils.py +++ b/ricloud/utils.py @@ -5,6 +5,7 @@ import time import json import shutil +import logging import requests from multiprocessing.pool import ThreadPool @@ -14,6 +15,9 @@ from .samples import get_samples +logger = logging.getLogger(__name__) + + def get_or_create_filepath(filename, directory=''): absolute_dir = os.path.join(OUTPUT_DIR, directory) @@ -161,7 +165,7 @@ def select_samples(response, service_name, payload): return selected_sample(response, payload) -def profile(timer_text=''): +def profile(timer_text='', to_log=False): def inner(func): def wraps(*args, **kwargs): timer_start = time.time() @@ -169,8 +173,14 @@ def wraps(*args, **kwargs): timer_end = time.time() if settings.getboolean('logging', 'time_profile'): delta = timer_end - timer_start - message = "{text} {delta:.2f}s".format(text=timer_text, delta=delta) - puts(colored.magenta(message)) + + if to_log: + message = "{text} {delta:.9f}s".format(text=timer_text, delta=delta) + logger.debug(message) + else: + message = "{text} {delta:.2f}s".format(text=timer_text, delta=delta) + puts(colored.magenta(message)) + return ret return wraps return inner diff --git a/setup.cfg b/setup.cfg index 2517ac7..be14220 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 2.3.9 +current_version = 2.3.10 commit = true message = v{new_version} tag = true