Merge hotfix/v0.10-worker-process-init into release/v0.10 #2989

Merged: 1 commit merged from hotfix/v0.10-worker-process-init into release/v0.10 on Oct 29, 2024.

This hotfix reworks how the Celery apps and the spawned indexing child processes initialize their SQLAlchemy engines: each process now sets its own Postgres application name and a bounded connection pool, and dead prefork-mode code is removed.
2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/beat.py

```diff
@@ -21,6 +21,8 @@
 @beat_init.connect
 def on_beat_init(sender: Any, **kwargs: Any) -> None:
     logger.info("beat_init signal received.")
+
+    # celery beat shouldn't touch the db at all. But just setting a low minimum here.
     SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
     SqlEngine.init_engine(pool_size=2, max_overflow=0)
     app_base.wait_for_redis(sender, **kwargs)
```
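For context: `SqlEngine` is Danswer's process-global wrapper around a SQLAlchemy engine. Its internals live in `danswer/db/engine.py` and are not part of this diff, so the sketch below is only an assumption about roughly how `set_app_name` and `init_engine` map onto SQLAlchemy primitives; the `POSTGRES_URL` placeholder is invented for illustration.

```python
# Minimal sketch of what the SqlEngine wrapper plausibly does; the real
# implementation is not shown in this PR and may differ.
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

POSTGRES_URL = "postgresql+psycopg2://user:pass@localhost:5432/danswer"  # placeholder


class SqlEngine:
    _engine: Engine | None = None
    _app_name: str = "danswer_unknown"

    @classmethod
    def set_app_name(cls, app_name: str) -> None:
        # Recorded so each worker shows a distinct application_name
        # in Postgres's pg_stat_activity view.
        cls._app_name = app_name

    @classmethod
    def init_engine(cls, **pool_kwargs) -> None:
        # pool_size / max_overflow / pool_recycle pass straight through
        # to SQLAlchemy's create_engine().
        cls._engine = create_engine(
            POSTGRES_URL,
            connect_args={"application_name": cls._app_name},
            **pool_kwargs,
        )

    @classmethod
    def get_engine(cls) -> Engine:
        assert cls._engine is not None, "init_engine() must be called first"
        return cls._engine
```

Under that reading, the beat change simply documents that the scheduler should barely touch the database and pins it to a minimal pool of 2 connections with no overflow.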
2 changes: 1 addition & 1 deletion backend/danswer/background/celery/apps/heavy.py

```diff
@@ -58,7 +58,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
 
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
-    SqlEngine.init_engine(pool_size=8, max_overflow=0)
+    SqlEngine.init_engine(pool_size=4, max_overflow=12)
 
     app_base.wait_for_redis(sender, **kwargs)
     app_base.on_secondary_worker_init(sender, **kwargs)
```
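In SQLAlchemy's default QueuePool, `pool_size` is the number of connections kept open persistently while `max_overflow` is extra burst headroom, with overflow connections closed as they are returned. So this change presumably trades steady-state footprint for burst capacity: 4 idle connections instead of 8, but a ceiling of 16 concurrent connections instead of 8. A runnable demonstration of that arithmetic (SQLite is used only so the snippet needs no database server):

```python
# Demonstrates pool_size + max_overflow semantics with SQLAlchemy's QueuePool.
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Same pool settings as the new heavy-worker config: 4 persistent + 12 overflow.
engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=4, max_overflow=12)

conns = [engine.connect() for _ in range(16)]  # a 17th checkout would block, then time out
print(engine.pool.status())  # reports 16 checked-out connections

for conn in conns:
    conn.close()
print(engine.pool.status())  # overflow discarded; at most 4 connections stay pooled
```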
13 changes: 0 additions & 13 deletions backend/danswer/background/celery/apps/primary.py

```diff
@@ -166,19 +166,6 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
         r.delete(key)
 
 
-# @worker_process_init.connect
-# def on_worker_process_init(sender: Any, **kwargs: Any) -> None:
-#     """This only runs inside child processes when the worker is in pool=prefork mode.
-#     This may be technically unnecessary since we're finding prefork pools to be
-#     unstable and currently aren't planning on using them."""
-#     logger.info("worker_process_init signal received.")
-#     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
-#     SqlEngine.init_engine(pool_size=5, max_overflow=0)
-
-#     # https://stackoverflow.com/questions/43944787/sqlalchemy-celery-with-scoped-session-error
-#     SqlEngine.get_engine().dispose(close=False)
-
-
 @worker_ready.connect
 def on_worker_ready(sender: Any, **kwargs: Any) -> None:
     app_base.on_worker_ready(sender, **kwargs)
```
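The deleted block was the prefork-mode counterpart of this engine setup, kept only as a comment since the team found prefork pools unstable. Its `dispose(close=False)` line pointed at the standard SQLAlchemy pattern for forked workers: a forked child must abandon pooled connections inherited from the parent without closing the sockets the parent still owns. A generic sketch of that pattern, independent of Danswer's `SqlEngine` wrapper (the connection URL is a placeholder):

```python
# Generic SQLAlchemy after-fork pattern referenced by the removed code; see
# https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing
from typing import Any

from celery.signals import worker_process_init
from sqlalchemy import create_engine

# Placeholder URL; in Danswer the engine is built by SqlEngine.init_engine().
engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/danswer")


@worker_process_init.connect
def reset_pool_after_fork(sender: Any = None, **kwargs: Any) -> None:
    # Forked children share the parent's file descriptors. dispose(close=False)
    # discards the inherited pool without closing those shared sockets, so the
    # child opens fresh connections on first use instead of corrupting them.
    engine.dispose(close=False)
```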
7 changes: 5 additions & 2 deletions backend/danswer/background/indexing/job_client.py

```diff
@@ -11,7 +11,8 @@
 from typing import Literal
 from typing import Optional
 
-from danswer.db.engine import get_sqlalchemy_engine
+from danswer.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
+from danswer.db.engine import SqlEngine
 from danswer.utils.logger import setup_logger
 
 logger = setup_logger()
@@ -37,7 +38,9 @@ def _initializer(
     if kwargs is None:
         kwargs = {}
 
-    get_sqlalchemy_engine().dispose(close=False)
+    logger.info("Initializing spawned worker child process.")
+    SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
+    SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
     return func(*args, **kwargs)
```
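The net effect in `_initializer`: these indexing children are created with the spawn start method (per the new log line), so each starts a fresh interpreter and inherits no pooled connections. The old `get_sqlalchemy_engine().dispose(close=False)` therefore just lazily built a default engine and cleared an already-empty pool, leaving the child without its own application name or tuned pool settings; the new code configures the child's engine explicitly instead. A sketch of how such an initializer wrapper is typically driven, reusing the `SqlEngine` sketch above (`run_in_child` and the constant's value are illustrative, not from the diff):

```python
# Illustrative harness for the _initializer pattern: the spawned child runs
# _initializer, which configures process-local state before calling the real
# work function. SqlEngine refers to the sketch earlier in this document.
import multiprocessing
from collections.abc import Callable
from typing import Any

POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME = "celery_worker_indexing_child"  # illustrative value


def _initializer(func: Callable, args: tuple, kwargs: dict[str, Any] | None = None) -> Any:
    if kwargs is None:
        kwargs = {}
    # Fresh, explicitly configured engine per child; pool_recycle=60 replaces
    # any connection older than a minute at checkout during long indexing runs.
    SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
    SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
    return func(*args, **kwargs)


def run_in_child(func: Callable, *args: Any) -> multiprocessing.Process:
    # "spawn" starts a clean interpreter, so nothing (including the parent's
    # engine or its sockets) is inherited by the child.
    ctx = multiprocessing.get_context("spawn")
    proc = ctx.Process(target=_initializer, args=(func, args))
    proc.start()
    return proc
```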