Commit 4a2b595

Improve indexing status

Weves committed Aug 12, 2023
1 parent 54ee323 commit 4a2b595
Showing 15 changed files with 317 additions and 20 deletions.
New file: Alembic migration 5e84129c8be3 (36 additions)
@@ -0,0 +1,36 @@
"""Add docs_indexed_column + time_started to index_attempt table
Revision ID: 5e84129c8be3
Revises: e6a4bbc13fe4
Create Date: 2023-08-10 21:43:09.069523
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "5e84129c8be3"
down_revision = "e6a4bbc13fe4"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.add_column(
        "index_attempt",
        sa.Column("num_docs_indexed", sa.Integer()),
    )
    op.add_column(
        "index_attempt",
        sa.Column(
            "time_started",
            sa.DateTime(timezone=True),
            nullable=True,
        ),
    )


def downgrade() -> None:
    op.drop_column("index_attempt", "time_started")
    op.drop_column("index_attempt", "num_docs_indexed")
New file: Alembic migration e6a4bbc13fe4 (32 additions)
@@ -0,0 +1,32 @@
"""Add index for retrieving latest index_attempt
Revision ID: e6a4bbc13fe4
Revises: b082fec533f0
Create Date: 2023-08-10 12:37:23.335471
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "e6a4bbc13fe4"
down_revision = "b082fec533f0"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_index(
        op.f("ix_index_attempt_latest_for_connector_credential_pair"),
        "index_attempt",
        ["connector_id", "credential_id", "time_created"],
        unique=False,
    )


def downgrade() -> None:
    op.drop_index(
        op.f("ix_index_attempt_latest_for_connector_credential_pair"),
        table_name="index_attempt",
    )
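As the revision name suggests, the three indexed columns match a lookup of the most recent attempt for a connector/credential pair; a sketch of the query shape this index serves (illustrative only — the ids are placeholders, and `get_last_attempt` later in this commit is the real consumer):

    from sqlalchemy import desc, select

    from danswer.db.models import IndexAttempt

    connector_id, credential_id = 1, 1  # placeholder ids

    # Filter on the two leading index columns, order by the third
    stmt = (
        select(IndexAttempt)
        .where(IndexAttempt.connector_id == connector_id)
        .where(IndexAttempt.credential_id == credential_id)
        .order_by(desc(IndexAttempt.time_created))
        .limit(1)
    )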
5 changes: 3 additions & 2 deletions backend/danswer/background/connector_deletion.py
@@ -88,6 +88,7 @@ def _delete_singly_indexed_docs() -> int:
        return num_docs_deleted

    num_docs_deleted = _delete_singly_indexed_docs()
+    logger.info(f"Deleted {num_docs_deleted} documents from document stores")

    def _update_multi_indexed_docs() -> None:
        # if a document is indexed by multiple connector_credential_pairs, we should
@@ -132,8 +133,8 @@ def _get_user(
    # categorize into groups of updates to try and batch them more efficiently
    update_groups: dict[tuple[str, ...], list[str]] = {}
    for document_id, allowed_users_lst in document_id_to_allowed_users.items():
-        # if document_id in document_ids_not_needing_update:
-        #     continue
+        if document_id in document_ids_not_needing_update:
+            continue

        allowed_users_lst.remove(to_be_deleted_user)
        allowed_users = tuple(sorted(set(allowed_users_lst)))
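For context, the surrounding loop batches permission updates by the resulting allowed-user set, so documents that end up with identical ACLs are written together; a toy sketch of that grouping pattern (the data is made up):

    document_id_to_allowed_users = {
        "doc-1": ["a@example.com", "b@example.com"],
        "doc-2": ["a@example.com", "b@example.com"],
        "doc-3": ["c@example.com"],
    }

    update_groups: dict[tuple[str, ...], list[str]] = {}
    for document_id, allowed_users_lst in document_id_to_allowed_users.items():
        allowed_users = tuple(sorted(set(allowed_users_lst)))
        update_groups.setdefault(allowed_users, []).append(document_id)

    # doc-1 and doc-2 share an ACL, so they land in one batched update
    assert update_groups[("a@example.com", "b@example.com")] == ["doc-1", "doc-2"]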
37 changes: 28 additions & 9 deletions backend/danswer/background/update.py
@@ -19,11 +19,12 @@
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_inprogress_index_attempts
-from danswer.db.index_attempt import get_last_successful_attempt
+from danswer.db.index_attempt import get_last_attempt
from danswer.db.index_attempt import get_not_started_index_attempts
from danswer.db.index_attempt import mark_attempt_failed
from danswer.db.index_attempt import mark_attempt_in_progress
from danswer.db.index_attempt import mark_attempt_succeeded
+from danswer.db.index_attempt import update_docs_indexed
from danswer.db.models import Connector
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
@@ -45,7 +46,9 @@ def should_create_new_indexing(


def create_indexing_jobs(db_session: Session) -> None:
-    connectors = fetch_connectors(db_session, disabled_status=False)
+    connectors = fetch_connectors(db_session)
+
+    # clean up in-progress jobs that were never completed
    for connector in connectors:
        in_progress_indexing_attempts = get_inprogress_index_attempts(
            connector.id, db_session
@@ -59,7 +62,11 @@ def create_indexing_jobs(db_session: Session) -> None:
                f"Marking in-progress attempt 'connector: {attempt.connector_id}, "
                f"credential: {attempt.credential_id}' as failed"
            )
-            mark_attempt_failed(attempt, db_session)
+            mark_attempt_failed(
+                attempt,
+                db_session,
+                failure_reason="Stopped mid run, likely due to the background process being killed",
+            )
            if attempt.connector_id and attempt.credential_id:
                update_connector_credential_pair(
                    connector_id=attempt.connector_id,

@@ -70,15 +77,16 @@ def create_indexing_jobs(db_session: Session) -> None:
                    db_session=db_session,
                )

+    # potentially kick off new runs
+    enabled_connectors = [
+        connector for connector in connectors if not connector.disabled
+    ]
+    for connector in enabled_connectors:
        for association in connector.credentials:
            credential = association.credential

-            last_successful_attempt = get_last_successful_attempt(
-                connector.id, credential.id, db_session
-            )
-            if not should_create_new_indexing(
-                connector, last_successful_attempt, db_session
-            ):
+            last_attempt = get_last_attempt(connector.id, credential.id, db_session)
+            if not should_create_new_indexing(connector, last_attempt, db_session):
                continue
            create_index_attempt(connector.id, credential.id, db_session)

@@ -199,6 +207,17 @@ def run_indexing_jobs(db_session: Session) -> None:
                net_doc_change += new_docs
                chunk_count += total_batch_chunks
                document_count += len(doc_batch)
+                update_docs_indexed(
+                    db_session=db_session,
+                    index_attempt=attempt,
+                    num_docs_indexed=document_count,
+                )
+
+                # check if connector is disabled mid run and stop if so
+                db_session.refresh(db_connector)
+                if db_connector.disabled:
+                    # let the `except` block handle this
+                    raise RuntimeError("Connector was disabled mid run")

            mark_attempt_succeeded(attempt, db_session)
            update_connector_credential_pair(
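The failure_reason argument added above presumably feeds the error_msg column that manage.py surfaces later in this commit; a hedged sketch of what mark_attempt_failed likely does with it (the body is an assumption — the function itself is not expanded in this diff, and the default value is a guess):

    def mark_attempt_failed(
        index_attempt: IndexAttempt,
        db_session: Session,
        failure_reason: str = "Unknown",  # default is assumed, not shown
    ) -> None:
        # Record the terminal status and surface the reason to the API layer
        index_attempt.status = IndexingStatus.FAILED
        index_attempt.error_msg = failure_reason
        db_session.add(index_attempt)
        db_session.commit()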
5 changes: 5 additions & 0 deletions backend/danswer/connectors/slack/connector.py
@@ -328,6 +328,7 @@ def poll_source(
            raise ConnectorMissingCredentialError("Slack")

        documents: list[Document] = []
+        seen_doc_ids = set()
        for document in get_all_docs(
            client=self.client,
            workspace=self.workspace,
@@ -338,6 +339,10 @@
            oldest=str(start) if start else None,
            latest=str(end),
        ):
+            if document.id in seen_doc_ids:
+                logger.info("DUPLICATE FOUND")
+                logger.info(document)
+            seen_doc_ids.add(document.id)
            documents.append(document)
            if len(documents) >= self.batch_size:
                yield documents
1 change: 1 addition & 0 deletions backend/danswer/db/connector_credential_pair.py
@@ -8,6 +8,7 @@
from danswer.db.connector import fetch_connector_by_id
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.models import ConnectorCredentialPair
+from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.server.models import StatusResponse
46 changes: 44 additions & 2 deletions backend/danswer/db/index_attempt.py
@@ -1,10 +1,17 @@
from collections.abc import Sequence

+from sqlalchemy import and_
+from sqlalchemy import ColumnElement
from sqlalchemy import delete
from sqlalchemy import desc
+from sqlalchemy import func
+from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy.orm import Session

from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
+from danswer.server.models import ConnectorCredentialPairIdentifier
from danswer.utils.logger import setup_logger

@@ -52,6 +59,7 @@ def mark_attempt_in_progress(
    db_session: Session,
) -> None:
    index_attempt.status = IndexingStatus.IN_PROGRESS
+    index_attempt.time_started = index_attempt.time_started or func.now()
    db_session.add(index_attempt)
    db_session.commit()
@@ -74,21 +82,55 @@ def mark_attempt_failed(
    db_session.commit()


-def get_last_successful_attempt(
+def update_docs_indexed(
+    db_session: Session, index_attempt: IndexAttempt, num_docs_indexed: int
+) -> None:
+    index_attempt.num_docs_indexed = num_docs_indexed
+    db_session.add(index_attempt)
+    db_session.commit()
+
+
+def get_last_attempt(
    connector_id: int,
    credential_id: int,
    db_session: Session,
) -> IndexAttempt | None:
    stmt = select(IndexAttempt)
    stmt = stmt.where(IndexAttempt.connector_id == connector_id)
    stmt = stmt.where(IndexAttempt.credential_id == credential_id)
-    stmt = stmt.where(IndexAttempt.status == IndexingStatus.SUCCESS)
+    # Note, the below is using time_created instead of time_updated
    stmt = stmt.order_by(desc(IndexAttempt.time_created))

    return db_session.execute(stmt).scalars().first()
+
+
+def get_latest_index_attempts(
+    connector_credential_pair_identifiers: list[ConnectorCredentialPairIdentifier],
+    db_session: Session,
+) -> Sequence[IndexAttempt]:
+    ids_stmt = select(
+        IndexAttempt.id, func.max(IndexAttempt.time_created).label("max_updated_at")
+    )
+
+    where_stmts: list[ColumnElement] = []
+    for connector_credential_pair_identifier in connector_credential_pair_identifiers:
+        where_stmts.append(
+            and_(
+                IndexAttempt.connector_id
+                == connector_credential_pair_identifier.connector_id,
+                IndexAttempt.credential_id
+                == connector_credential_pair_identifier.credential_id,
+            )
+        )
+    if where_stmts:
+        ids_stmt = ids_stmt.where(or_(*where_stmts))
+    ids_stmt = ids_stmt.group_by(IndexAttempt.id)
+    ids_subqery = ids_stmt.subquery()
+
+    stmt = select(IndexAttempt).join(ids_subqery, ids_subqery.c.id == IndexAttempt.id)
+    return db_session.execute(stmt).scalars().all()


def delete_index_attempts(
    connector_id: int,
    credential_id: int,
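A hedged usage sketch of the new get_latest_index_attempts helper, wired up the same way manage.py does below (the ids are placeholders):

    from sqlalchemy.orm import Session

    from danswer.db.engine import get_sqlalchemy_engine
    from danswer.db.index_attempt import get_latest_index_attempts
    from danswer.server.models import ConnectorCredentialPairIdentifier

    with Session(get_sqlalchemy_engine()) as db_session:
        attempts = get_latest_index_attempts(
            connector_credential_pair_identifiers=[
                ConnectorCredentialPairIdentifier(connector_id=1, credential_id=1)
            ],
            db_session=db_session,
        )
        for attempt in attempts:
            print(attempt.status, attempt.num_docs_indexed, attempt.time_started)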
6 changes: 6 additions & 0 deletions backend/danswer/db/models.py
@@ -185,13 +185,19 @@ class IndexAttempt(Base):
        nullable=True,
    )
    status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
+    num_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)
    error_msg: Mapped[str | None] = mapped_column(
        String(), default=None
    )  # only filled if status = "failed"
    time_created: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
+    # when the actual indexing run began
+    # NOTE: will use the api_server clock rather than DB server clock
+    time_started: Mapped[datetime.datetime] = mapped_column(
+        DateTime(timezone=True), default=None
+    )
    time_updated: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
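With num_docs_indexed and time_started both on the model, a rough throughput figure can be derived for a finished attempt; a sketch (assumes time_updated is refreshed when the attempt completes, which this hunk does not show):

    from danswer.db.models import IndexAttempt

    def docs_per_second(attempt: IndexAttempt) -> float | None:
        # Both new columns stay empty until a run actually starts
        if not attempt.time_started or not attempt.num_docs_indexed:
            return None
        elapsed = (attempt.time_updated - attempt.time_started).total_seconds()
        return attempt.num_docs_indexed / elapsed if elapsed > 0 else None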
28 changes: 28 additions & 0 deletions backend/danswer/server/manage.py
@@ -51,8 +51,10 @@
from danswer.db.engine import get_session
from danswer.db.engine import get_sqlalchemy_async_engine
from danswer.db.index_attempt import create_index_attempt
+from danswer.db.index_attempt import get_latest_index_attempts
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
+from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.direct_qa import check_model_api_key_is_valid
@@ -74,6 +76,7 @@
from danswer.server.models import FileUploadResponse
from danswer.server.models import GDriveCallback
from danswer.server.models import GoogleAppCredentials
+from danswer.server.models import IndexAttemptSnapshot
from danswer.server.models import ObjectCreationIdResponse
from danswer.server.models import RunConnectorRequest
from danswer.server.models import StatusResponse
@@ -190,6 +193,20 @@ def get_connector_indexing_status(

    # TODO: make this one query
    cc_pairs = get_connector_credential_pairs(db_session)
+    latest_index_attempts = get_latest_index_attempts(
+        db_session=db_session,
+        connector_credential_pair_identifiers=[
+            ConnectorCredentialPairIdentifier(
+                connector_id=cc_pair.connector_id, credential_id=cc_pair.credential_id
+            )
+            for cc_pair in cc_pairs
+        ],
+    )
+    cc_pair_to_latest_index_attempt = {
+        (index_attempt.connector_id, index_attempt.credential_id): index_attempt
+        for index_attempt in latest_index_attempts
+    }
+
    deletion_attempts_by_connector: dict[int, list[DeletionAttempt]] = {
        cc_pair.connector.id: [] for cc_pair in cc_pairs
    }
@@ -205,6 +222,9 @@ def get_connector_indexing_status(
    for cc_pair in cc_pairs:
        connector = cc_pair.connector
        credential = cc_pair.credential
+        latest_index_attempt = cc_pair_to_latest_index_attempt.get(
+            (connector.id, credential.id)
+        )
        deletion_attemts = deletion_attempts_by_connector.get(connector.id, [])
        indexing_statuses.append(
            ConnectorIndexingStatus(
@@ -215,6 +235,14 @@ def get_connector_indexing_status(
                last_status=cc_pair.last_attempt_status,
                last_success=cc_pair.last_successful_index_time,
                docs_indexed=cc_pair.total_docs_indexed,
+                error_msg=latest_index_attempt.error_msg
+                if latest_index_attempt
+                else None,
+                latest_index_attempt=IndexAttemptSnapshot.from_index_attempt_db_model(
+                    latest_index_attempt
+                )
+                if latest_index_attempt
+                else None,
                deletion_attempts=[
                    DeletionAttemptSnapshot.from_deletion_attempt_db_model(
                        deletion_attempt