forked from getsentry/raven-aiohttp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
raven_aiohttp.py
265 lines (206 loc) · 7.47 KB
/
raven_aiohttp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
"""
raven_aiohttp
~~~~~~~~~~~~~
:copyright: (c) 2010-2015 by the Sentry Team, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
"""
import abc
import asyncio
import socket
import aiohttp
from raven.conf import defaults
from raven.exceptions import APIError, RateLimited
from raven.transport.base import AsyncTransport
from raven.transport.http import HTTPTransport
try:
from asyncio import ensure_future
except ImportError:
ensure_future = getattr(asyncio, 'async')
try:
from raven.transport.base import has_newstyle_transports
except ImportError:
has_newstyle_transports = False
__version__ = '0.6.0'
class AioHttpTransportBase(
AsyncTransport,
HTTPTransport,
metaclass=abc.ABCMeta
):
def __init__(self, parsed_url=None, *, verify_ssl=True, resolve=True,
timeout=defaults.TIMEOUT,
keepalive=True, family=socket.AF_INET, client_session=None, loop=None):
self._resolve = resolve
self._keepalive = keepalive
self._family = family
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
if has_newstyle_transports:
if parsed_url is not None:
raise TypeError('Transport accepts no URLs for this version '
'of raven.')
super().__init__(timeout, verify_ssl)
else:
super().__init__(parsed_url, timeout, verify_ssl)
if client_session:
self._client_session = client_session
else:
self._client_session = self._client_session_factory()
self._closing = False
@property
def resolve(self):
return self._resolve
@property
def keepalive(self):
return self._keepalive
@property
def family(self):
return self._family
def _client_session_factory(self):
connector = aiohttp.TCPConnector(verify_ssl=self.verify_ssl,
resolve=self.resolve,
family=self.family,
loop=self._loop)
return aiohttp.ClientSession(connector=connector,
loop=self._loop)
@asyncio.coroutine
def _do_send(self, url, data, headers, success_cb, failure_cb):
resp = None
try:
resp = yield from self._client_session.post(
url,
data=data,
compress=False,
headers=headers,
timeout=self.timeout
)
code = resp.status
if code != 200:
msg = resp.headers.get('x-sentry-error')
if code == 429:
try:
retry_after = resp.headers.get('retry-after')
retry_after = int(retry_after)
except (ValueError, TypeError):
retry_after = 0
failure_cb(RateLimited(msg, retry_after))
else:
failure_cb(APIError(msg, code))
else:
success_cb()
except asyncio.CancelledError:
# do not mute asyncio.CancelledError
raise
except Exception as exc:
failure_cb(exc)
finally:
if resp is not None:
resp.release()
@abc.abstractmethod
def _async_send(self, url, data, headers, success_cb, failure_cb): # pragma: no cover
pass
@abc.abstractmethod
@asyncio.coroutine
def _close(self): # pragma: no cover
pass
def async_send(self, url, data, headers, success_cb, failure_cb):
if self._closing:
failure_cb(RuntimeError(
'{} is closed'.format(self.__class__.__name__)))
return
self._async_send(url, data, headers, success_cb, failure_cb)
@asyncio.coroutine
def _close_coro(self, *, timeout=None):
try:
yield from asyncio.wait_for(
self._close(), timeout=timeout, loop=self._loop)
except asyncio.TimeoutError:
pass
finally:
if self._client_session:
yield from self._client_session.close()
self._client_session = None
def close(self, *, timeout=None):
if self._closing:
@asyncio.coroutine
def dummy():
pass
return dummy()
self._closing = True
return self._close_coro(timeout=timeout)
if not has_newstyle_transports:
oldstyle_async_send = async_send
def async_send(self, *args, **kwargs):
return self.oldstyle_async_send(self._url, *args, **kwargs)
class AioHttpTransport(AioHttpTransportBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._tasks = set()
def _async_send(self, url, data, headers, success_cb, failure_cb):
coro = self._do_send(url, data, headers, success_cb, failure_cb)
task = ensure_future(coro, loop=self._loop)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
@asyncio.coroutine
def _close(self):
yield from asyncio.gather(
*self._tasks,
return_exceptions=True,
loop=self._loop
)
assert len(self._tasks) == 0
class QueuedAioHttpTransport(AioHttpTransportBase):
def __init__(self, *args, workers=1, qsize=1000, **kwargs):
super().__init__(*args, **kwargs)
self._queue = asyncio.Queue(maxsize=qsize, loop=self._loop)
self._workers = set()
for _ in range(workers):
worker = ensure_future(self._worker(), loop=self._loop)
self._workers.add(worker)
worker.add_done_callback(self._workers.remove)
@asyncio.coroutine
def _worker(self):
while True:
data = yield from self._queue.get()
try:
if data is ...:
self._queue.put_nowait(...)
break
url, data, headers, success_cb, failure_cb = data
yield from self._do_send(url, data, headers, success_cb,
failure_cb)
finally:
self._queue.task_done()
def _async_send(self, url, data, headers, success_cb, failure_cb):
data = url, data, headers, success_cb, failure_cb
try:
self._queue.put_nowait(data)
except asyncio.QueueFull as exc:
skipped = self._queue.get_nowait()
self._queue.task_done()
*_, failure_cb = skipped
failure_cb(RuntimeError(
'QueuedAioHttpTransport internal queue is full'))
self._queue.put_nowait(data)
@asyncio.coroutine
def _close(self):
try:
self._queue.put_nowait(...)
except asyncio.QueueFull as exc:
skipped = self._queue.get_nowait()
self._queue.task_done()
*_, failure_cb = skipped
failure_cb(RuntimeError(
'QueuedAioHttpTransport internal queue was full'))
self._queue.put_nowait(...)
yield from asyncio.gather(
*self._workers,
return_exceptions=True,
loop=self._loop
)
assert len(self._workers) == 0
assert self._queue.qsize() == 1
try:
assert self._queue.get_nowait() is ...
finally:
self._queue.task_done()