Skip to content

Commit

Permalink
v2.3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
hlynursmari1 committed Jan 3, 2018
1 parent 7be67b1 commit c128304
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 46 deletions.
9 changes: 6 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ jobs:
- checkout
- run:
name: system dependencies
command: sudo apt-get install -y build-essential libmysqlclient-dev libxml2-dev
command: sudo apt-get install -y mysql-client
- run:
name: test dependencies
command: sudo pip install tox
- run:
name: test
command: tox
- run: mv reports/htmlcov $CIRCLE_ARTIFACTS
- run: mv reports $CIRCLE_TEST_REPORTS/reports
- store_test_results:
path: reports
- store_artifacts:
path: reports
destination: reports
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
**2.3.2** - *released 2018-01-03*

* Improved the robustness of the new worker thread when in *asmaster* listener mode. Extra logging around errors here.
* Extra debug logging around the *asmaster* listener to aid in issue diagnosis.
* Complete setting up CircleCI testing.
* Some nice badges in the README.

**2.3.1** - *released 2018-01-02*

* Make the *asmaster* listener similar in design to the interactive *asapi* mode by splitting out the stream consumer to a different thread.
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# ricloud-py: iCloud access made easy

[![pypi](https://img.shields.io/pypi/v/ricloud.svg)](https://pypi.python.org/pypi/ricloud)
[![CircleCI](https://circleci.com/gh/reincubate/ricloud.svg?style=shield)](https://circleci.com/gh/reincubate/ricloud)
[![docs](https://img.shields.io/badge/docs-ricloud-blue.svg)](https://docs.reincubate.com/ricloud/)

This is the sample Python client for *ricloud*, Reincubate's [Cloud Data API](https://www.reincubate.com/ricloud-cloud-data-api/?utm_source=github&utm_medium=ricloud-py&utm_campaign=ricloud).

> Refer to the comprehensive *ricloud* [documentation](https://docs.reincubate.com/ricloud/?utm_source=github&utm_medium=ricloud-py&utm_campaign=ricloud) for a fuller picture of the API's capabilities, specifications, and benefits. See also the [Apple iCloud service](https://docs.reincubate.com/ricloud/icloud-backups/?utm_source=github&utm_medium=ricloud-py&utm_campaign=ricloud) documentation for its rich capabilities in accessing iCloud data.
Expand Down
2 changes: 1 addition & 1 deletion ricloud/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.3.1'
__version__ = '2.3.2'
9 changes: 8 additions & 1 deletion ricloud/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@
from __future__ import unicode_literals

import json
import logging
from docopt import docopt

from . import conf
from . import conf # Load configuration explicitly.
from .ricloud import RiCloud
from .asmaster_api import AsmasterApi
from .utils import select_service, select_samples


logger = logging.getLogger(__name__)


def _parse_input_arguments(arguments):
timeout = int(arguments.get('--timeout') or 600)

Expand Down Expand Up @@ -100,6 +104,7 @@ def main():
application_payload = _parse_input_arguments(arguments)

if application_payload['mode'] == 'interactive':
logger.debug('Starting ricloud client in interactive mode.')
# Main RiCloud object
ricloud = RiCloud()

Expand All @@ -114,13 +119,15 @@ def main():
Application.run()

elif application_payload['mode'] == 'listen':
logger.debug('Starting ricloud client in listener mode.')
# asmaster listener requires MySQLdb, and we want to avoid requiring that otherwise
from .asmaster_listener import AsmasterListener

# This sets the listener running and will not return
AsmasterListener(application_payload['timeout'])

elif application_payload['mode'] == 'manager':
logger.debug('Starting ricloud client manager request.')
api = AsmasterApi(application_payload['timeout'])
api.setup()

Expand Down
17 changes: 12 additions & 5 deletions ricloud/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from .conf import settings


logger = logging.getLogger(__name__)


class Api(object):
"""Primary object that pushes requests into a distinct stream thread."""

Expand Down Expand Up @@ -111,7 +114,7 @@ def task_status(self, task_id):

def result_consumed(self, task_id):
"""Report the result as successfully consumed."""
logging.debug('API Sending result consumed message.')
logger.debug('Sending result consumed message.')
data = {
'task_ids': task_id,
}
Expand All @@ -137,15 +140,17 @@ def append_consumed_task(self, task):
self._consumed_tasks.append(task)

def listen(self):
try:
while True:
while True:
try:
if self._consumed_tasks:
consumed_task = self._consumed_tasks.pop(0)
consumed_task.trigger_callback()
else:
time.sleep(0.1)
except KeyboardInterrupt:
pass
except KeyboardInterrupt:
break
except: # noqa: E722 want the listener to survive.
logger.error('Error occurred in task result processing callback.', exc_info=True)

@staticmethod
def _parse_response(response, post_request=False):
Expand Down Expand Up @@ -216,6 +221,8 @@ def result(self, value):
self.trigger_callback()

def trigger_callback(self):
logger.debug('Triggering callback for task %s', str(self.uuid))

if self.callback:
self.callback(self)

Expand Down
40 changes: 23 additions & 17 deletions ricloud/asmaster_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from . import utils


logger = logging.getLogger(__name__)

database_handler = DatabaseHandler()


Expand All @@ -38,6 +40,7 @@ def __init__(self, timeout, api=None):
class AsmasterHandler(RiCloudHandler):
TYPE = ''
TABLE = None
QUERY_TEMPLATE = ''

def on_complete_message(self, header, stream):
task = AsmasterTask(header.get('task_id'), callback=self.generate_callback())
Expand All @@ -50,13 +53,14 @@ def on_complete_message(self, header, stream):
class AsmasterSystemHandler(AsmasterHandler):
TYPE = 'system'
TABLE = 'system'
QUERY_TEMPLATE = """
INSERT INTO {table} (`received`, `headers`, `body`, `message`, `code`)
VALUES (NOW(), %(headers)s, %(body)s, %(message)s, %(code)s)
"""

def generate_callback(self):
def callback(task):
query = """
INSERT INTO {table} (`received`, `headers`, `body`, `message`, `code`)
VALUES (NOW(), %(headers)s, %(body)s, %(message)s, %(code)s)
""".format(table=self.TABLE)
query = self.QUERY_TEMPLATE.format(table=self.TABLE)

parsed_body = json.loads(task.result)

Expand All @@ -73,21 +77,22 @@ def callback(task):
database_handler.handle_query(query, args)

if "error" in task.result:
logging.error('System error message: %s', task.result)
logger.error('System error message: %s', task.result)

return callback


class AsmasterFeedHandler(AsmasterHandler):
TYPE = 'fetch-data'
TABLE = 'feed'
QUERY_TEMPLATE = """
INSERT INTO {table} (`service`, `received`, `account_id`, `device_id`, `device_tag`, `headers`, `body`)
VALUES (%(service)s, NOW(), %(account_id)s, %(device_id)s, %(device_tag)s, %(headers)s, %(body)s)
"""

def generate_callback(self):
def callback(task):
query = """
INSERT INTO {table} (`service`, `received`, `account_id`, `device_id`, `device_tag`, `headers`, `body`)
VALUES (%(service)s, NOW(), %(account_id)s, %(device_id)s, %(device_tag)s, %(headers)s, %(body)s)
""".format(table=self.TABLE)
query = self.QUERY_TEMPLATE.format(table=self.TABLE)

args = {
'service': task.headers['service'],
Expand All @@ -112,6 +117,14 @@ class AsmasterMessageHandler(AsmasterFeedHandler):
class AsmasterDownloadFileHandler(AsmasterHandler):
TYPE = 'download-file'
TABLE = 'file'
QUERY_TEMPLATE = """
INSERT INTO {table}
(`service`, `received`, `account_id`, `device_id`, `device_tag`, `headers`, `location`, `file_id`)
VALUES (
%(service)s, NOW(), %(account_id)s, %(device_id)s,
%(device_tag)s, %(headers)s, %(location)s, %(file_id)s
)
"""

def on_complete_message(self, header, stream):
task = AsmasterTask(header.get('task_id'), callback=self.generate_callback())
Expand All @@ -132,14 +145,7 @@ def callback(task):
if len(file_id) > 4096:
raise StreamError("Invalid download file request, file_id is too long")

query = """
INSERT INTO {table}
(`service`, `received`, `account_id`, `device_id`, `device_tag`, `headers`, `location`, `file_id`)
VALUES (
%(service)s, NOW(), %(account_id)s, %(device_id)s,
%(device_tag)s, %(headers)s, %(location)s, %(file_id)s
)
""".format(table=self.TABLE)
query = self.QUERY_TEMPLATE.format(table=self.TABLE)

args = {
"service": task.headers['service'],
Expand Down
15 changes: 9 additions & 6 deletions ricloud/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from cStringIO import StringIO


logger = logging.getLogger(__name__)


class StreamError(Exception):

def __init__(self, message):
Expand All @@ -24,9 +27,9 @@ class SystemHandler(HandlerBase):
TYPE = 'system'

def handle(self, header, payload):
logging.info('-*- System Message -*-')
logging.info('Header: %s', header)
logging.info('Paylaod: %s', payload)
logger.info('-*- System Message -*-')
logger.info('Header: %s', header)
logger.info('Paylaod: %s', payload)


class ChunkedDataHandler(HandlerBase):
Expand Down Expand Up @@ -56,9 +59,9 @@ def _get_stream(self, id):
return self.tmp_handles[id]

def on_complete_message(self, header, stream):
logging.info('-*- message completed -*-')
logging.info(header)
logging.info(stream.read())
logger.info('-*- message completed -*-')
logger.info(header)
logger.info(stream.read())


class LogInHandler(ChunkedDataHandler):
Expand Down
8 changes: 7 additions & 1 deletion ricloud/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@
from . import conf


logger = logging.getLogger(__name__)


class DatabaseHandler(object):

def __init__(self, db_name=conf.LISTENER_DB_NAME):
logger.debug('Setting up DatabaseHandler for database `%s`.', db_name)
self.db_name = db_name

_db_con = None

@property
def db_con(self):
if not self._db_con:
logger.debug('Establishing new database connection to database `%s`.', self.db_name)
self._db_con = MySQLdb.connect(
host=conf.LISTENER_DB_HOST,
port=int(conf.LISTENER_DB_PORT),
Expand All @@ -30,8 +35,9 @@ def handle_query(self, query, args=None, retry=2):
self.db_con.commit()
except (AttributeError, MySQLdb.OperationalError):
if not retry:
logger.error('Query failed, no retries remaining.', exc_info=True)
raise

logging.warn('asmaster listener query failed, attempting to refresh database connection.')
logger.warn('Query failed, attempting to refresh connection (%d retries remaining)', retry)
self._db_con = None
self.handle_query(query, args=args, retry=retry - 1)
17 changes: 10 additions & 7 deletions ricloud/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,30 @@
from .handlers import StreamError


logger = logging.getLogger(__name__)


class Listener(object):
"""Handle stream messagse here."""
def __init__(self, handlers):
self.handlers = handlers

def on_heartbeat(self):
logging.debug('-*- Heartbeat -*-')
logger.debug('-*- Heartbeat -*-')

def on_message(self, header, data):
header = json.loads(header)
handler = self.handlers.get(header['type'], self.handlers.get('__ALL__', None))

if not handler:
logging.error('Unknown message type', header['type'])
logging.error(header)
logging.error(data)
logger.error('Unknown message type', header['type'])
logger.error(header)
logger.error(data)
else:
try:
handler.handle(header, data)
except StreamError as e:
logging.error("StreamError: %s", e.message)
logger.error("StreamError: %s", e.message)
except Exception as e:
logging.critical("%s: %s", type(e), e.args)
logging.critical(traceback.format_exc())
logger.critical("%s: %s", type(e), e.args)
logger.critical(traceback.format_exc())
11 changes: 7 additions & 4 deletions ricloud/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from requests.packages import urllib3


logger = logging.getLogger(__name__)


class Stream(object):
def __init__(self, host, url, listener, stream, token, protocol='https'):
self.protocol = protocol
Expand All @@ -24,7 +27,7 @@ def go(self):
try:
self._go(url)
except (requests.exceptions.ConnectionError, urllib3.exceptions.ProtocolError):
logging.error('Connection failed unexpectedly with error.', exc_info=True)
logger.error('Connection failed unexpectedly with error.', exc_info=True)

self._retry_wait()

Expand Down Expand Up @@ -59,19 +62,19 @@ def _consume(self, response):

buf.readline() # Always have a final carriage return at the end of a message.

logging.warn('Connection closed, final message: %s', buf.read())
logger.warn('Connection closed, final message: %s', buf.read())

def _process_initial_response(self, response):
response.raise_for_status()

logging.debug('Stream connection established.')
logger.debug('Stream connection established.')
self.streaming = True
self.retry_count = 0

def _retry_wait(self):
self.retry_count += 1
retry_wait = min(max((self.retry_count - 2), 0) ** 2, 60) # Increase quadratically to a maximum of 60s.

logging.warning('Attempting reconnect number %d in %d seconds.', self.retry_count, retry_wait)
logger.warn('Attempting reconnect number %d in %d seconds.', self.retry_count, retry_wait)

time.sleep(retry_wait)
5 changes: 4 additions & 1 deletion tests/test_asmaster_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ def setup(self):
test.write(line.replace("ricloud", self.DB_NAME))

subprocess.check_output(
"mysql -u {} -p{} < {}".format(conf.LISTENER_DB_USER, conf.LISTENER_DB_PASSWORD, test_schema_path),
"mysql -u {} -p{} -h {} -P {} < {}".format(
conf.LISTENER_DB_USER, conf.LISTENER_DB_PASSWORD,
conf.LISTENER_DB_HOST, conf.LISTENER_DB_PORT, test_schema_path
),
shell=True
)
os.remove(test_schema_path)
Expand Down

0 comments on commit c128304

Please sign in to comment.