Google Drive Improvements #3057

Merged 5 commits on Nov 7, 2024
Changes from 2 commits
289 changes: 178 additions & 111 deletions backend/danswer/connectors/google_drive/connector.py

Large diffs are not rendered by default.

26 changes: 24 additions & 2 deletions backend/danswer/connectors/google_drive/doc_conversion.py
@@ -7,6 +7,7 @@
from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import IGNORE_FOR_QA
from danswer.connectors.google_drive.constants import DRIVE_FOLDER_TYPE
from danswer.connectors.google_drive.constants import DRIVE_SHORTCUT_TYPE
from danswer.connectors.google_drive.constants import UNSUPPORTED_FILE_TYPE_CONTENT
from danswer.connectors.google_drive.models import GDriveMimeType
@@ -16,6 +17,7 @@
from danswer.connectors.google_utils.resources import GoogleDriveService
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.connectors.models import SlimDocument
from danswer.file_processing.extract_file_text import docx_to_text
from danswer.file_processing.extract_file_text import pptx_to_text
from danswer.file_processing.extract_file_text import read_pdf_file
@@ -25,6 +27,7 @@

logger = setup_logger()


# these errors don't represent a failure in the connector, but simply files
# that can't / shouldn't be indexed
ERRORS_TO_CONTINUE_ON = [
@@ -120,6 +123,10 @@ def convert_drive_item_to_document(
        if file.get("mimeType") == DRIVE_SHORTCUT_TYPE:
            logger.info("Ignoring Drive Shortcut Filetype")
            return None
        # Skip files that are folders
        if file.get("mimeType") == DRIVE_FOLDER_TYPE:
            logger.info("Ignoring Drive Folder Filetype")
            return None

        sections: list[Section] = []

@@ -133,7 +140,6 @@
f"Ran into exception '{e}' when pulling sections from Google Doc '{file['name']}'."
" Falling back to basic extraction."
)

        # NOTE: this will run for either (1) the above failed or (2) the file is not a Google Doc
        if not sections:
            try:
@@ -150,7 +156,6 @@
                    return None

                raise

        if not sections:
            return None

@@ -173,3 +178,20 @@

logger.exception("Ran into exception when pulling a file from Google Drive")
return None


def build_slim_document(file: GoogleDriveFileType) -> SlimDocument | None:
    # Skip files that are folders or shortcuts
    if file.get("mimeType") in [DRIVE_FOLDER_TYPE, DRIVE_SHORTCUT_TYPE]:
        return None

    return SlimDocument(
        id=file["webViewLink"],
        perm_sync_data={
            "doc_id": file.get("id"),
            "permissions": file.get("permissions", []),
            "permission_ids": file.get("permissionIds", []),
            "name": file.get("name"),
            "owner_email": file.get("owners", [{}])[0].get("emailAddress"),
        },
    )
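
For orientation, a minimal usage sketch of build_slim_document (not part of the diff; the sample file dict below is invented, shaped like a Drive v3 files().list item):

# Hypothetical example, not from the PR: field names follow the Drive v3 API,
# values are made up.
from danswer.connectors.google_drive.doc_conversion import build_slim_document

drive_file = {
    "id": "1AbC",
    "webViewLink": "https://drive.google.com/file/d/1AbC/view",
    "mimeType": "application/pdf",
    "name": "quarterly-report.pdf",
    "permissionIds": ["perm-1"],
    "owners": [{"emailAddress": "owner@example.com"}],
}

slim_doc = build_slim_document(drive_file)
if slim_doc is not None:
    # id is the webViewLink; permission metadata rides along in perm_sync_data
    print(slim_doc.id, slim_doc.perm_sync_data["owner_email"])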
109 changes: 66 additions & 43 deletions backend/danswer/connectors/google_drive/file_retrieval.py
@@ -1,6 +1,7 @@
from collections.abc import Callable
from collections.abc import Iterator
from datetime import datetime
from typing import Any

from googleapiclient.discovery import Resource # type: ignore

@@ -41,7 +42,6 @@ def _generate_time_range_filter(
def _get_folders_in_parent(
    service: Resource,
    parent_id: str | None = None,
    personal_drive: bool = False,
) -> Iterator[GoogleDriveFileType]:
    # Follow shortcuts to folders
    query = f"(mimeType = '{DRIVE_FOLDER_TYPE}' or mimeType = '{DRIVE_SHORTCUT_TYPE}')"
@@ -53,9 +53,10 @@ def _get_folders_in_parent(
    for file in execute_paginated_retrieval(
        retrieval_function=service.files().list,
        list_key="files",
        corpora="user" if personal_drive else "allDrives",
        supportsAllDrives=not personal_drive,
        includeItemsFromAllDrives=not personal_drive,
        continue_on_404_or_403=True,
        corpora="allDrives",
        supportsAllDrives=True,
        includeItemsFromAllDrives=True,
        fields=FOLDER_FIELDS,
        q=query,
    ):
@@ -65,7 +66,6 @@ def _get_folders_in_parent(
def _get_files_in_parent(
    service: Resource,
    parent_id: str,
    personal_drive: bool,
    start: SecondsSinceUnixEpoch | None = None,
    end: SecondsSinceUnixEpoch | None = None,
    is_slim: bool = False,
@@ -77,9 +77,10 @@ def _get_files_in_parent(
    for file in execute_paginated_retrieval(
        retrieval_function=service.files().list,
        list_key="files",
        corpora="user" if personal_drive else "allDrives",
        supportsAllDrives=not personal_drive,
        includeItemsFromAllDrives=not personal_drive,
        continue_on_404_or_403=True,
        corpora="allDrives",
        supportsAllDrives=True,
        includeItemsFromAllDrives=True,
        fields=SLIM_FILE_FIELDS if is_slim else FILE_FIELDS,
        q=query,
    ):
@@ -89,7 +90,6 @@ def _get_files_in_parent(
def crawl_folders_for_files(
    service: Resource,
    parent_id: str,
    personal_drive: bool,
    traversed_parent_ids: set[str],
    update_traversed_ids_func: Callable[[str], None],
    start: SecondsSinceUnixEpoch | None = None,
@@ -99,29 +99,30 @@ def crawl_folders_for_files(
    This function starts crawling from any folder. It is slower though.
    """
    if parent_id in traversed_parent_ids:
        print(f"Skipping subfolder since already traversed: {parent_id}")
        logger.info(f"Skipping subfolder since already traversed: {parent_id}")
        return

    update_traversed_ids_func(parent_id)

    yield from _get_files_in_parent(
    found_files = False
    for file in _get_files_in_parent(
        service=service,
        personal_drive=personal_drive,
        start=start,
        end=end,
        parent_id=parent_id,
    )
    ):
        found_files = True
        yield file

    if found_files:
        update_traversed_ids_func(parent_id)

    for subfolder in _get_folders_in_parent(
        service=service,
        parent_id=parent_id,
        personal_drive=personal_drive,
    ):
        logger.info("Fetching all files in subfolder: " + subfolder["name"])
        yield from crawl_folders_for_files(
            service=service,
            parent_id=subfolder["id"],
            personal_drive=personal_drive,
            traversed_parent_ids=traversed_parent_ids,
            update_traversed_ids_func=update_traversed_ids_func,
            start=start,
@@ -133,63 +134,85 @@ def get_files_in_shared_drive(
    service: Resource,
    drive_id: str,
    is_slim: bool = False,
    cache_folders: bool = True,
    update_traversed_ids_func: Callable[[str], None] = lambda _: None,
    start: SecondsSinceUnixEpoch | None = None,
    end: SecondsSinceUnixEpoch | None = None,
) -> Iterator[GoogleDriveFileType]:
    # If we know we are going to folder crawl later, we can cache the folders here
    if cache_folders:
        # Get all folders being queried and add them to the traversed set
        query = f"mimeType = '{DRIVE_FOLDER_TYPE}'"
        query += " and trashed = false"
        for file in execute_paginated_retrieval(
            retrieval_function=service.files().list,
            list_key="files",
            corpora="drive",
            driveId=drive_id,
            supportsAllDrives=True,
            includeItemsFromAllDrives=True,
            fields="nextPageToken, files(id)",
            q=query,
        ):
            update_traversed_ids_func(file["id"])
    # Get all folders being queried and add them to the traversed set
    query = f"mimeType = '{DRIVE_FOLDER_TYPE}'"
    query += " and trashed = false"
    found_folders = False
    for file in execute_paginated_retrieval(
        retrieval_function=service.files().list,
        list_key="files",
        continue_on_404_or_403=True,
        corpora="drive",
        driveId=drive_id,
        supportsAllDrives=True,
        includeItemsFromAllDrives=True,
        fields="nextPageToken, files(id)",
        q=query,
    ):
        update_traversed_ids_func(file["id"])
        found_folders = True
    if found_folders:
        update_traversed_ids_func(drive_id)

    # Get all files in the shared drive
    query = f"mimeType != '{DRIVE_FOLDER_TYPE}'"
    query += " and trashed = false"
    query += _generate_time_range_filter(start, end)
    for file in execute_paginated_retrieval(
    yield from execute_paginated_retrieval(
        retrieval_function=service.files().list,
        list_key="files",
        continue_on_404_or_403=True,
        corpora="drive",
        driveId=drive_id,
        supportsAllDrives=True,
        includeItemsFromAllDrives=True,
        fields=SLIM_FILE_FIELDS if is_slim else FILE_FIELDS,
        q=query,
    ):
        yield file
    )


def get_files_in_my_drive(
    service: Resource,
    email: str,
def get_all_files_in_my_drive(
    service: Any,
    update_traversed_ids_func: Callable,
    is_slim: bool = False,
    start: SecondsSinceUnixEpoch | None = None,
    end: SecondsSinceUnixEpoch | None = None,
) -> Iterator[GoogleDriveFileType]:
    query = f"mimeType != '{DRIVE_FOLDER_TYPE}' and '{email}' in owners"
    query += " and trashed = false"
    query += _generate_time_range_filter(start, end)
    # If we know we are going to folder crawl later, we can cache the folders here
    # Get all folders being queried and add them to the traversed set
    query = "trashed = false and 'me' in owners"
    found_folders = False
    for file in execute_paginated_retrieval(
        retrieval_function=service.files().list,
        list_key="files",
        corpora="user",
        fields=SLIM_FILE_FIELDS if is_slim else FILE_FIELDS,
        q=query,
    ):
        yield file
        update_traversed_ids_func(file["id"])
        found_folders = True
    if found_folders:
        update_traversed_ids_func(get_root_folder_id(service))

    # Then get the files
    query = "trashed = false and 'me' in owners"
    query += _generate_time_range_filter(start, end)
    fields = "files(id, name, mimeType, webViewLink, modifiedTime, createdTime)"
    if not is_slim:
        fields += ", files(permissions, permissionIds, owners)"

    yield from execute_paginated_retrieval(
        retrieval_function=service.files().list,
        list_key="files",
        corpora="user",
        fields=SLIM_FILE_FIELDS if is_slim else FILE_FIELDS,
        q=query,
    )


# Just in case we need to get the root folder id
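
For orientation, a rough wiring sketch (not part of the diff; `service` and the drive ID are placeholder assumptions) of how the update_traversed_ids_func callback lets a later folder crawl skip ground already covered:

# Hypothetical example, not from the PR. Assumes `service` is a Drive v3
# resource from googleapiclient.discovery.build("drive", "v3", ...).
from danswer.connectors.google_drive.file_retrieval import get_files_in_shared_drive

traversed_ids: set[str] = set()

for file in get_files_in_shared_drive(
    service=service,
    drive_id="0ABCdefGHIjkLUk9PVA",  # made-up shared drive ID
    update_traversed_ids_func=traversed_ids.add,
):
    print(file["id"], file.get("name"))

# Every folder id seen above (plus the drive id itself, if any folders were
# found) is now in traversed_ids, so crawl_folders_for_files can return early
# for those parents instead of re-listing them.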
24 changes: 22 additions & 2 deletions backend/danswer/connectors/google_utils/google_utils.py
@@ -23,7 +23,7 @@

def _execute_with_retry(request: Any) -> Any:
    max_attempts = 10
    attempt = 0
    attempt = 1

    while attempt < max_attempts:
        # Note for reasons unknown, the Google API will sometimes return a 429
@@ -82,6 +82,7 @@ def _execute_with_retry(request: Any) -> Any:
def execute_paginated_retrieval(
    retrieval_function: Callable,
    list_key: str,
    continue_on_404_or_403: bool = False,
    **kwargs: Any,
) -> Iterator[GoogleDriveFileType]:
    """Execute a paginated retrieval from Google Drive API
@@ -95,7 +96,26 @@
        if next_page_token:
            request_kwargs["pageToken"] = next_page_token

        results = add_retries(lambda: retrieval_function(**request_kwargs).execute())()
        try:
            results = retrieval_function(**request_kwargs).execute()
        except HttpError as e:
            if e.resp.status >= 500:
                results = add_retries(
                    lambda: retrieval_function(**request_kwargs).execute()
                )()
            elif e.resp.status == 404 or e.resp.status == 403:
                if continue_on_404_or_403:
                    logger.warning(f"Error executing request: {e}")
                    results = {}
                else:
                    raise e
            elif e.resp.status == 429:
                results = _execute_with_retry(
                    lambda: retrieval_function(**request_kwargs).execute()
                )
            else:
                logger.exception("Error executing request:")
                raise e

        next_page_token = results.get("nextPageToken")
        for item in results.get(list_key, []):
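
For orientation, a condensed sketch of the status-code policy above (not part of the diff; `fetch_page_with_policy` is an invented stand-in, and its single retry abbreviates the PR's add_retries and _execute_with_retry wrappers):

from collections.abc import Callable

from googleapiclient.errors import HttpError  # type: ignore


def fetch_page_with_policy(
    fetch_page: Callable[[], dict],
    continue_on_404_or_403: bool = False,
) -> dict:
    # fetch_page stands in for retrieval_function(**request_kwargs).execute()
    try:
        return fetch_page()
    except HttpError as e:
        if e.resp.status >= 500:
            # Transient server error: retry (the real code retries with backoff)
            return fetch_page()
        if e.resp.status in (404, 403) and continue_on_404_or_403:
            # Missing or inaccessible resource: treat it as an empty page
            return {}
        if e.resp.status == 429:
            # Rate limited: the real code backs off via _execute_with_retry
            return fetch_page()
        raise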