Skip to content

Commit

Permalink
Add per-request timeout support.
Browse files Browse the repository at this point in the history
  • Loading branch information
mayfield committed Sep 10, 2015
1 parent 30f138a commit b42406d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
## [Unreleased] - unreleased


## [1.2.0] - 2015-09-10
### Added
- Support for per-request timeouts via {get,post,put,...}(..., timeout=10)


## [1.1.0] - 2015-09-09
### Changed
- xml serializer support
Expand All @@ -19,6 +24,7 @@
- First stable release


[unreleased]: https://github.com/mayfield/syndicate/compare/v1.2.0...HEAD
[1.2.0]: https://github.com/mayfield/syndicate/compare/v1.1.0...v1.2.0
[1.1.0]: https://github.com/mayfield/syndicate/compare/v1.0.0...v1.1.0
[1.0.0]: https://github.com/mayfield/syndicate/compare/b9ec552eb9967c5622053c33b0b0a4789a16ffab...v1.0.0
1 change: 0 additions & 1 deletion syndicate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@
)

Service = syndicate.client.Service
Resource = syndicate.client.Resource
70 changes: 26 additions & 44 deletions syndicate/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def ingress_filter(self, response):
data.meta = self.meta_getter(response)
return data

def do(self, method, path, urn=None, callback=None, data=None, **query):
def do(self, method, path, urn=None, callback=None, data=None,
timeout=None, **query):
urlparts = [self.uri, self.urn if urn is None else urn]
urlparts.extend(path)
url = '/'.join(filter(None, (x.strip('/') for x in urlparts)))
Expand All @@ -104,63 +105,63 @@ def do(self, method, path, urn=None, callback=None, data=None, **query):
parts[0] += '/'
url = ''.join(parts)
return self.adapter.request(method, url, callback=callback, data=data,
query=query)
query=query, timeout=timeout)

def get(self, *path, **kwargs):
    """ Perform a GET request for `path`.

    All keyword arguments (query params, `timeout`, `callback`, `urn`)
    are forwarded to do(). """
    return self.do('get', path, **kwargs)

def get_pager(self, *path, **kwargs):
    """ A generator for all the results a resource can provide.  The
    pages are lazily loaded.

    `page_size` (or the legacy `limit`) keyword controls how many
    results are fetched per request; otherwise the service default is
    used.  Remaining keyword arguments are forwarded to get(). """
    # NOTE: `async` became a reserved keyword in Python 3.7, so the
    # attribute must be read via getattr() rather than `self.async`.
    fn = self.get_pager_async if getattr(self, 'async') else \
        self.get_pager_sync
    page_arg = kwargs.pop('page_size', None)
    limit_arg = kwargs.pop('limit', None)
    # `page_size` takes precedence over `limit`; fall back to default.
    kwargs['limit'] = page_arg or limit_arg or self.default_page_size
    return fn(path=path, kwargs=kwargs)

def get_pager_sync(self, path=None, kwargs=None):
    """ Synchronous pager: yield every result from the first page, then
    follow each page's meta['next'] URN until it is falsy. """
    page = self.get(*path, **kwargs)
    for x in page:
        yield x
    while page.meta['next']:
        # Subsequent pages are addressed directly by their URN.
        page = self.get(urn=page.meta['next'])
        for x in page:
            yield x

def get_pager_async(self, path=None, kwargs=None):
    """ Asynchronous pager: return an AsyncPager that drives get()
    lazily as results are consumed. """
    return AsyncPager(getter=self.get, path=path, kwargs=kwargs)

def post(self, *path_and_data, **kwargs):
    """ Perform a POST request.

    The last positional argument is the request payload; any preceding
    positional arguments form the resource path.  Remaining keyword
    arguments are forwarded to do().  Raises IndexError if called with
    no positional arguments. """
    path = list(path_and_data)
    data = path.pop(-1)
    return self.do('post', path, data=data, **kwargs)

def delete(self, *path, **kwargs):
    """ Perform a DELETE request for `path`.

    An optional request body may be supplied via the `data` keyword;
    remaining keyword arguments are forwarded to do(). """
    data = kwargs.pop('data', None)
    return self.do('delete', path, data=data, **kwargs)

def put(self, *path_and_data, **kwargs):
    """ Perform a PUT request.

    The last positional argument is the request payload; any preceding
    positional arguments form the resource path.  Remaining keyword
    arguments are forwarded to do().  Raises IndexError if called with
    no positional arguments. """
    path = list(path_and_data)
    data = path.pop(-1)
    return self.do('put', path, data=data, **kwargs)

def patch(self, *path_and_data, **kwargs):
    """ Perform a PATCH request.

    The last positional argument is the request payload; any preceding
    positional arguments form the resource path.  Remaining keyword
    arguments are forwarded to do().  Raises IndexError if called with
    no positional arguments. """
    path = list(path_and_data)
    data = path.pop(-1)
    return self.do('patch', path, data=data, **kwargs)


class AsyncPager(object):

max_overflow = 1000

def __init__(self, getter=None, path=None, kwargs=None):
    """ Set up pager state.

    :param getter: Callable used to fetch pages (e.g. Service.get).
    :param path: Positional path args for the first fetch.
    :param kwargs: Keyword args for the first fetch (query, limit, ...).
    """
    self.mark = 0                           # count of items handed out so far
    self.active = None                      # in-flight page request, if any
    self.waiting = collections.deque()      # consumers awaiting a result
    self.getter = getter
    self.path = path
    self.kwargs = kwargs
    self.stop = False                       # set once iteration is exhausted
    self.next_page = None                   # URN of the next page, if known

def queue_next_page(self):
    """ Kick off the fetch of the next page of results.

    The first fetch uses the stored path/kwargs; follow-up fetches go
    directly to the URN recorded in `next_page`.  The resulting
    future-like object is stored in `active` and `on_next_page` is
    registered as its completion callback. """
    if self.next_page:
        self.active = self.getter(urn=self.next_page)
    else:
        self.active = self.getter(*self.path, **self.kwargs)
    self.active.add_done_callback(self.on_next_page)

def queue_next(self, item):
Expand Down Expand Up @@ -211,22 +212,3 @@ def on_next_page(self, page):
self.waiting.popleft().set_exception(StopIteration())
else:
self.queue_next_page()


class Resource(dict):
    """ A dict subclass intended to give API results an object-mapper
    feel.  Every operation below is a placeholder awaiting a concrete
    implementation. """

    def save(self):
        """ Persist local changes back to the service. """
        raise NotImplementedError()

    def fetch(self, method, *path, **query):
        """ Get a subresource. """
        raise NotImplementedError()

    def delete(self, method, *path, **query):
        """ Remove this resource (or a subresource) on the service. """
        raise NotImplementedError()

    def do(self, method, *path, **query):
        """ Generic request hook.  XXX: Debatable existence. """
        raise NotImplementedError()

0 comments on commit b42406d

Please sign in to comment.