Skip to content

Commit

Permalink
Merge pull request #3165 from danswer-ai/bugfix/pruning_logs
Browse files Browse the repository at this point in the history
improve logging around pruning
  • Loading branch information
rkuo-danswer authored and hagen-danswer committed Nov 20, 2024
2 parents eb81258 + e6df32d commit 00f8ba1
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
27 changes: 21 additions & 6 deletions backend/danswer/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@


def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
"""Returns boolean indicating if pruning is due."""
"""Returns boolean indicating if pruning is due.
Next pruning time is calculated as a delta from the last successful prune, or the
last successful indexing if pruning has never succeeded.
TODO(rkuo): consider whether we should allow pruning to be immediately rescheduled
if pruning fails (which is what it does now). A backoff could be reasonable.
"""

# skip pruning if no prune frequency is set
# pruning can still be forced via the API which will run a pruning task directly
Expand Down Expand Up @@ -225,6 +232,8 @@ def connector_pruning_generator_task(
pruning_ctx_dict["request_id"] = self.request.id
pruning_ctx.set(pruning_ctx_dict)

task_logger.info(f"Pruning generator starting: cc_pair={cc_pair_id}")

redis_connector = RedisConnector(tenant_id, cc_pair_id)

r = get_redis_client(tenant_id=tenant_id)
Expand Down Expand Up @@ -255,6 +264,11 @@ def connector_pruning_generator_task(
)
return

task_logger.info(
f"Pruning generator running connector: "
f"cc_pair={cc_pair_id} "
f"connector_source={cc_pair.connector.source}"
)
runnable_connector = instantiate_connector(
db_session,
cc_pair.connector.source,
Expand All @@ -269,6 +283,7 @@ def connector_pruning_generator_task(
lock,
r,
)

# a list of docs in the source
all_connector_doc_ids: set[str] = extract_ids_from_runnable_connector(
runnable_connector, callback
Expand All @@ -290,8 +305,8 @@ def connector_pruning_generator_task(
task_logger.info(
f"Pruning set collected: "
f"cc_pair={cc_pair_id} "
f"docs_to_remove={len(doc_ids_to_remove)} "
f"doc_source={cc_pair.connector.source}"
f"connector_source={cc_pair.connector.source} "
f"docs_to_remove={len(doc_ids_to_remove)}"
)

task_logger.info(
Expand All @@ -314,10 +329,10 @@ def connector_pruning_generator_task(
f"Failed to run pruning: cc_pair={cc_pair_id} connector={connector_id}"
)

redis_connector.prune.generator_clear()
redis_connector.prune.taskset_clear()
redis_connector.prune.set_fence(False)
redis_connector.prune.reset()
raise e
finally:
if lock.owned():
lock.release()

task_logger.info(f"Pruning generator finished: cc_pair={cc_pair_id}")
2 changes: 1 addition & 1 deletion backend/danswer/connectors/confluence/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def _convert_object_to_document(
self.confluence_client, confluence_object
)

if object_text is None:
if not object_text:
return None

# Get space name
Expand Down
3 changes: 2 additions & 1 deletion backend/danswer/connectors/confluence/onyx_confluence.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ def _paginate_url(

while url_suffix:
try:
logger.info(f"Making confluence call to {url_suffix}")
next_response = self.get(url_suffix)
except Exception as e:
logger.exception("Error in danswer_cql: \n")
logger.exception(f"Error in confluence call to {url_suffix}")
raise e
yield next_response.get("results", [])
url_suffix = next_response.get("_links", {}).get("next")
Expand Down
6 changes: 6 additions & 0 deletions backend/danswer/redis/redis_connector_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ def generate_tasks(

return len(async_results)

def reset(self) -> None:
self.redis.delete(self.generator_progress_key)
self.redis.delete(self.generator_complete_key)
self.redis.delete(self.taskset_key)
self.redis.delete(self.fence_key)

@staticmethod
def remove_from_taskset(id: int, task_id: str, r: redis.Redis) -> None:
taskset_key = f"{RedisConnectorPrune.TASKSET_PREFIX}_{id}"
Expand Down

0 comments on commit 00f8ba1

Please sign in to comment.