Skip to content

Commit

Permalink
Indexing settings and logging improve (#821)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Aliaksandr Chernak <aliaksandr_chernak@epam.com>
Co-authored-by: Yuhong Sun <yuhongsun96@gmail.com>
  • Loading branch information
3 people authored Dec 22, 2023
1 parent 6650f01 commit 0318507
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
8 changes: 5 additions & 3 deletions backend/danswer/background/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from danswer.background.indexing.job_client import SimpleJob
from danswer.background.indexing.job_client import SimpleJobClient
from danswer.background.indexing.run_indexing import run_indexing_entrypoint
from danswer.configs.app_configs import CLEANUP_INDEXING_JOBS_TIMEOUT
from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
from danswer.configs.app_configs import LOG_LEVEL
from danswer.configs.app_configs import MODEL_SERVER_HOST
Expand Down Expand Up @@ -155,7 +156,8 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None:


def cleanup_indexing_jobs(
existing_jobs: dict[int, Future | SimpleJob]
existing_jobs: dict[int, Future | SimpleJob],
timeout_hours: int = CLEANUP_INDEXING_JOBS_TIMEOUT,
) -> dict[int, Future | SimpleJob]:
existing_jobs_copy = existing_jobs.copy()

Expand Down Expand Up @@ -203,13 +205,13 @@ def cleanup_indexing_jobs(
)
for index_attempt in in_progress_indexing_attempts:
if index_attempt.id in existing_jobs:
# check to see if the job has been updated in last hour, if not
# check to see if the job has been updated in last n hours, if not
# assume it to frozen in some bad state and just mark it as failed. Note: this relies
# on the fact that the `time_updated` field is constantly updated every
# batch of documents indexed
current_db_time = get_db_current_time(db_session=db_session)
time_since_update = current_db_time - index_attempt.time_updated
if time_since_update.total_seconds() > 60 * 60:
if time_since_update.total_seconds() > 60 * 60 * timeout_hours:
existing_jobs[index_attempt.id].cancel()
_mark_run_failed(
db_session=db_session,
Expand Down
4 changes: 3 additions & 1 deletion backend/danswer/configs/app_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
)
# Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
INDEX_BATCH_SIZE = 16
INDEX_BATCH_SIZE = os.environ.get("INDEX_BATCH_SIZE", 16)

# Below are intended to match the env variables names used by the official postgres docker image
# https://hub.docker.com/_/postgres
Expand Down Expand Up @@ -173,6 +173,8 @@
# Slightly larger since the sentence aware split is a max cutoff so most minichunks will be under MINI_CHUNK_SIZE
# tokens. But we need it to be at least as big as 1/4th chunk size to avoid having a tiny mini-chunk at the end
MINI_CHUNK_SIZE = 150
# Timeout to wait for job's last update before killing it, in hours
CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT", 1))


#####
Expand Down
7 changes: 6 additions & 1 deletion backend/danswer/indexing/embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
from danswer.indexing.models import IndexChunk
from danswer.search.models import Embedder
from danswer.search.search_nlp_models import EmbeddingModel
from danswer.utils.logger import setup_logger
from danswer.utils.timing import log_function_time

logger = setup_logger()


@log_function_time()
def embed_chunks(
Expand Down Expand Up @@ -42,7 +45,9 @@ def embed_chunks(
]

embeddings: list[list[float]] = []
for text_batch in text_batches:
len_text_batches = len(text_batches)
for idx, text_batch in enumerate(text_batches, start=1):
logger.debug(f"Embedding text batch {idx} of {len_text_batches}")
# Normalize embeddings is only configured via model_configs.py, be sure to use right value for the set loss
embeddings.extend(embedding_model.encode(text_batch))

Expand Down

1 comment on commit 0318507

@vercel
Copy link

@vercel vercel bot commented on 0318507 Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.