Skip to content

Commit

Permalink
Fix: enable non .txt file format uploads when using direct upload (#192)
Browse files Browse the repository at this point in the history
* fix: upload non txt files when doing direct upload

* fix: upload non txt files when doing direct upload

* docstrings

* docstrings+lint

* docstrings + lint

* fix tests

* docstrings

* fix tests

* fix tests

* fix tests

* move try.py

* rename try.py

* don't convert meta to bytes before uploading

* add pdf cases for in memory integration tests

* add pdf cases for in memory integration tests

* format

* format

* tests

* fix test data
  • Loading branch information
rjanjua authored May 17, 2024
1 parent 9ddb28e commit 35fdb40
Show file tree
Hide file tree
Showing 16 changed files with 266 additions and 91 deletions.
2 changes: 1 addition & 1 deletion deepset_cloud_sdk/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""This file defines the package version."""
# Will be automatically overridden during the release process
# It's okay if this is outdated in the repo. We will use the tag from the release as the version.
__version__ = "0.0.35"
__version__ = "1.0.2"
18 changes: 11 additions & 7 deletions deepset_cloud_sdk/_api/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
from deepset_cloud_sdk._api.upload_sessions import WriteMode
from deepset_cloud_sdk._utils.constants import SUPPORTED_TYPE_SUFFIXES
from deepset_cloud_sdk._utils.datetime import from_isoformat

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -204,15 +205,15 @@ async def direct_upload_path(
file_id: UUID = UUID(response.json()["file_id"])
return file_id

async def direct_upload_text(
async def direct_upload_in_memory(
self,
workspace_name: str,
text: str,
content: Union[bytes, str],
file_name: str,
meta: Optional[Dict[str, Any]] = None,
write_mode: WriteMode = WriteMode.KEEP,
) -> UUID:
"""Directly upload text to deepset Cloud.
"""Directly upload files to deepset Cloud.
:param workspace_name: Name of the workspace to use.
:param text: File text to upload.
Expand All @@ -225,17 +226,20 @@ async def direct_upload_text(
FAIL - fails to upload the file with the same name.
:return: ID of the uploaded file.
"""
if not file_name.endswith(".txt"):
file_name_suffix = f".{file_name.split('.')[1]}"
if file_name_suffix not in SUPPORTED_TYPE_SUFFIXES:
raise NotMatchingFileTypeException(
f"File name {file_name} is not a textfile. Please use '.txt' for text uploads."
f"File name {file_name} is not a supported file type. Please use one of {'` '.join(SUPPORTED_TYPE_SUFFIXES)} for text uploads."
)

response = await self._deepset_cloud_api.post(
workspace_name,
"files",
data={"text": text, "meta": json.dumps(meta)},
params={"write_mode": write_mode.value, "file_name": file_name},
files={"file": (file_name, content)},
data={"meta": json.dumps(meta)},
params={"write_mode": write_mode.value},
)

if response.status_code != codes.CREATED or response.json().get("file_id") is None:
raise FailedToUploadFileException(
f"Failed to upload file with status code {response.status_code}. response was: {response.text}"
Expand Down
26 changes: 13 additions & 13 deletions deepset_cloud_sdk/_s3/upload.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""Module for upload-related S3 operations."""
import asyncio
import json
import os
import re
from dataclasses import dataclass
from http import HTTPStatus
from pathlib import Path
from typing import Any, Coroutine, List, Optional, Union
from typing import Any, Coroutine, List, Optional, Sequence, Union

import aiofiles
import aiohttp
Expand All @@ -19,7 +18,7 @@
AWSPrefixedRequestConfig,
UploadSession,
)
from deepset_cloud_sdk.models import DeepsetCloudFile
from deepset_cloud_sdk.models import DeepsetCloudFileBase

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -181,14 +180,14 @@ async def upload_from_file(
)
return S3UploadResult(file_name=file_name, success=False, exception=exception)

async def upload_from_string(
async def upload_from_memory(
self,
file_name: str,
upload_session: UploadSession,
content: str,
content: Union[bytes, str],
client_session: aiohttp.ClientSession,
) -> S3UploadResult:
"""Upload text to the prefixed S3 namespace.
"""Upload content to the prefixed S3 namespace.
:param file_name: Name of the file.
:param upload_session: UploadSession to associate the upload with.
Expand Down Expand Up @@ -267,13 +266,13 @@ async def upload_files_from_paths(
result_summary = await self._process_results(tasks, show_progress=show_progress)
return result_summary

async def upload_texts(
self, upload_session: UploadSession, files: List[DeepsetCloudFile], show_progress: bool = True
async def upload_in_memory(
self, upload_session: UploadSession, files: Sequence[DeepsetCloudFileBase], show_progress: bool = True
) -> S3UploadSummary:
"""Upload a set of texts to the prefixed S3 namespace given a list of paths.
"""Upload a set of files to the prefixed S3 namespace given a list of paths.
:param upload_session: UploadSession to associate the upload with.
:param files: A list of DeepsetCloudFiles to upload.
:param files: A list of DeepsetCloudFileBase to upload.
:param show_progress: Whether to show a progress bar on the upload.
:return: S3UploadSummary object.
"""
Expand All @@ -283,13 +282,14 @@ async def upload_texts(
for file in files:
# raw data
file_name = file.name
tasks.append(self.upload_from_string(file_name, upload_session, file.text, client_session))
tasks.append(self.upload_from_memory(file_name, upload_session, file.content(), client_session))

# meta
if file.meta is not None:
meta_name = f"{file_name}.meta.json"
metadata = json.dumps(file.meta)
tasks.append(self.upload_from_string(meta_name, upload_session, metadata, client_session))
tasks.append(
self.upload_from_memory(meta_name, upload_session, file.meta_as_string(), client_session)
)

result_summary = await self._process_results(tasks, show_progress=show_progress)

Expand Down
39 changes: 27 additions & 12 deletions deepset_cloud_sdk/_service/files_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@
from collections import defaultdict
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional, Set, Tuple, Union
from typing import (
Any,
AsyncGenerator,
Dict,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
)
from uuid import UUID

import structlog
Expand All @@ -31,11 +41,11 @@
WriteMode,
)
from deepset_cloud_sdk._s3.upload import S3, S3UploadResult, S3UploadSummary
from deepset_cloud_sdk.models import DeepsetCloudFile
from deepset_cloud_sdk._utils.constants import SUPPORTED_TYPE_SUFFIXES
from deepset_cloud_sdk.models import DeepsetCloudFileBase

logger = structlog.get_logger(__name__)

SUPPORTED_TYPE_SUFFIXES = [".csv", ".docx", ".html", ".json", ".md", ".txt", ".pdf", ".pptx", ".xlsx", ".xml"]
META_SUFFIX = ".meta.json"
DIRECT_UPLOAD_THRESHOLD = 20

Expand Down Expand Up @@ -148,13 +158,18 @@ async def _wrapped_direct_upload_path(
logger.error("Failed uploading file.", file_path=file_path, error=error)
return S3UploadResult(file_name=file_path.name, success=False, exception=error)

async def _wrapped_direct_upload_text(
self, workspace_name: str, text: str, file_name: str, meta: Dict[str, Any], write_mode: WriteMode
async def _wrapped_direct_upload_in_memory(
self,
workspace_name: str,
content: Union[str, bytes],
file_name: str,
meta: Dict[str, Any],
write_mode: WriteMode,
) -> S3UploadResult:
try:
await self._files.direct_upload_text(
await self._files.direct_upload_in_memory(
workspace_name=workspace_name,
text=text,
content=content,
meta=meta,
file_name=file_name,
write_mode=write_mode,
Expand Down Expand Up @@ -543,10 +558,10 @@ async def download(
if pbar is not None:
pbar.close()

async def upload_texts(
async def upload_in_memory(
self,
workspace_name: str,
files: List[DeepsetCloudFile],
files: Sequence[DeepsetCloudFileBase],
write_mode: WriteMode = WriteMode.KEEP,
blocking: bool = True,
timeout_s: Optional[int] = None,
Expand Down Expand Up @@ -578,11 +593,11 @@ async def upload_texts(
_coroutines = []
for file in files:
_coroutines.append(
self._wrapped_direct_upload_text(
self._wrapped_direct_upload_in_memory(
workspace_name=workspace_name,
file_name=file.name,
meta=file.meta or {},
text=file.text,
content=file.content(),
write_mode=write_mode,
)
)
Expand All @@ -601,7 +616,7 @@ async def upload_texts(

# create session to upload files to
async with self._create_upload_session(workspace_name=workspace_name, write_mode=write_mode) as upload_session:
upload_summary = await self._s3.upload_texts(
upload_summary = await self._s3.upload_in_memory(
upload_session=upload_session, files=files, show_progress=show_progress
)

Expand Down
1 change: 1 addition & 0 deletions deepset_cloud_sdk/_utils/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Constants shared across the deepset Cloud SDK."""

# File-name suffixes accepted for upload; compared against the suffix of the
# uploaded file's name before any upload is attempted.
SUPPORTED_TYPE_SUFFIXES = [".csv", ".docx", ".html", ".json", ".md", ".txt", ".pdf", ".pptx", ".xlsx", ".xml"]
84 changes: 77 additions & 7 deletions deepset_cloud_sdk/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""General data classes for deepset Cloud SDK."""
import json
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union
from uuid import UUID


Expand All @@ -13,10 +15,78 @@ class UserInfo:
family_name: str


@dataclass
class DeepsetCloudFile:
"""Data class for files in deepset Cloud."""
class DeepsetCloudFileBase:  # pylint: disable=too-few-public-methods
    """Base class for deepset Cloud files.

    Subclasses must implement :meth:`content` to return the raw payload
    (text or bytes) that is uploaded.
    """

    def __init__(self, name: str, meta: Optional[Dict[str, Any]] = None):
        """
        Initialize DeepsetCloudFileBase.

        :param name: The file name
        :param meta: The file's metadata
        """
        self.name = name
        self.meta = meta

    @abstractmethod
    def content(self) -> Union[str, bytes]:
        """Return the raw file content to upload.

        :raises NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError

    def meta_as_string(self) -> str:
        """Return the metadata serialized as a JSON string.

        :return: JSON serialization of ``self.meta``; ``"{}"`` when no metadata is set.
        """
        # `self.meta or {}` maps both None and an empty dict to an empty JSON
        # object, matching the previous two-branch implementation exactly.
        return json.dumps(self.meta or {})


class DeepsetCloudFile(DeepsetCloudFileBase):  # pylint: disable=too-few-public-methods
    """Data class for text files in deepset Cloud."""

    def __init__(self, text: str, name: str, meta: Optional[Dict[str, Any]] = None):
        """
        Create a text file to be uploaded to deepset Cloud.

        :param text: The text content of the file
        :param name: The file name
        :param meta: The file's metadata
        """
        self.text = text
        super().__init__(name, meta)

    def content(self) -> str:
        """
        Return the uploadable content of this file.

        :return: The file's text.
        """
        return self.text


# NOTE: DeepsetCloudFile is kept as a separate class to avoid breaking changes.
# Although it is technically equivalent to the class below, its `text` field name
# would be confusing for users uploading anything other than text.


class DeepsetCloudFileBytes(DeepsetCloudFileBase):  # pylint: disable=too-few-public-methods
    """Data class for uploading files of any valid type in deepset Cloud."""

    def __init__(self, file_bytes: bytes, name: str, meta: Optional[Dict[str, Any]] = None):
        """
        Initialize DeepsetCloudFileBytes.

        :param file_bytes: The content of the file represented in bytes
        :param name: The file name
        :param meta: The file's metadata
        """
        super().__init__(name, meta)
        self.file_bytes = file_bytes

    def content(self) -> bytes:
        """
        Return the content of the file in bytes.

        :return: The content of the file in bytes.
        """
        return self.file_bytes
42 changes: 40 additions & 2 deletions deepset_cloud_sdk/workflows/async_client/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
WriteMode,
)
from deepset_cloud_sdk._s3.upload import S3UploadSummary
from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService
from deepset_cloud_sdk._service.files_service import FilesService
from deepset_cloud_sdk.models import DeepsetCloudFile, DeepsetCloudFileBytes


def _get_config(api_key: Optional[str] = None, api_url: Optional[str] = None) -> CommonConfig:
Expand Down Expand Up @@ -232,7 +233,44 @@ async def upload_texts(
:param show_progress: Shows the upload progress.
"""
async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service:
return await file_service.upload_texts(
return await file_service.upload_in_memory(
workspace_name=workspace_name,
files=files,
write_mode=write_mode,
blocking=blocking,
timeout_s=timeout_s,
show_progress=show_progress,
)


async def upload_bytes(
files: List[DeepsetCloudFileBytes],
api_key: Optional[str] = None,
api_url: Optional[str] = None,
workspace_name: str = DEFAULT_WORKSPACE_NAME,
write_mode: WriteMode = WriteMode.KEEP,
blocking: bool = True,
timeout_s: Optional[int] = None,
show_progress: bool = True,
) -> S3UploadSummary:
"""Upload raw texts to deepset Cloud.
:param files: List of DeepsetCloudFiles to upload.
:param api_key: deepset Cloud API key to use for authentication.
:param api_url: API URL to use for authentication.
:param workspace_name: Name of the workspace to upload the files to. It uses the workspace from the .ENV file by default.
:param write_mode: Specifies what to do when a file with the same name already exists in the workspace.
Possible options are:
KEEP - uploads the file with the same name and keeps both files in the workspace.
OVERWRITE - overwrites the file that is in the workspace.
FAIL - fails to upload the file with the same name.
:param blocking: Whether to wait for the files to be listed and displayed in deepset Cloud.
This may take a couple of minutes.
:param timeout_s: Timeout in seconds for the `blocking` parameter.
:param show_progress: Shows the upload progress.
"""
async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service:
return await file_service.upload_in_memory(
workspace_name=workspace_name,
files=files,
write_mode=write_mode,
Expand Down
Loading

0 comments on commit 35fdb40

Please sign in to comment.