From 6ffd60c909d0216038dfc88896bed97dfa93ab02 Mon Sep 17 00:00:00 2001 From: Richard Kuo Date: Tue, 29 Oct 2024 22:42:08 -0700 Subject: [PATCH] add extra tags to pruning logs --- .../background/celery/tasks/pruning/tasks.py | 8 ++++++- backend/danswer/utils/logger.py | 21 +++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/backend/danswer/background/celery/tasks/pruning/tasks.py b/backend/danswer/background/celery/tasks/pruning/tasks.py index d9579ccf93c..d20430df1f8 100644 --- a/backend/danswer/background/celery/tasks/pruning/tasks.py +++ b/backend/danswer/background/celery/tasks/pruning/tasks.py @@ -34,6 +34,7 @@ from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.models import ConnectorCredentialPair from danswer.redis.redis_pool import get_redis_client +from danswer.utils.logger import pruning_ctx from danswer.utils.logger import setup_logger logger = setup_logger() @@ -229,10 +230,15 @@ def connector_pruning_generator_task( and compares those IDs to locally stored documents and deletes all locally stored IDs missing from the most recently pulled document ID list""" - r = get_redis_client(tenant_id=tenant_id) + pruning_ctx_dict = pruning_ctx.get() + pruning_ctx_dict["cc_pair_id"] = cc_pair_id + pruning_ctx_dict["request_id"] = self.request.id + pruning_ctx.set(pruning_ctx_dict) rcp = RedisConnectorPruning(cc_pair_id) + r = get_redis_client(tenant_id=tenant_id) + lock = r.lock( DanswerRedisLocks.PRUNING_LOCK_PREFIX + f"_{rcp._id}", timeout=CELERY_PRUNING_LOCK_TIMEOUT, diff --git a/backend/danswer/utils/logger.py b/backend/danswer/utils/logger.py index 98d86f799a3..bc872b0c423 100644 --- a/backend/danswer/utils/logger.py +++ b/backend/danswer/utils/logger.py @@ -1,3 +1,4 @@ +import contextvars import logging import os from collections.abc import MutableMapping @@ -16,6 +17,10 @@ logging.addLevelName(logging.INFO + 5, "NOTICE") +pruning_ctx: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar( + "pruning_ctx", default=dict() +) + class IndexAttemptSingleton: """Used to tell if this process is an indexing job, and if so what is the @@ -64,11 +69,19 @@ def process( index_attempt_id = IndexAttemptSingleton.get_index_attempt_id() cc_pair_id = IndexAttemptSingleton.get_connector_credential_pair_id() - if index_attempt_id is not None: - msg = f"[Index Attempt: {index_attempt_id}] {msg}" + pruning_ctx_dict = pruning_ctx.get() + if len(pruning_ctx_dict) > 0: + if "request_id" in pruning_ctx_dict: + msg = f"[Prune: {pruning_ctx_dict['request_id']}] {msg}" + + if "cc_pair_id" in pruning_ctx_dict: + msg = f"[CC Pair: {pruning_ctx_dict['cc_pair_id']}] {msg}" + else: + if index_attempt_id is not None: + msg = f"[Index Attempt: {index_attempt_id}] {msg}" - if cc_pair_id is not None: - msg = f"[CC Pair: {cc_pair_id}] {msg}" + if cc_pair_id is not None: + msg = f"[CC Pair: {cc_pair_id}] {msg}" # Add tenant information if it differs from default # This will always be the case for authenticated API requests