Skip to content

Commit

Permalink
IFRS - Improve file staging error handling (GSI-1190) (#75)
Browse files Browse the repository at this point in the history
* Remove extra object existence check before copy

* IFRS: Add CopyOperationError and log fatal errors

* IFRS: bump version from 2.1.1 -> 2.1.2

* IFRS: Update readme

* Add missing doc string text

* IFRS: Apply copy operation error handling to register_file too

* Fix docstring in ifrs test
  • Loading branch information
TheByronHimes authored Dec 3, 2024
1 parent 83aaa68 commit 16ecde7
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 26 deletions.
6 changes: 3 additions & 3 deletions services/ifrs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ We recommend using the provided Docker container.

A pre-built version is available at [docker hub](https://hub.docker.com/repository/docker/ghga/internal-file-registry-service):
```bash
docker pull ghga/internal-file-registry-service:2.1.1
docker pull ghga/internal-file-registry-service:2.1.2
```

Or you can build the container yourself from the [`./Dockerfile`](./Dockerfile):
```bash
# Execute in the repo's root dir:
docker build -t ghga/internal-file-registry-service:2.1.1 .
docker build -t ghga/internal-file-registry-service:2.1.2 .
```

For production-ready deployment, we recommend using Kubernetes; however,
for simple use cases, you could execute the service using Docker
on a single server:
```bash
# The entrypoint is preconfigured:
docker run -p 8080:8080 ghga/internal-file-registry-service:2.1.1 --help
docker run -p 8080:8080 ghga/internal-file-registry-service:2.1.2 --help
```

If you prefer not to use containers, you may install the service from source:
Expand Down
2 changes: 1 addition & 1 deletion services/ifrs/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "ifrs"
version = "2.1.1"
version = "2.1.2"
description = "Internal File Registry Service - This service acts as a registry for the internal location and representation of files."
readme = "README.md"
authors = [
Expand Down
75 changes: 54 additions & 21 deletions services/ifrs/src/ifrs/core/file_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ async def register_file(
When the file content is not present in the storage staging.
ValueError:
When the configuration for the storage alias is not found.
self.CopyOperationError:
When an error occurs while attempting to copy the object to the
permanent storage bucket.
"""
storage_alias = file_without_object_id.storage_alias

Expand Down Expand Up @@ -138,6 +141,7 @@ async def register_file(
**file_without_object_id.model_dump(), object_id=object_id
)

# Make sure the object exists in the source bucket (staging)
if not await object_storage.does_object_exist(
bucket_id=staging_bucket_id, object_id=staging_object_id
):
Expand All @@ -150,13 +154,32 @@ async def register_file(
)
raise content_not_in_staging

await object_storage.copy_object(
source_bucket_id=staging_bucket_id,
source_object_id=staging_object_id,
dest_bucket_id=permanent_bucket_id,
dest_object_id=object_id,
)
# Copy the file from staging to permanent storage
try:
await object_storage.copy_object(
source_bucket_id=staging_bucket_id,
source_object_id=staging_object_id,
dest_bucket_id=permanent_bucket_id,
dest_object_id=object_id,
)
except object_storage.ObjectAlreadyExistsError:
# the content is already where it should go, there is nothing to do
log.info(
"Object corresponding to file ID '%s' is already in permanent storage.",
file_without_object_id.file_id,
)
return
except Exception as exc:
# Irreconcilable object error -- event needs investigation
obj_error = self.CopyOperationError(
file_id=file_without_object_id.file_id,
dest_bucket_id=permanent_bucket_id,
exc_text=str(exc),
)
log.critical(obj_error)
raise obj_error from exc

# Log the registration and publish an event
log.info("Inserting file with file ID '%s'.", file.file_id)
await self._file_metadata_dao.insert(file)

Expand Down Expand Up @@ -192,6 +215,8 @@ async def stage_registered_file(
When encountering inconsistency between the registry (the database) and
the permanent storage. This is an internal service error, which should
not happen, and not the fault of the client.
self.CopyOperationError:
When an error occurs while attempting to copy the object to the outbox.
"""
try:
file = await self._file_metadata_dao.get_by_id(file_id)
Expand Down Expand Up @@ -220,15 +245,7 @@ async def stage_registered_file(
file.storage_alias
)

if await object_storage.does_object_exist(
bucket_id=outbox_bucket_id, object_id=outbox_object_id
):
# the content is already where it should go, there is nothing to do
log.info(
"Object corresponding to file ID '%s' is already in storage.", file_id
)
return

# Make sure the file exists in permanent storage before trying to copy it
if not await object_storage.does_object_exist(
bucket_id=permanent_bucket_id, object_id=file.object_id
):
Expand All @@ -238,12 +255,28 @@ async def stage_registered_file(
log.critical(msg=not_in_storage_error, extra={"file_id": file_id})
raise not_in_storage_error

await object_storage.copy_object(
source_bucket_id=permanent_bucket_id,
source_object_id=file.object_id,
dest_bucket_id=outbox_bucket_id,
dest_object_id=outbox_object_id,
)
# Copy the file from permanent storage bucket to the outbox (download) bucket
try:
await object_storage.copy_object(
source_bucket_id=permanent_bucket_id,
source_object_id=file.object_id,
dest_bucket_id=outbox_bucket_id,
dest_object_id=outbox_object_id,
)
except object_storage.ObjectAlreadyExistsError:
# the content is already where it should go, there is nothing to do
log.info(
"Object corresponding to file ID '%s' is already in the outbox.",
file_id,
)
return
except Exception as exc:
# Irreconcilable object error -- event needs investigation
obj_error = self.CopyOperationError(
file_id=file_id, dest_bucket_id=outbox_bucket_id, exc_text=str(exc)
)
log.critical(obj_error)
raise obj_error from exc

log.info(
"Object corresponding to file ID '%s' has been staged to the outbox.",
Expand Down
12 changes: 12 additions & 0 deletions services/ifrs/src/ifrs/ports/inbound/file_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ def __init__(self, file_id: str):
)
super().__init__(message)

class CopyOperationError(FatalError):
    """Raised when an unresolvable error occurs while copying a file between
    buckets.
    """

    def __init__(self, file_id: str, dest_bucket_id: str, exc_text: str):
        # Compose a self-contained failure description that embeds the text of
        # the originating exception, so the resulting log entry needs no
        # additional context to be understood.
        super().__init__(
            f"Fatal error occurred while copying file with the ID '{file_id}'"
            f" to the bucket '{dest_bucket_id}'. The exception is: {exc_text}"
        )

@abstractmethod
async def register_file(
self,
Expand Down Expand Up @@ -146,6 +156,8 @@ async def stage_registered_file(
self.FileInRegistryButNotInStorageError:
When encountering inconsistency between the registry (the database) and
the permanent storage. This is a fatal error.
self.CopyOperationError:
When an error occurs while attempting to copy the object to the outbox.
"""
...

Expand Down
92 changes: 91 additions & 1 deletion services/ifrs/tests_ifrs/test_ifrs_edge_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from hexkit.providers.s3.testutils import (
FileObject,
S3Fixture, # noqa: F401
temp_file_object,
tmp_file, # noqa: F401
)

Expand Down Expand Up @@ -276,7 +277,6 @@ async def test_storage_db_inconsistency(joint_fixture: JointFixture):
),
],
)
@pytest.mark.asyncio()
async def test_outbox_subscriber_routing(
joint_fixture: JointFixture,
upsertion_event: JsonObject,
Expand All @@ -297,3 +297,93 @@ async def test_outbox_subscriber_routing(

await joint_fixture.outbox_subscriber.run(forever=False)
mock.assert_awaited_once()


async def test_error_during_copy(joint_fixture: JointFixture, caplog):
    """Errors during `object_storage.copy_object` should be logged and re-raised.

    First triggers a fatal copy error (the destination bucket does not exist) and
    verifies that a `CopyOperationError` is raised and logged, then uploads the
    object to the outbox and verifies the already-exists case returns early with
    an INFO log.
    """
    # Insert FileMetadata record into the DB
    dao = joint_fixture.file_metadata_dao
    await dao.insert(EXAMPLE_METADATA)

    s3_alias = EXAMPLE_METADATA.storage_alias
    source_bucket = joint_fixture.config.object_storages[s3_alias].bucket
    outbox_bucket = "outbox-bucket"

    # Upload a matching object to S3
    with temp_file_object(source_bucket, EXAMPLE_METADATA.object_id) as file:
        await joint_fixture.s3.populate_file_objects([file])

    # Run the file-staging operation to encounter an error (outbox bucket doesn't exist)
    caplog.clear()
    caplog.set_level("CRITICAL")
    with pytest.raises(joint_fixture.file_registry.CopyOperationError):
        await joint_fixture.file_registry.stage_registered_file(
            file_id=EXAMPLE_METADATA.file_id,
            decrypted_sha256=EXAMPLE_METADATA.decrypted_sha256,
            outbox_bucket_id=outbox_bucket,
            outbox_object_id=EXAMPLE_METADATA.object_id,
        )

    # Verify the log message exists. Use getMessage() (not the `.message`
    # attribute, which is only set once a formatter has run) for the fully
    # formatted text, consistent with the other assertions in this module.
    assert caplog.records
    assert caplog.records[0].getMessage() == (
        "Fatal error occurred while copying file with the ID 'examplefile001' to the"
        + " bucket 'outbox-bucket'. The exception is: The bucket with ID 'outbox-bucket'"
        + " does not exist."
    )

    # Upload the file to the outbox bucket so we trigger ObjectAlreadyExistsError
    with temp_file_object(outbox_bucket, EXAMPLE_METADATA.object_id) as file:
        await joint_fixture.s3.populate_file_objects([file])

    # Run the file-staging operation to encounter the error
    caplog.clear()
    caplog.set_level("INFO")
    await joint_fixture.file_registry.stage_registered_file(
        file_id=EXAMPLE_METADATA.file_id,
        decrypted_sha256=EXAMPLE_METADATA.decrypted_sha256,
        outbox_bucket_id=outbox_bucket,
        outbox_object_id=EXAMPLE_METADATA.object_id,
    )

    assert caplog.records
    assert caplog.records[0].getMessage() == (
        "Object corresponding to file ID 'examplefile001' is already in the outbox."
    )


async def test_copy_when_file_exists_in_outbox(joint_fixture: JointFixture, caplog):
    """Test that `FileRegistry.stage_registered_file` returns early if a copy is
    unnecessary.
    """
    # Store the example file metadata in the database
    await joint_fixture.file_metadata_dao.insert(EXAMPLE_METADATA)

    # Place the object in both the source and the destination (outbox) buckets
    storage_alias = EXAMPLE_METADATA.storage_alias
    permanent_bucket = joint_fixture.config.object_storages[storage_alias].bucket
    outbox_bucket = "outbox-bucket"
    for bucket in (permanent_bucket, outbox_bucket):
        with temp_file_object(bucket, EXAMPLE_METADATA.object_id) as file_object:
            await joint_fixture.s3.populate_file_objects([file_object])

    # Stage the file -- this should return early: the hexkit provider raises
    # because the object already exists in the outbox, and the registry
    # treats that as "nothing to do"
    caplog.clear()
    caplog.set_level("INFO")
    await joint_fixture.file_registry.stage_registered_file(
        file_id=EXAMPLE_METADATA.file_id,
        decrypted_sha256=EXAMPLE_METADATA.decrypted_sha256,
        outbox_bucket_id=outbox_bucket,
        outbox_object_id=EXAMPLE_METADATA.object_id,
    )

    # The early return should have been logged at INFO level
    assert caplog.records
    assert caplog.records[0].getMessage() == (
        "Object corresponding to file ID 'examplefile001' is already in the outbox."
    )

0 comments on commit 16ecde7

Please sign in to comment.