Skip to content

Commit

Permalink
doc sync celery refactor (#3084)
Browse files Browse the repository at this point in the history
* doc_sync is refactored

* maybe this works

* tested to work!

* mypy fixes

* enabled integration tests

* fixed the test

* added external group sync

* testing should work now

* mypy

* confluence doc id fix

* got group sync working

* addressed feedback

* renamed some vars and fixed mypy

* conf fix?

* added wiki handling to confluence connector

* test fixes

* revert google drive connector

* fixed groups
  • Loading branch information
hagen-danswer authored Nov 12, 2024
1 parent 942e47d commit ef9c63b
Show file tree
Hide file tree
Showing 55 changed files with 1,641 additions and 615 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/pr-Integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ jobs:
-e SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN} \
-e TEST_WEB_HOSTNAME=test-runner \
danswer/danswer-integration:test \
/app/tests/integration/tests
/app/tests/integration/tests \
/app/tests/integration/connector_job_tests
continue-on-error: true
id: run_tests

Expand Down
4 changes: 2 additions & 2 deletions .vscode/launch.template.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
"--loglevel=INFO",
"--hostname=light@%n",
"-Q",
"vespa_metadata_sync,connector_deletion",
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert",
],
"presentation": {
"group": "2",
Expand Down Expand Up @@ -232,7 +232,7 @@
"--loglevel=INFO",
"--hostname=heavy@%n",
"-Q",
"connector_pruning",
"connector_pruning,connector_doc_permissions_sync,connector_external_group_sync",
],
"presentation": {
"group": "2",
Expand Down
30 changes: 30 additions & 0 deletions backend/alembic/versions/2daa494a0851_add_group_sync_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""add-group-sync-time
Revision ID: 2daa494a0851
Revises: c0fd6e4da83a
Create Date: 2024-11-11 10:57:22.991157
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "2daa494a0851"
down_revision = "c0fd6e4da83a"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"connector_credential_pair",
sa.Column(
"last_time_external_group_sync",
sa.DateTime(timezone=True),
nullable=True,
),
)


def downgrade() -> None:
op.drop_column("connector_credential_pair", "last_time_external_group_sync")
35 changes: 35 additions & 0 deletions backend/danswer/access/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,41 @@ class ExternalAccess:
is_public: bool


@dataclass(frozen=True)
class DocExternalAccess:
external_access: ExternalAccess
# The document ID
doc_id: str

def to_dict(self) -> dict:
return {
"external_access": {
"external_user_emails": list(self.external_access.external_user_emails),
"external_user_group_ids": list(
self.external_access.external_user_group_ids
),
"is_public": self.external_access.is_public,
},
"doc_id": self.doc_id,
}

@classmethod
def from_dict(cls, data: dict) -> "DocExternalAccess":
external_access = ExternalAccess(
external_user_emails=set(
data["external_access"].get("external_user_emails", [])
),
external_user_group_ids=set(
data["external_access"].get("external_user_group_ids", [])
),
is_public=data["external_access"]["is_public"],
)
return cls(
external_access=external_access,
doc_id=data["doc_id"],
)


@dataclass(frozen=True)
class DocumentAccess(ExternalAccess):
# User emails for Danswer users, None indicates admin
Expand Down
18 changes: 18 additions & 0 deletions backend/danswer/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
from danswer.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from danswer.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
from danswer.redis.redis_connector_prune import RedisConnectorPrune
from danswer.redis.redis_document_set import RedisDocumentSet
from danswer.redis.redis_pool import get_redis_client
Expand Down Expand Up @@ -136,6 +138,22 @@ def on_task_postrun(
RedisConnectorPrune.remove_from_taskset(int(cc_pair_id), task_id, r)
return

if task_id.startswith(RedisConnectorPermissionSync.SUBTASK_PREFIX):
cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
if cc_pair_id is not None:
RedisConnectorPermissionSync.remove_from_taskset(
int(cc_pair_id), task_id, r
)
return

if task_id.startswith(RedisConnectorExternalGroupSync.SUBTASK_PREFIX):
cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
if cc_pair_id is not None:
RedisConnectorExternalGroupSync.remove_from_taskset(
int(cc_pair_id), task_id, r
)
return


def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None:
"""The first signal sent on celery worker startup"""
Expand Down
2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/heavy.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,7 @@ def on_setup_logging(
celery_app.autodiscover_tasks(
[
"danswer.background.celery.tasks.pruning",
"danswer.background.celery.tasks.doc_permission_syncing",
"danswer.background.celery.tasks.external_group_syncing",
]
)
1 change: 1 addition & 0 deletions backend/danswer/background/celery/apps/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,6 @@ def on_setup_logging(
"danswer.background.celery.tasks.shared",
"danswer.background.celery.tasks.vespa",
"danswer.background.celery.tasks.connector_deletion",
"danswer.background.celery.tasks.doc_permission_syncing",
]
)
8 changes: 8 additions & 0 deletions backend/danswer/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from danswer.db.engine import SqlEngine
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
from danswer.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from danswer.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
from danswer.redis.redis_connector_index import RedisConnectorIndex
from danswer.redis.redis_connector_prune import RedisConnectorPrune
from danswer.redis.redis_connector_stop import RedisConnectorStop
Expand Down Expand Up @@ -134,6 +136,10 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

RedisConnectorStop.reset_all(r)

RedisConnectorPermissionSync.reset_all(r)

RedisConnectorExternalGroupSync.reset_all(r)


@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
Expand Down Expand Up @@ -233,6 +239,8 @@ def stop(self, worker: Any) -> None:
"danswer.background.celery.tasks.connector_deletion",
"danswer.background.celery.tasks.indexing",
"danswer.background.celery.tasks.periodic",
"danswer.background.celery.tasks.doc_permission_syncing",
"danswer.background.celery.tasks.external_group_syncing",
"danswer.background.celery.tasks.pruning",
"danswer.background.celery.tasks.shared",
"danswer.background.celery.tasks.vespa",
Expand Down
2 changes: 1 addition & 1 deletion backend/danswer/background/celery/celery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def extract_ids_from_runnable_connector(
callback: RunIndexingCallbackInterface | None = None,
) -> set[str]:
"""
If the PruneConnector hasnt been implemented for the given connector, just pull
If the SlimConnector hasnt been implemented for the given connector, just pull
all docs using the load_from_state and grab out the IDs.
Optionally, a callback can be passed to handle the length of each document batch.
Expand Down
12 changes: 12 additions & 0 deletions backend/danswer/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@
"schedule": timedelta(seconds=5),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-doc-permissions-sync",
"task": "check_for_doc_permissions_sync",
"schedule": timedelta(seconds=30),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-external-group-sync",
"task": "check_for_external_group_sync",
"schedule": timedelta(seconds=20),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ def try_generate_document_cc_pair_cleanup_tasks(
f"cc_pair={cc_pair_id}"
)

if redis_connector.permissions.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (permissions in progress): "
f"cc_pair={cc_pair_id}"
)

# add tasks to celery and build up the task set to monitor in redis
redis_connector.delete.taskset_clear()

Expand Down
Loading

0 comments on commit ef9c63b

Please sign in to comment.