Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve indexing status display #278

Merged
merged 3 commits into from
Aug 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Add docs_indexed_column + time_started to index_attempt table

Revision ID: 5e84129c8be3
Revises: e6a4bbc13fe4
Create Date: 2023-08-10 21:43:09.069523

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "5e84129c8be3"
down_revision = "e6a4bbc13fe4"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add progress-tracking columns to `index_attempt`.

    - num_docs_indexed: running count of documents indexed so far in a run.
    - time_started: when the indexing run actually began (as opposed to
      time_created, which is when the attempt row was scheduled).

    Both columns are nullable so existing rows remain valid without a
    backfill; `nullable=True` is spelled out on both for consistency
    (previously only `time_started` declared it explicitly).
    """
    op.add_column(
        "index_attempt",
        sa.Column(
            "num_docs_indexed",
            sa.Integer(),
            nullable=True,
        ),
    )
    op.add_column(
        "index_attempt",
        sa.Column(
            "time_started",
            sa.DateTime(timezone=True),
            nullable=True,
        ),
    )


def downgrade() -> None:
    """Drop the columns added by upgrade(), newest-added first."""
    for column_name in ("time_started", "num_docs_indexed"):
        op.drop_column("index_attempt", column_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Add index for retrieving latest index_attempt

Revision ID: e6a4bbc13fe4
Revises: b082fec533f0
Create Date: 2023-08-10 12:37:23.335471

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "e6a4bbc13fe4"
down_revision = "b082fec533f0"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.create_index(
op.f("ix_index_attempt_latest_for_connector_credential_pair"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is ix?

"index_attempt",
["connector_id", "credential_id", "time_created"],
unique=False,
)


def downgrade() -> None:
    """Drop the latest-index_attempt lookup index created by upgrade()."""
    index_name = op.f("ix_index_attempt_latest_for_connector_credential_pair")
    op.drop_index(index_name, table_name="index_attempt")
5 changes: 3 additions & 2 deletions backend/danswer/background/connector_deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def _delete_singly_indexed_docs() -> int:
return num_docs_deleted

num_docs_deleted = _delete_singly_indexed_docs()
logger.info(f"Deleted {num_docs_deleted} documents from document stores")

def _update_multi_indexed_docs() -> None:
# if a document is indexed by multiple connector_credential_pairs, we should
Expand Down Expand Up @@ -132,8 +133,8 @@ def _get_user(
# categorize into groups of updates to try and batch them more efficiently
update_groups: dict[tuple[str, ...], list[str]] = {}
for document_id, allowed_users_lst in document_id_to_allowed_users.items():
# if document_id in document_ids_not_needing_update:
# continue
if document_id in document_ids_not_needing_update:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not have been commented out previously 😓

continue

allowed_users_lst.remove(to_be_deleted_user)
allowed_users = tuple(sorted(set(allowed_users_lst)))
Expand Down
37 changes: 28 additions & 9 deletions backend/danswer/background/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_inprogress_index_attempts
from danswer.db.index_attempt import get_last_successful_attempt
from danswer.db.index_attempt import get_last_attempt
from danswer.db.index_attempt import get_not_started_index_attempts
from danswer.db.index_attempt import mark_attempt_failed
from danswer.db.index_attempt import mark_attempt_in_progress
from danswer.db.index_attempt import mark_attempt_succeeded
from danswer.db.index_attempt import update_docs_indexed
from danswer.db.models import Connector
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
Expand All @@ -45,7 +46,9 @@ def should_create_new_indexing(


def create_indexing_jobs(db_session: Session) -> None:
connectors = fetch_connectors(db_session, disabled_status=False)
connectors = fetch_connectors(db_session)
yuhongsun96 marked this conversation as resolved.
Show resolved Hide resolved

# clean up in-progress jobs that were never completed
for connector in connectors:
in_progress_indexing_attempts = get_inprogress_index_attempts(
connector.id, db_session
Expand All @@ -59,7 +62,11 @@ def create_indexing_jobs(db_session: Session) -> None:
f"Marking in-progress attempt 'connector: {attempt.connector_id}, "
f"credential: {attempt.credential_id}' as failed"
)
mark_attempt_failed(attempt, db_session)
mark_attempt_failed(
attempt,
db_session,
failure_reason="Stopped mid run, likely due to the background process being killed",
)
if attempt.connector_id and attempt.credential_id:
update_connector_credential_pair(
connector_id=attempt.connector_id,
Expand All @@ -70,15 +77,16 @@ def create_indexing_jobs(db_session: Session) -> None:
db_session=db_session,
)

# potentially kick off new runs
enabled_connectors = [
connector for connector in connectors if not connector.disabled
]
for connector in enabled_connectors:
for association in connector.credentials:
credential = association.credential

last_successful_attempt = get_last_successful_attempt(
connector.id, credential.id, db_session
)
if not should_create_new_indexing(
connector, last_successful_attempt, db_session
):
last_attempt = get_last_attempt(connector.id, credential.id, db_session)
if not should_create_new_indexing(connector, last_attempt, db_session):
continue
create_index_attempt(connector.id, credential.id, db_session)

Expand Down Expand Up @@ -199,6 +207,17 @@ def run_indexing_jobs(db_session: Session) -> None:
net_doc_change += new_docs
chunk_count += total_batch_chunks
document_count += len(doc_batch)
update_docs_indexed(
db_session=db_session,
index_attempt=attempt,
num_docs_indexed=document_count,
)

# check if connector is disabled mid run and stop if so
db_session.refresh(db_connector)
if db_connector.disabled:
# let the `except` block handle this
raise RuntimeError("Connector was disabled mid run")

mark_attempt_succeeded(attempt, db_session)
update_connector_credential_pair(
Expand Down
1 change: 1 addition & 0 deletions backend/danswer/db/connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from danswer.db.connector import fetch_connector_by_id
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.server.models import StatusResponse
Expand Down
46 changes: 44 additions & 2 deletions backend/danswer/db/index_attempt.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from collections.abc import Sequence

from sqlalchemy import and_
from sqlalchemy import ColumnElement
from sqlalchemy import delete
from sqlalchemy import desc
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy.orm import Session

from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.server.models import ConnectorCredentialPairIdentifier
from danswer.utils.logger import setup_logger


Expand Down Expand Up @@ -52,6 +59,7 @@ def mark_attempt_in_progress(
db_session: Session,
) -> None:
index_attempt.status = IndexingStatus.IN_PROGRESS
index_attempt.time_started = index_attempt.time_started or func.now()
db_session.add(index_attempt)
db_session.commit()

Expand All @@ -74,21 +82,55 @@ def mark_attempt_failed(
db_session.commit()


def get_last_successful_attempt(
def update_docs_indexed(
    db_session: Session, index_attempt: IndexAttempt, num_docs_indexed: int
) -> None:
    """Record the running count of documents indexed for this attempt.

    Called from the background indexing loop after each document batch so
    the status endpoint can report live progress. Commits immediately so
    the count is visible to other DB sessions mid-run.
    """
    index_attempt.num_docs_indexed = num_docs_indexed
    db_session.add(index_attempt)
    db_session.commit()


def get_last_attempt(
    connector_id: int,
    credential_id: int,
    db_session: Session,
) -> IndexAttempt | None:
    """Return the most recently created index attempt for the given
    (connector, credential) pair, or None if there are no attempts.

    Unlike the old `get_last_successful_attempt`, this intentionally does
    NOT filter on status: the caller in the background updater uses the
    last attempt of ANY status to decide whether a new indexing run is
    due, so a recent failed attempt must also be returned here (keeping
    the SUCCESS filter would make failed connectors re-index in a tight
    loop).
    """
    stmt = (
        select(IndexAttempt)
        .where(
            IndexAttempt.connector_id == connector_id,
            IndexAttempt.credential_id == credential_id,
        )
        # NOTE: ordered by time_created (scheduling time) rather than
        # time_updated, so in-progress status churn does not reorder rows
        .order_by(desc(IndexAttempt.time_created))
    )

    return db_session.execute(stmt).scalars().first()


def get_latest_index_attempts(
    connector_credential_pair_identifiers: list[ConnectorCredentialPairIdentifier],
    db_session: Session,
) -> Sequence[IndexAttempt]:
    """Fetch the single latest IndexAttempt for each requested
    (connector_id, credential_id) pair.

    Implemented as a greatest-n-per-group query: a subquery computes
    max(time_created) per pair, then the outer query joins attempts back
    on (connector_id, credential_id, time_created).

    BUG FIX: previously the subquery grouped by `IndexAttempt.id` (the
    primary key), which makes `func.max(time_created)` a per-row no-op —
    the subquery then contained every attempt id, so ALL attempts were
    returned instead of only the latest per pair. Grouping must be on the
    pair columns themselves.
    """
    latest_stmt = select(
        IndexAttempt.connector_id,
        IndexAttempt.credential_id,
        func.max(IndexAttempt.time_created).label("max_time_created"),
    )

    # restrict to the requested pairs; no identifiers -> no WHERE clause
    # (all pairs), matching the original behavior
    pair_filters: list[ColumnElement] = [
        and_(
            IndexAttempt.connector_id == identifier.connector_id,
            IndexAttempt.credential_id == identifier.credential_id,
        )
        for identifier in connector_credential_pair_identifiers
    ]
    if pair_filters:
        latest_stmt = latest_stmt.where(or_(*pair_filters))

    latest_subquery = latest_stmt.group_by(
        IndexAttempt.connector_id, IndexAttempt.credential_id
    ).subquery()

    stmt = select(IndexAttempt).join(
        latest_subquery,
        and_(
            IndexAttempt.connector_id == latest_subquery.c.connector_id,
            IndexAttempt.credential_id == latest_subquery.c.credential_id,
            IndexAttempt.time_created == latest_subquery.c.max_time_created,
        ),
    )
    return db_session.execute(stmt).scalars().all()


def delete_index_attempts(
connector_id: int,
credential_id: int,
Expand Down
6 changes: 6 additions & 0 deletions backend/danswer/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,19 @@ class IndexAttempt(Base):
nullable=True,
)
status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
num_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)
error_msg: Mapped[str | None] = mapped_column(
String(), default=None
) # only filled if status = "failed"
time_created: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
# when the actual indexing run began
# NOTE: will use the api_server clock rather than DB server clock
time_started: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), default=None
)
time_updated: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
Expand Down
28 changes: 28 additions & 0 deletions backend/danswer/server/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
from danswer.db.engine import get_session
from danswer.db.engine import get_sqlalchemy_async_engine
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_latest_index_attempts
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.direct_qa import check_model_api_key_is_valid
Expand All @@ -74,6 +76,7 @@
from danswer.server.models import FileUploadResponse
from danswer.server.models import GDriveCallback
from danswer.server.models import GoogleAppCredentials
from danswer.server.models import IndexAttemptSnapshot
from danswer.server.models import ObjectCreationIdResponse
from danswer.server.models import RunConnectorRequest
from danswer.server.models import StatusResponse
Expand Down Expand Up @@ -190,6 +193,20 @@ def get_connector_indexing_status(

# TODO: make this one query
cc_pairs = get_connector_credential_pairs(db_session)
latest_index_attempts = get_latest_index_attempts(
db_session=db_session,
connector_credential_pair_identifiers=[
ConnectorCredentialPairIdentifier(
connector_id=cc_pair.connector_id, credential_id=cc_pair.credential_id
)
for cc_pair in cc_pairs
],
)
cc_pair_to_latest_index_attempt = {
(index_attempt.connector_id, index_attempt.credential_id): index_attempt
for index_attempt in latest_index_attempts
}

deletion_attempts_by_connector: dict[int, list[DeletionAttempt]] = {
cc_pair.connector.id: [] for cc_pair in cc_pairs
}
Expand All @@ -205,6 +222,9 @@ def get_connector_indexing_status(
for cc_pair in cc_pairs:
connector = cc_pair.connector
credential = cc_pair.credential
latest_index_attempt = cc_pair_to_latest_index_attempt.get(
(connector.id, credential.id)
)
deletion_attemts = deletion_attempts_by_connector.get(connector.id, [])
indexing_statuses.append(
ConnectorIndexingStatus(
Expand All @@ -215,6 +235,14 @@ def get_connector_indexing_status(
last_status=cc_pair.last_attempt_status,
last_success=cc_pair.last_successful_index_time,
docs_indexed=cc_pair.total_docs_indexed,
error_msg=latest_index_attempt.error_msg
if latest_index_attempt
else None,
latest_index_attempt=IndexAttemptSnapshot.from_index_attempt_db_model(
latest_index_attempt
)
if latest_index_attempt
else None,
deletion_attempts=[
DeletionAttemptSnapshot.from_deletion_attempt_db_model(
deletion_attempt
Expand Down
25 changes: 25 additions & 0 deletions backend/danswer/server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from danswer.db.models import Credential
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.search.models import QueryFlow
from danswer.search.models import SearchType
Expand Down Expand Up @@ -124,6 +125,28 @@ class IndexAttemptRequest(BaseModel):
connector_specific_config: dict[str, Any]


class IndexAttemptSnapshot(BaseModel):
    """API-facing view of a single IndexAttempt DB row.

    Timestamps are serialized as ISO-8601 strings; `time_started` is None
    until the run has actually begun.
    """

    status: IndexingStatus | None
    num_docs_indexed: int
    error_msg: str | None
    time_started: str | None
    time_updated: str

    @classmethod
    def from_index_attempt_db_model(
        cls, index_attempt: IndexAttempt
    ) -> "IndexAttemptSnapshot":
        """Build a snapshot from the ORM model.

        Uses `cls(...)` rather than the hard-coded class name so the
        constructor is honored by subclasses as well.
        """
        time_started = (
            index_attempt.time_started.isoformat()
            if index_attempt.time_started
            else None
        )
        return cls(
            status=index_attempt.status,
            # column is nullable in the DB; surface 0 instead of None
            num_docs_indexed=index_attempt.num_docs_indexed or 0,
            error_msg=index_attempt.error_msg,
            time_started=time_started,
            time_updated=index_attempt.time_updated.isoformat(),
        )


class DeletionAttemptSnapshot(BaseModel):
connector_id: int
status: DeletionStatus
Expand Down Expand Up @@ -215,6 +238,8 @@ class ConnectorIndexingStatus(BaseModel):
last_status: IndexingStatus | None
last_success: datetime | None
docs_indexed: int
error_msg: str | None
latest_index_attempt: IndexAttemptSnapshot | None
deletion_attempts: list[DeletionAttemptSnapshot]
is_deletable: bool

Expand Down
Loading
Loading