Skip to content

Commit

Permalink
Use gevent for celery worker, update timeout to GeventTimeout (#1345)
Browse files Browse the repository at this point in the history
* hotfix for worker starting up

* Use gevent for celery worker, update timeout to GeventTimeout

* Use gevent for celery worker, update timeout to GeventTimeout

* Default back to multiprocess worker, but with Timeout dynamic for gevent and non-gevent cases
  • Loading branch information
czgu authored Oct 12, 2023
1 parent b74419f commit 7480a64
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 12 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "querybook",
"version": "3.28.2",
"version": "3.28.3",
"description": "A Big Data Webapp",
"private": true,
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion querybook/scripts/run_test
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ usage() {

run_python_unit_test() {
echo 'Start running python unit tests >>>>>>>>>>>>>>>>>>>>>>>>>>>>'
py.test querybook/tests || exit 1
python -m gevent.monkey --module pytest querybook/tests || exit 1
}

run_webpack_test() {
Expand Down
1 change: 0 additions & 1 deletion querybook/server/datasources/query_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def cancel_query_and_notify():
"RECEIVED", # Rare case where task is received but not yet start
"RETRY", # Very unlikely case, because query normally do not retry
):

task.revoke(terminate=True) # last attempt to cancel it
cancel_query_and_notify()
elif task.state == "ABORTED":
Expand Down
53 changes: 49 additions & 4 deletions querybook/server/lib/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from contextlib import contextmanager
import inspect
import signal
import subprocess
from datetime import datetime, date
from functools import wraps
from typing import Optional, Union

import gevent

from lib.logger import get_logger

Expand Down Expand Up @@ -91,17 +95,58 @@ class TimeoutError(Exception):
pass


class Timeout:
def __init__(self, sec, custom_error_message=None):
def is_gevent_monkey_patched():
try:
from gevent import monkey
except ImportError:
return False
else:
# Choose a random library to test if it's patched
return monkey.is_module_patched("socket")


@contextmanager
def Timeout(sec: Union[int, float] = 1, custom_error_message: Optional[str] = None):
if is_gevent_monkey_patched():
with GeventTimeout(sec, custom_error_message=custom_error_message):
yield
else:
with SignalTimeout(sec, custom_error_message=custom_error_message):
yield


@contextmanager
def GeventTimeout(
sec: Union[int, float] = 1, custom_error_message: Optional[str] = None
):
"""
This timeout function can be used in gevent celery worker or the web server (which is powered by gevent)
"""

error_message = custom_error_message or f"Timeout Exception: {sec} seconds"
timeout = gevent.Timeout(sec, TimeoutError(error_message))
timeout.start()

try:
yield
finally:
timeout.close()


# Deprecated: use GeventTimeout if possible, the Timeout would break in gevent worker
class SignalTimeout:
def __init__(
self, sec: Union[int, float], custom_error_message: Optional[str] = None
):
self.error_message = custom_error_message or f"Timeout Exception: {sec} seconds"
self.sec = sec

def __enter__(self):
signal.signal(signal.SIGALRM, self.raise_timeout)
signal.alarm(self.sec)
signal.setitimer(signal.ITIMER_REAL, self.sec)

def __exit__(self, *args):
signal.alarm(0) # disable alarm
signal.setitimer(signal.ITIMER_REAL, 0) # disable alarm

def raise_timeout(self, *args):
raise TimeoutError(self.error_message)
Expand Down
4 changes: 0 additions & 4 deletions querybook/server/runweb.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
"""This file is for dev server only.
DO NOT USE FOR PROD
"""

from gevent import monkey

monkey.patch_all()
Expand Down
1 change: 0 additions & 1 deletion querybook/server/tasks/run_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from logic.elasticsearch import update_query_execution_by_id
from tasks.log_query_per_table import log_query_per_table_task


LOG = get_task_logger(__name__)


Expand Down
33 changes: 33 additions & 0 deletions querybook/tests/test_lib/test_utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import time

import pytest

from lib.utils.utils import GeventTimeout, SignalTimeout, TimeoutError


def test_gevent_timeout():
with pytest.raises(TimeoutError):
with GeventTimeout(0.1):
time.sleep(0.2)


def test_gevent_no_timeout():
try:
with GeventTimeout(0.2):
time.sleep(0.1)
except TimeoutError:
pytest.fail("TimeoutError raised when it shouldn't")


def test_signal_timeout():
with pytest.raises(TimeoutError):
with SignalTimeout(0.1):
time.sleep(0.2)


def test_signal_no_timeout():
try:
with SignalTimeout(0.2):
time.sleep(0.1)
except TimeoutError:
pytest.fail("TimeoutError raised when it shouldn't")
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Jinja2==3.1.2 # From Flask

# Celery
celery==5.2.7
kombu==5.3.1 # not a direct dependency (from celery), pinned by due to bug: https://github.com/celery/kombu/issues/1785


# Ops
pyyaml==5.4.1
Expand Down

0 comments on commit 7480a64

Please sign in to comment.