Backport changes from v4 to v3 (#140)
Cito authored Nov 15, 2024
1 parent 4e470de commit f19fba4
Showing 12 changed files with 1,243 additions and 1,114 deletions.
5 changes: 3 additions & 2 deletions .devcontainer/devcontainer.json
@@ -52,7 +52,8 @@
"visualstudioexptteam.vscodeintellicode",
"ymotongpoo.licenser",
"charliermarsh.ruff",
"ms-python.mypy-type-checker"
"ms-python.mypy-type-checker",
"-ms-python.autopep8"
]
}
},
@@ -71,4 +72,4 @@
// details can be found here: https://github.com/devcontainers/features/tree/main/src/docker-outside-of-docker
"ghcr.io/devcontainers/features/docker-outside-of-docker:1": {}
}
}
}
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -48,13 +48,13 @@ repos:
- id: no-commit-to-branch
args: [--branch, dev, --branch, int, --branch, main]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.2
rev: v0.7.4
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix, --exclude, scripts]
- id: ruff-format
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.2
rev: v1.13.0
hooks:
- id: mypy
args: [--no-warn-unused-ignores]
2 changes: 1 addition & 1 deletion .pyproject_generation/pyproject_custom.toml
@@ -1,6 +1,6 @@
[project]
name = "hexkit"
version = "3.6.0"
version = "3.7.0"
description = "A Toolkit for Building Microservices using the Hexagonal Architecture"
requires-python = ">=3.9"
classifiers = [
1,352 changes: 679 additions & 673 deletions lock/requirements-dev.txt

Large diffs are not rendered by default.

767 changes: 389 additions & 378 deletions lock/requirements.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -23,7 +23,7 @@ classifiers = [
"Intended Audience :: Developers",
]
name = "hexkit"
version = "3.6.0"
version = "3.7.0"
description = "A Toolkit for Building Microservices using the Hexagonal Architecture"
dependencies = [
"pydantic >=2, <3",
31 changes: 31 additions & 0 deletions src/hexkit/protocols/objstorage.py
@@ -122,6 +122,7 @@ async def get_part_upload_url(
object_id: str,
part_number: int,
expires_after: int = 3600,
part_md5: Optional[str] = None,
) -> str:
"""Given a id of an instantiated multipart upload along with the corresponding
bucket and object ID, it returns a presigned URL for uploading a file part with the
@@ -137,6 +138,7 @@ async def get_part_upload_url(
object_id=object_id,
part_number=part_number,
expires_after=expires_after,
part_md5=part_md5,
)

async def abort_multipart_upload(
@@ -207,6 +209,12 @@ async def does_object_exist(
bucket_id=bucket_id, object_id=object_id, object_md5sum=object_md5sum
)

async def get_object_etag(self, *, bucket_id: str, object_id: str) -> str:
"""Returns the etag of an object."""
self._validate_bucket_id(bucket_id)
self._validate_object_id(object_id)
return await self._get_object_etag(bucket_id=bucket_id, object_id=object_id)

async def get_object_size(self, *, bucket_id: str, object_id: str) -> int:
"""Returns the size of an object in bytes."""
self._validate_bucket_id(bucket_id)
@@ -220,9 +228,15 @@ async def copy_object(
source_object_id: str,
dest_bucket_id: str,
dest_object_id: str,
abort_failed: bool = True,
) -> None:
"""Copy an object from one bucket (`source_bucket_id` and `source_object_id`) to
another bucket (`dest_bucket_id` and `dest_object_id`).
If `abort_failed` is set to true (default), a failed copy operation tries to
abort the ongoing multipart upload it created (if using multipart mode).
This only works reliably as long as there are no other ongoing multipart operations for
the same destination bucket and object ID, in which case this should be set to false.
"""
self._validate_bucket_id(source_bucket_id)
self._validate_object_id(source_object_id)
@@ -233,6 +247,7 @@ async def copy_object(
source_object_id=source_object_id,
dest_bucket_id=dest_bucket_id,
dest_object_id=dest_object_id,
abort_failed=abort_failed,
)

async def delete_object(self, *, bucket_id: str, object_id: str) -> None:
@@ -336,6 +351,7 @@ async def _get_part_upload_url(
object_id: str,
part_number: int,
expires_after: int = 3600,
part_md5: Optional[str] = None,
) -> str:
"""
Given a id of an instantiated multipart upload along with the corresponding
@@ -402,6 +418,15 @@ async def _get_object_download_url(
"""
...

@abstractmethod
async def _get_object_etag(self, *, bucket_id: str, object_id: str) -> str:
"""Return the etag of an object.
*To be implemented by the provider. Input validation is done outside of this
method.*
"""
...

@abstractmethod
async def _get_object_size(self, *, bucket_id: str, object_id: str) -> int:
"""
@@ -435,11 +460,17 @@ async def _copy_object(
source_object_id: str,
dest_bucket_id: str,
dest_object_id: str,
abort_failed: bool = True,
) -> None:
"""
Copy an object from one bucket (`source_bucket_id` and `source_object_id`) to
another bucket (`dest_bucket_id` and `dest_object_id`).
If `abort_failed` is set to true (default), a failed copy operation tries to
abort the ongoing multipart upload it created (if using multipart mode).
This only works reliably as long as there are no other ongoing multipart operations for
the same destination bucket and object ID, in which case this should be set to false.
*To be implemented by the provider. Input validation is done outside of this
method.*
"""
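As a caller-side illustration of the three protocol additions above (part_md5 on get_part_upload_url, the new get_object_etag accessor, and abort_failed on copy_object), here is a minimal sketch. It assumes `storage` is any configured object-storage provider implementing this protocol, that get_part_upload_url also accepts upload_id and bucket_id keywords (only the tail of its signature is visible in this diff), and that part_md5 is the base64-encoded MD5 digest that S3's Content-MD5 header expects; bucket and object IDs are illustrative.

    import base64
    import hashlib


    async def upload_part_with_checksum(storage, upload_id: str, part: bytes) -> str:
        # Hypothetical helper: bake the part's MD5 into the presigned URL so the
        # storage backend can reject corrupted part uploads.
        part_md5 = base64.b64encode(hashlib.md5(part).digest()).decode("ascii")
        return await storage.get_part_upload_url(
            upload_id=upload_id,
            bucket_id="inbox",
            object_id="example-object",
            part_number=1,
            part_md5=part_md5,
        )


    async def copy_and_read_etag(storage) -> str:
        # abort_failed=True (the default) cleans up a half-finished multipart copy.
        await storage.copy_object(
            source_bucket_id="inbox",
            source_object_id="example-object",
            dest_bucket_id="archive",
            dest_object_id="example-object",
            abort_failed=True,
        )
        # New accessor: read back the destination object's ETag.
        return await storage.get_object_etag(
            bucket_id="archive", object_id="example-object"
        )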
2 changes: 1 addition & 1 deletion src/hexkit/providers/akafka/testcontainer.py
@@ -151,7 +151,7 @@ def create_file(self, content: bytes, path: str) -> None:
with tarfile.TarFile(fileobj=archive, mode="w") as tar:
tarinfo = tarfile.TarInfo(name=path)
tarinfo.size = len(content)
tarinfo.mtime = time.time() # type: ignore
tarinfo.mtime = time.time()
tar.addfile(tarinfo, BytesIO(content))
archive.seek(0)
self.get_wrapped_container().put_archive("/", archive)
100 changes: 75 additions & 25 deletions src/hexkit/providers/s3/provider.py
@@ -223,27 +223,33 @@ def _translate_s3_client_errors(

async def _does_bucket_exist(self, bucket_id: str) -> bool:
"""Check whether a bucket with the specified ID (`bucket_id`) exists.
Return `True` if it exists and `False` otherwise.
Returns `True` if it exists and `False` otherwise.
Note that this method does not need to have the permission to list buckets.
"""
try:
bucket_list = await asyncio.to_thread(self._client.list_buckets)
await asyncio.to_thread(self._client.head_bucket, Bucket=bucket_id)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] == "404":
return False
raise self._translate_s3_client_errors(
error, bucket_id=bucket_id
) from error

return any(bucket["Name"] == bucket_id for bucket in bucket_list["Buckets"])
return True

async def _assert_bucket_exists(self, bucket_id: str) -> None:
"""Checks if the bucket with specified ID (`bucket_id`) exists and throws an
BucketNotFoundError otherwise.
"""Assert that the bucket with the specified ID (`bucket_id`) exists.
If it does not exist, a BucketNotFoundError is raised.
"""
if not await self.does_bucket_exist(bucket_id):
raise self.BucketNotFoundError(bucket_id=bucket_id)

async def _assert_bucket_not_exists(self, bucket_id: str) -> None:
"""Checks if the bucket with specified ID (`bucket_id`) exists. If so, it throws
an BucketAlreadyExistsError.
"""Assert that the bucket with the specified ID (`bucket_id`) does not exist.
If it exists, a BucketAlreadyExistsError is raised.
"""
if await self.does_bucket_exist(bucket_id):
raise self.BucketAlreadyExistsError(bucket_id=bucket_id)
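The bucket existence check above now uses HeadBucket instead of listing all buckets, so it works without list-buckets permission; a 404 response means the bucket is absent, while other errors still surface. An equivalent standalone sketch against a plain boto3 client (synchronous, names illustrative):

    import botocore.exceptions


    def bucket_exists(client, bucket_id: str) -> bool:
        # HeadBucket needs no ListBuckets permission; 404 means "not found",
        # anything else (e.g. 403 Forbidden) is a genuine error.
        try:
            client.head_bucket(Bucket=bucket_id)
        except botocore.exceptions.ClientError as error:
            if error.response["Error"]["Code"] == "404":
                return False
            raise
        return True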
@@ -274,7 +280,7 @@ async def _delete_bucket(
await self._assert_bucket_exists(bucket_id)

try:
bucket = self._resource.Bucket(bucket_id)
bucket = self._resource.Bucket(bucket_id) # pyright: ignore
content = await asyncio.to_thread(bucket.objects.all)
if delete_content:
await asyncio.to_thread(content.delete)
@@ -289,7 +295,7 @@ async def _list_all_object_ids(self, *, bucket_id: str) -> list[str]:
await self._assert_bucket_exists(bucket_id)

try:
bucket = self._resource.Bucket(bucket_id)
bucket = self._resource.Bucket(bucket_id) # pyright: ignore
content = await asyncio.to_thread(bucket.objects.all)
return [object_summary.key for object_summary in content]
except botocore.exceptions.ClientError as error:
@@ -306,17 +312,18 @@ async def _does_object_exist(
Return `True` if checks succeed and `False` otherwise.
"""
if object_md5sum is not None:
raise NotImplementedError("Md5 checking is not yet implemented.")
raise NotImplementedError("MD5 checking is not yet implemented.")

try:
_ = await asyncio.to_thread(
self._client.head_object,
Bucket=bucket_id,
Key=object_id,
await asyncio.to_thread(
self._client.head_object, Bucket=bucket_id, Key=object_id
)
except botocore.exceptions.ClientError:
return False

except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] == "404":
return False
raise self._translate_s3_client_errors(
error, bucket_id=bucket_id, object_id=object_id
) from error
return True

async def _assert_object_exists(self, *, bucket_id: str, object_id: str) -> None:
@@ -451,7 +458,7 @@ async def _assert_multipart_upload_exists(
)

async def _init_multipart_upload(self, *, bucket_id: str, object_id: str) -> str:
"""Initiates a mulipart upload procedure. Returns the upload ID."""
"""Initiates a multipart upload procedure. Returns the upload ID."""
await self._assert_no_multipart_upload(bucket_id=bucket_id, object_id=object_id)

try:
@@ -473,6 +480,7 @@ async def _get_part_upload_url(
object_id: str,
part_number: int,
expires_after: int = 3600,
part_md5: Optional[str] = None,
) -> str:
"""Given a id of an instantiated multipart upload along with the corresponding
bucket and object ID, it returns a presigned URL for uploading a file part with the
@@ -491,16 +499,20 @@ async def _get_part_upload_url(
upload_id=upload_id, bucket_id=bucket_id, object_id=object_id
)

params = {
"Bucket": bucket_id,
"Key": object_id,
"UploadId": upload_id,
"PartNumber": part_number,
}
# add additional parameters if any were passed
if part_md5:
params["ContentMD5"] = part_md5
try:
return await asyncio.to_thread(
self._client.generate_presigned_url,
ClientMethod="upload_part",
Params={
"Bucket": bucket_id,
"Key": object_id,
"UploadId": upload_id,
"PartNumber": part_number,
},
Params=params,
ExpiresIn=expires_after,
)
except botocore.exceptions.ClientError as error:
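Because ContentMD5 is now part of the signed parameters, the eventual PUT against the presigned URL has to carry a matching Content-MD5 header or the backend rejects it. A client-side sketch of consuming such a URL (uses requests as an assumption; presigned_url comes from get_part_upload_url signed with the same part's MD5):

    import base64
    import hashlib

    import requests


    def put_part(presigned_url: str, part: bytes) -> str:
        # Header value must be the base64-encoded MD5 digest baked into the signature.
        content_md5 = base64.b64encode(hashlib.md5(part).digest()).decode("ascii")
        response = requests.put(
            presigned_url, data=part, headers={"Content-MD5": content_md5}, timeout=60
        )
        response.raise_for_status()
        # The returned ETag is needed later to complete the multipart upload.
        return response.headers["ETag"]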
@@ -757,6 +769,22 @@ async def _get_object_download_url(

return presigned_url

async def _get_object_etag(self, *, bucket_id: str, object_id: str) -> str:
"""Return the etag of an object."""
await self._assert_object_exists(bucket_id=bucket_id, object_id=object_id)

object_metadata = await self._get_object_metadata(
bucket_id=bucket_id, object_id=object_id
)

if "ETag" not in object_metadata:
raise self.ObjectError(
f"Could not get the etag of the object with ID '{object_id}' in"
+ f" bucket '{bucket_id}'."
)

return object_metadata["ETag"]

async def _get_object_size(self, *, bucket_id: str, object_id: str) -> int:
"""Returns the size of an object in bytes."""
await self._assert_object_exists(bucket_id=bucket_id, object_id=object_id)
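A caveat worth keeping in mind when consuming get_object_etag (not stated in this commit): S3 returns ETags wrapped in double quotes, and the value only equals the object's MD5 for single-part, unencrypted uploads, so it is safest to treat it as an opaque version token. A tiny comparison helper, as an illustrative sketch:

    def etags_match(etag_a: str, etag_b: str) -> bool:
        # S3 ETags arrive quoted (e.g. '"d41d8cd98f00b204e9800998ecf8427e"');
        # strip the quotes before comparing and treat the value as opaque.
        return etag_a.strip('"') == etag_b.strip('"')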
@@ -795,9 +823,15 @@ async def _copy_object(
source_object_id: str,
dest_bucket_id: str,
dest_object_id: str,
abort_failed: bool = True,
) -> None:
"""Copy an object from one bucket (`source_bucket_id` and `source_object_id`) to
another bucket (`dest_bucket_id` and `dest_object_id`).
If `abort_failed` is set to true (default), a failed copy operation tries to
abort the ongoing multipart upload it created (if using multipart mode).
This only works reliably as long as there are no other ongoing multipart operations for
the same destination bucket and object ID, in which case this should be set to false.
"""
file_size = await self._get_object_size(
bucket_id=source_bucket_id, object_id=source_object_id
Expand Down Expand Up @@ -825,6 +859,22 @@ async def _copy_object(
Config=transfer_config,
)
except botocore.exceptions.ClientError as error:
if abort_failed:
# try to find and abort the multipart operation, if multipart copy mode is used
# There should only be one ongoing multipart upload/copy at a time as long
# as a new object ID is generated for each attempt
try:
upload_ids = await self._list_multipart_upload_for_object(
bucket_id=dest_bucket_id, object_id=dest_object_id
)
if len(upload_ids) == 1:
await self._abort_multipart_upload(
upload_id=upload_ids[0],
bucket_id=dest_bucket_id,
object_id=dest_object_id,
)
except botocore.exceptions.ClientError:
pass
raise self._translate_s3_client_errors(error) from error

async def _delete_object(self, *, bucket_id: str, object_id: str) -> None:
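For context on the cleanup branch added to _copy_object: at the boto3 level, finding and aborting a stranded multipart upload for one destination key looks roughly like the following sketch (a hypothetical standalone helper, not the hexkit internals, which go through _list_multipart_upload_for_object and _abort_multipart_upload):

    def abort_pending_uploads(client, bucket_id: str, object_id: str) -> None:
        # List unfinished multipart uploads for exactly this key and abort them,
        # so orphaned parts do not keep accruing storage after a failed copy.
        listing = client.list_multipart_uploads(Bucket=bucket_id, Prefix=object_id)
        for upload in listing.get("Uploads", []):
            if upload["Key"] == object_id:
                client.abort_multipart_upload(
                    Bucket=bucket_id, Key=object_id, UploadId=upload["UploadId"]
                )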