Skip to content

Commit

Permalink
Merge pull request #44 from tutorcruncher/logging-changes
Browse files Browse the repository at this point in the history
Adds qlength to logs and enforces 5s timeout on requests
  • Loading branch information
HenryTraill authored Oct 28, 2024
2 parents 2d56866 + 292e642 commit 2d1b622
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
21 changes: 20 additions & 1 deletion chronos/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def webhook_request(url: str, *, method: str = 'POST', webhook_sig: str, data: d
with logfire.span('{method=} {url!r}', url=url, method=method):
r = None
try:
r = session.request(method=method, url=url, json=data, headers=headers)
r = session.request(method=method, url=url, json=data, headers=headers, timeout=5)
except requests.exceptions.HTTPError as httperr:
app_logger.info('HTTP error sending webhook to %s: %s', url, httperr)
except requests.exceptions.ConnectionError as conerr:
Expand All @@ -69,6 +69,20 @@ def webhook_request(url: str, *, method: str = 'POST', webhook_sig: str, data: d
acceptable_url_schemes = ('http', 'https', 'ftp', 'ftps')


def get_qlength():
"""
Get the length of the queue from celery. Celery returns a dictionary like so: {'queue_name': [task1, task2, ...]}
so to get qlength we simply aggregate the length of all task lists
"""
qlength = 0
celery_inspector = celery_app.control.inspect()
dict_of_queues = celery_inspector.reserved()
if dict_of_queues and isinstance(dict_of_queues, dict):
for k, v in dict_of_queues.items():
qlength += len(v)
return qlength


@celery_app.task
def task_send_webhooks(
payload: str,
Expand All @@ -81,6 +95,11 @@ def task_send_webhooks(
loaded_payload['_request_time'] = loaded_payload.pop('request_time')
branch_id = loaded_payload['events'][0]['branch']

qlength = get_qlength()
app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength)
if qlength > 100:
app_logger.error('Queue is too long. Check workers and speeds.')

total_success, total_failed = 0, 0
with Session(engine) as db:
# Get all the endpoints for the branch
Expand Down
16 changes: 8 additions & 8 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ def test_webhook_not_send_errors(
mock_response.side_effect = requests.exceptions.RequestException()
task_send_webhooks(json.dumps(payload))
webhooks = db.exec(select(WebhookLog)).all()
assert mock_logger.info.call_count == 2
assert 'Request error sending webhook to' in mock_logger.info.call_args_list[0][0][0]
assert mock_logger.info.call_count == 3
assert 'Request error sending webhook to' in mock_logger.info.call_args_list[1][0][0]
assert mock_response.call_count == 1
assert len(webhooks) == 1

Expand All @@ -280,24 +280,24 @@ def test_webhook_not_send_errors(
mock_response.side_effect = requests.exceptions.HTTPError()
task_send_webhooks(json.dumps(payload))
webhooks = db.exec(select(WebhookLog)).all()
assert mock_logger.info.call_count == 4
assert 'HTTP error sending webhook to' in mock_logger.info.call_args_list[2][0][0]
assert mock_logger.info.call_count == 6
assert 'HTTP error sending webhook to' in mock_logger.info.call_args_list[4][0][0]
assert mock_response.call_count == 2
assert len(webhooks) == 2

mock_response.side_effect = requests.exceptions.ConnectionError()
task_send_webhooks(json.dumps(payload))
webhooks = db.exec(select(WebhookLog)).all()
assert mock_logger.info.call_count == 6
assert 'Connection error sending webhook to' in mock_logger.info.call_args_list[4][0][0]
assert mock_logger.info.call_count == 9
assert 'Connection error sending webhook to' in mock_logger.info.call_args_list[7][0][0]
assert mock_response.call_count == 3
assert len(webhooks) == 3

mock_response.side_effect = requests.exceptions.Timeout()
task_send_webhooks(json.dumps(payload))
webhooks = db.exec(select(WebhookLog)).all()
assert mock_logger.info.call_count == 8
assert 'Timeout error sending webhook to' in mock_logger.info.call_args_list[6][0][0]
assert mock_logger.info.call_count == 12
assert 'Timeout error sending webhook to' in mock_logger.info.call_args_list[10][0][0]
assert mock_response.call_count == 4
assert len(webhooks) == 4

Expand Down

0 comments on commit 2d1b622

Please sign in to comment.