Skip to content

Commit

Permalink
Merge pull request #2989 from danswer-ai/hotfix/v0.10-worker-process-…
Browse files Browse the repository at this point in the history
…init

Merge hotfix/v0.10-worker-process-init into release/v0.10
  • Loading branch information
rkuo-danswer authored Oct 29, 2024
2 parents ee5e085 + d0e018a commit 4f59894
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 16 deletions.
2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
@beat_init.connect
def on_beat_init(sender: Any, **kwargs: Any) -> None:
logger.info("beat_init signal received.")

# celery beat shouldn't touch the db at all. But just setting a low minimum here.
SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
SqlEngine.init_engine(pool_size=2, max_overflow=0)
app_base.wait_for_redis(sender, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion backend/danswer/background/celery/apps/heavy.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
SqlEngine.init_engine(pool_size=8, max_overflow=0)
SqlEngine.init_engine(pool_size=4, max_overflow=12)

app_base.wait_for_redis(sender, **kwargs)
app_base.on_secondary_worker_init(sender, **kwargs)
Expand Down
13 changes: 0 additions & 13 deletions backend/danswer/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,6 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
r.delete(key)


# @worker_process_init.connect
# def on_worker_process_init(sender: Any, **kwargs: Any) -> None:
# """This only runs inside child processes when the worker is in pool=prefork mode.
# This may be technically unnecessary since we're finding prefork pools to be
# unstable and currently aren't planning on using them."""
# logger.info("worker_process_init signal received.")
# SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
# SqlEngine.init_engine(pool_size=5, max_overflow=0)

# # https://stackoverflow.com/questions/43944787/sqlalchemy-celery-with-scoped-session-error
# SqlEngine.get_engine().dispose(close=False)


@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_ready(sender, **kwargs)
Expand Down
7 changes: 5 additions & 2 deletions backend/danswer/background/indexing/job_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from typing import Literal
from typing import Optional

from danswer.db.engine import get_sqlalchemy_engine
from danswer.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
from danswer.db.engine import SqlEngine
from danswer.utils.logger import setup_logger

logger = setup_logger()
Expand All @@ -37,7 +38,9 @@ def _initializer(
if kwargs is None:
kwargs = {}

get_sqlalchemy_engine().dispose(close=False)
logger.info("Initializing spawned worker child process.")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
return func(*args, **kwargs)


Expand Down

0 comments on commit 4f59894

Please sign in to comment.