diff --git a/backend/danswer/background/celery/tasks/pruning/tasks.py b/backend/danswer/background/celery/tasks/pruning/tasks.py index 049840c051a..6398e0a6cc2 100644 --- a/backend/danswer/background/celery/tasks/pruning/tasks.py +++ b/backend/danswer/background/celery/tasks/pruning/tasks.py @@ -39,7 +39,14 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool: - """Returns boolean indicating if pruning is due.""" + """Returns boolean indicating if pruning is due. + + Next pruning time is calculated as a delta from the last successful prune, or the + last successful indexing if pruning has never succeeded. + + TODO(rkuo): consider whether we should allow pruning to be immediately rescheduled + if pruning fails (which is what it does now). A backoff could be reasonable. + """ # skip pruning if no prune frequency is set # pruning can still be forced via the API which will run a pruning task directly @@ -225,6 +232,8 @@ def connector_pruning_generator_task( pruning_ctx_dict["request_id"] = self.request.id pruning_ctx.set(pruning_ctx_dict) + task_logger.info(f"Pruning generator starting: cc_pair={cc_pair_id}") + redis_connector = RedisConnector(tenant_id, cc_pair_id) r = get_redis_client(tenant_id=tenant_id) @@ -255,6 +264,11 @@ def connector_pruning_generator_task( ) return + task_logger.info( + f"Pruning generator running connector: " + f"cc_pair={cc_pair_id} " + f"connector_source={cc_pair.connector.source}" + ) runnable_connector = instantiate_connector( db_session, cc_pair.connector.source, @@ -269,6 +283,7 @@ def connector_pruning_generator_task( lock, r, ) + # a list of docs in the source all_connector_doc_ids: set[str] = extract_ids_from_runnable_connector( runnable_connector, callback @@ -290,8 +305,8 @@ def connector_pruning_generator_task( task_logger.info( f"Pruning set collected: " f"cc_pair={cc_pair_id} " - f"docs_to_remove={len(doc_ids_to_remove)} " - f"doc_source={cc_pair.connector.source}" + f"connector_source={cc_pair.connector.source} " + f"docs_to_remove={len(doc_ids_to_remove)}" ) task_logger.info( @@ -314,10 +329,10 @@ def connector_pruning_generator_task( f"Failed to run pruning: cc_pair={cc_pair_id} connector={connector_id}" ) - redis_connector.prune.generator_clear() - redis_connector.prune.taskset_clear() - redis_connector.prune.set_fence(False) + redis_connector.prune.reset() raise e finally: if lock.owned(): lock.release() + + task_logger.info(f"Pruning generator finished: cc_pair={cc_pair_id}") diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index 9f1fe2e05f5..b702a957416 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -162,7 +162,7 @@ def _convert_object_to_document( self.confluence_client, confluence_object ) - if object_text is None: + if not object_text: return None # Get space name diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py index c01f45dea6a..89c50f28181 100644 --- a/backend/danswer/connectors/confluence/onyx_confluence.py +++ b/backend/danswer/connectors/confluence/onyx_confluence.py @@ -153,9 +153,10 @@ def _paginate_url( while url_suffix: try: + logger.info(f"Making confluence call to {url_suffix}") next_response = self.get(url_suffix) except Exception as e: - logger.exception("Error in danswer_cql: \n") + logger.exception(f"Error in confluence call to {url_suffix}") raise e yield next_response.get("results", []) url_suffix = next_response.get("_links", {}).get("next") diff --git a/backend/danswer/redis/redis_connector_prune.py b/backend/danswer/redis/redis_connector_prune.py index 25e0a6314de..f8e6f372619 100644 --- a/backend/danswer/redis/redis_connector_prune.py +++ b/backend/danswer/redis/redis_connector_prune.py @@ -150,6 +150,12 @@ def generate_tasks( return len(async_results) + def reset(self) -> None: + self.redis.delete(self.generator_progress_key) + self.redis.delete(self.generator_complete_key) + self.redis.delete(self.taskset_key) + self.redis.delete(self.fence_key) + @staticmethod def remove_from_taskset(id: int, task_id: str, r: redis.Redis) -> None: taskset_key = f"{RedisConnectorPrune.TASKSET_PREFIX}_{id}"