Skip to content

Commit

Permalink
v2.3.10
Browse files Browse the repository at this point in the history
  • Loading branch information
hlynursmari1 committed Mar 14, 2019
1 parent 4a51922 commit e46bce4
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 28 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
**2.3.10** - *released 2019-03-14*

* Use sessions to improve connection efficiency in API calls.
* Clear critical path in asmaster file handler. This helps the listener thread consume the stream faster by a few orders of magnitude.
* Minor logging of pending result handling added.
* Some extra profiling for asmaster handlers.

**2.3.9** - *released 2018-12-11*

* Improve *asmaster* listener to allow simpler configuration of custom result handlers.
Expand Down
2 changes: 1 addition & 1 deletion ricloud/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.3.9'
__version__ = '2.3.10'
16 changes: 8 additions & 8 deletions ricloud/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def __init__(self):
self._pending_tasks = {}
self.services = {}

self.session = requests.Session()

@property
def pending_tasks(self):
return self._pending_tasks
Expand Down Expand Up @@ -165,24 +167,22 @@ def _parse_response(response, post_request=False):

return data

@staticmethod
def _perform_get_request(url, headers=None):
response = requests.get(
def _perform_get_request(self, url, headers=None):
response = self.session.get(
url,
headers=headers
)

return Api._parse_response(response)
return self._parse_response(response)

@staticmethod
def _perform_post_request(url, data, headers=None):
response = requests.post(
def _perform_post_request(self, url, data, headers=None):
response = self.session.post(
url,
data=data,
headers=headers
)

return Api._parse_response(response, post_request=True)
return self._parse_response(response, post_request=True)


class Task(object):
Expand Down
29 changes: 21 additions & 8 deletions ricloud/asmaster_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class AsmasterHandler(RiCloudHandler):
TABLE = None
QUERY_TEMPLATE = ''

@utils.profile('asmaster handler took', to_log=True)
def on_complete_message(self, header, stream):
task = AsmasterTask(header.get('task_id', 'system'), callback=self.generate_callback())
task.headers = header
Expand All @@ -83,6 +84,8 @@ class AsmasterSystemHandler(AsmasterHandler):
"""

def generate_callback(self):

@utils.profile('system callback took', to_log=True)
def callback(task):
query = self.QUERY_TEMPLATE.format(table=self.TABLE)

Expand Down Expand Up @@ -115,6 +118,8 @@ class AsmasterFeedHandler(AsmasterHandler):
"""

def generate_callback(self):

@utils.profile('feed callback took', to_log=True)
def callback(task):
query = self.QUERY_TEMPLATE.format(table=self.TABLE)

Expand Down Expand Up @@ -150,20 +155,28 @@ class AsmasterDownloadFileHandler(AsmasterHandler):
)
"""

@utils.profile('file handler took', to_log=True)
def on_complete_message(self, header, stream):
task = AsmasterTask(header.get('task_id'), callback=self.generate_callback())
task = AsmasterTask(header.get('task_id', 'system'), callback=self.generate_callback())
task.headers = header

target_path = self.get_target_path(task.headers)

file_path = utils.save_file_stream_to_target_path(stream, target_path)

task.result = file_path
task.result = stream

self.api.append_consumed_task(task)

# Tell the `handle` function not to close the stream file.
return True

def generate_callback(self):

@utils.profile('file callback took', to_log=True)
def callback(task):
target_path = self.get_target_path(task.headers)

file_path = utils.save_file_stream_to_target_path(task.result, target_path)

# Close the temp file here as we did not let `handle` do so above.
task.result.close()

file_id = task.headers['file_id']

if len(file_id) > 4096:
Expand All @@ -177,7 +190,7 @@ def callback(task):
"device_id": task.headers.get('device_id', None),
"device_tag": task.headers.get('device_tag', None),
"headers": json.dumps(task.headers),
"location": task.result,
"location": file_path,
"file_id": file_id,
}

Expand Down
8 changes: 6 additions & 2 deletions ricloud/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ def handle(self, header, payload):

if header['chunk'] == header['total_chunks']: # Message should be complete.
stream.seek(0)
self.on_complete_message(header, stream)
stream.close()

keep_stream = self.on_complete_message(header, stream)

# The handler's `on_complete_message` has taken responsibility for closing the stream.
if not keep_stream:
stream.close()

self.temporary_files.delete(header['task_id']) # Remove the stream handle.

Expand Down
17 changes: 14 additions & 3 deletions ricloud/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def __init__(self):

self._last_check = time.time()

def __len__(self):
return len(self.handles)

def get(self, handle_id):
if handle_id not in self.handles:
self.handles[handle_id] = {
Expand All @@ -77,7 +80,13 @@ def check_expiries(self):
if (self._last_check + self.CHECK_INTERVAL) > time_now:
return

logger.info('Checking temporary file expiries.')
if self.handles:
open_handles = [
handle_id for handle_id, handle in self.handles.iteritems()
if not handle['handle'].closed
]

logger.info('Open temp file IDs: %s', ', '.join(open_handles))

self._last_check = time_now

Expand All @@ -88,10 +97,12 @@ def check_expiries(self):
else:
break # Iterate until the first handle that is not expired.

logger.info('Found %d expired files to remove.', len(to_delete))
if to_delete:
logger.info('Found %d expired files to remove.', len(to_delete))

for handle_id in to_delete:
self.handles[handle_id]['handle'].close() # Nothing should be expecting this, so close it.
if not self.handles[handle_id]['handle'].closed:
self.handles[handle_id]['handle'].close() # Nothing should be expecting this, so close it.
del self.handles[handle_id]

def _is_expired(self, timestamp):
Expand Down
4 changes: 2 additions & 2 deletions ricloud/ricloud.ini
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ level = WARNING
object_store_greenlets = 50

[tempfiles]
timeout = 21600
timeout_check_interval = 600
timeout = 600
timeout_check_interval = 60
16 changes: 13 additions & 3 deletions ricloud/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import json
import shutil
import logging
import requests
from multiprocessing.pool import ThreadPool

Expand All @@ -14,6 +15,9 @@
from .samples import get_samples


logger = logging.getLogger(__name__)


def get_or_create_filepath(filename, directory=''):
absolute_dir = os.path.join(OUTPUT_DIR, directory)

Expand Down Expand Up @@ -161,16 +165,22 @@ def select_samples(response, service_name, payload):
return selected_sample(response, payload)


def profile(timer_text=''):
def profile(timer_text='', to_log=False):
def inner(func):
def wraps(*args, **kwargs):
timer_start = time.time()
ret = func(*args, **kwargs)
timer_end = time.time()
if settings.getboolean('logging', 'time_profile'):
delta = timer_end - timer_start
message = "{text} {delta:.2f}s".format(text=timer_text, delta=delta)
puts(colored.magenta(message))

if to_log:
message = "{text} {delta:.9f}s".format(text=timer_text, delta=delta)
logger.debug(message)
else:
message = "{text} {delta:.2f}s".format(text=timer_text, delta=delta)
puts(colored.magenta(message))

return ret
return wraps
return inner
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.3.9
current_version = 2.3.10
commit = true
message = v{new_version}
tag = true
Expand Down

0 comments on commit e46bce4

Please sign in to comment.