Make FlyteFile and FlyteDirectory pickleable #3030
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Code Review Agent Run #639caa
Actionable Suggestions - 3
# test round trip pickling
pickled_input = pickle.dumps(downstream_input)
unpickled_input = pickle.loads(pickled_input)
assert downstream_input == unpickled_input
Consider adding more assertions to verify that the pickled/unpickled FlyteDirectory object's properties, such as uri and other attributes, are preserved correctly after deserialization.
Code suggestion
Check the AI-generated fix before applying
- # test round trip pickling
- pickled_input = pickle.dumps(downstream_input)
- unpickled_input = pickle.loads(pickled_input)
- assert downstream_input == unpickled_input
+ # test round trip pickling
+ pickled_input = pickle.dumps(downstream_input)
+ unpickled_input = pickle.loads(pickled_input)
+ assert downstream_input == unpickled_input
+ assert unpickled_input.uri == "s3://sample-path/directory"
Code Review Run #639caa
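For context on why the round-trip test matters, here is a minimal sketch of the failure mode the PR title refers to. It does not use flytekit: `fetch`, `make_lambda_downloader`, `make_partial_downloader`, and `can_pickle` are hypothetical names invented for illustration. It assumes the script is run directly, so that `fetch` can be found by name when pickling.

```python
import pickle
from functools import partial


def fetch(uri: str, local_path: str) -> str:
    """Hypothetical stand-in for a download call; it only reports what it would do."""
    return f"downloaded {uri} -> {local_path}"


def make_lambda_downloader(uri: str, local_path: str):
    # A lambda created inside a function has no importable name,
    # so the standard pickle module refuses to serialize it.
    return lambda: fetch(uri, local_path)


def make_partial_downloader(uri: str, local_path: str):
    # partial stores a reference to the module-level function plus its
    # arguments, and pickle can serialize all of those.
    return partial(fetch, uri, local_path)


def can_pickle(obj) -> bool:
    """Return True if pickle.dumps succeeds for obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print("lambda picklable: ", can_pickle(make_lambda_downloader("s3://bucket/key", "/tmp/key")))
    print("partial picklable:", can_pickle(make_partial_downloader("s3://bucket/key", "/tmp/key")))
```

The exception type raised for the lambda differs between Python versions, which is why the helper catches more than one.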
flytekit/types/directory/types.py
Outdated
def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int):
    return file_access_provider.get_data(uri, local_folder, is_multipart=True, batch_size=batch_size)
Consider adding error handling for the case where file_access_provider.get_data() fails. The function should handle potential exceptions that may occur during data retrieval. A similar issue was also found in flytekit/types/file/file.py (lines 717-718).
Code suggestion
Check the AI-generated fix before applying
- def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int):
-     return file_access_provider.get_data(uri, local_folder, is_multipart=True, batch_size=batch_size)
+ def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int):
+     try:
+         return file_access_provider.get_data(uri, local_folder, is_multipart=True, batch_size=batch_size)
+     except Exception as e:
+         raise TypeTransformerFailedError(
+             f"Failed to download directory from {uri} to {local_folder}: {str(e)}"
+         )
Code Review Run #639caa
flytekit/types/directory/types.py
Outdated
@@ -683,4 +684,8 @@ def guess_python_type(self, literal_type: LiteralType) -> typing.Type[FlyteDirec
  raise ValueError(f"Transformer {self} cannot reverse {literal_type}")
+ def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int):
Consider adding type hints for the return value of _flyte_directory_downloader. The function appears to return a result from get_data() but its return type is not specified.
Code suggestion
Check the AI-generated fix before applying
- def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int):
+ def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int) -> Any:
Code Review Run #639caa
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3030      +/-   ##
==========================================
+ Coverage   51.28%   51.81%   +0.52%
==========================================
  Files         201      201
  Lines       21281    21289       +8
  Branches     2734     2734
==========================================
+ Hits        10915    11031     +116
+ Misses       9767     9739      -28
+ Partials      599      519      -80
Code Review Agent Run #f65ef4
Actionable Suggestions - 1
flytekit/types/file/file.py
Outdated
ff = FlyteFile.__class_getitem__(expected_format)(
    path=local_path, downloader=lambda: self.downloader(ctx=ctx, remote_path=uri, local_path=local_path)
)
Consider using a more descriptive lambda function name instead of an anonymous lambda. A named function would better document the purpose of the downloader callback.
Code suggestion
Check the AI-generated fix before applying
- ff = FlyteFile.__class_getitem__(expected_format)(
-     path=local_path, downloader=lambda: self.downloader(ctx=ctx, remote_path=uri, local_path=local_path)
- )
+ download_callback = partial(self.downloader, ctx=ctx, remote_path=uri, local_path=local_path)
+ ff = FlyteFile.__class_getitem__(expected_format)(
+     path=local_path, downloader=download_callback
+ )
Code Review Run #f65ef4
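Swapping a lambda for `functools.partial` also changes when arguments are bound, which is worth keeping in mind when reading the suggestion above. The sketch below is illustrative only; `download` and `build_downloaders` are hypothetical names, not flytekit functions.

```python
from functools import partial


def download(remote_path: str, local_path: str) -> tuple:
    """Hypothetical stand-in that just echoes its arguments."""
    return (remote_path, local_path)


def build_downloaders():
    uri = "s3://bucket/a"
    # The lambda looks up `uri` when it is called (late binding).
    via_lambda = lambda: download(remote_path=uri, local_path="/tmp/a")
    # partial captures the current value of `uri` when it is created.
    via_partial = partial(download, remote_path=uri, local_path="/tmp/a")
    # Rebinding the name afterwards affects only the lambda.
    uri = "s3://bucket/b"
    return via_lambda, via_partial


if __name__ == "__main__":
    via_lambda, via_partial = build_downloaders()
    print(via_lambda())
    print(via_partial())
```

In the code under review the variables are not rebound after the callback is created, so both forms would behave the same there; the difference only shows up when a captured name changes later.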
flytekit/types/directory/types.py
Outdated
@@ -665,8 +667,7 @@ async def async_to_python_value(
  batch_size = get_batch_size(expected_python_type)
- def _downloader():
-     return ctx.file_access.get_data(uri, local_folder, is_multipart=True, batch_size=batch_size)
+ _downloader = partial(_flyte_directory_downloader, ctx.file_access, uri, local_folder, batch_size)
Looking at this again, I think we can partial ctx.file_access itself and not need to define _flyte_directory_downloader.
_downloader = partial(ctx.file_access.get_data, uri, local_folder, is_multipart=True, batch_size=batch_size)
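A partial over a bound method can be pickled as long as the object the method is bound to is itself picklable. The following sketch illustrates that idea with a made-up class; `FakeFileAccess` and its `get_data` signature are assumptions for illustration and are not flytekit's `FileAccessProvider`. It assumes the script is run directly so the class can be found by name.

```python
import pickle
from functools import partial


class FakeFileAccess:
    """Hypothetical stand-in for a file access provider."""

    def __init__(self, scratch_dir: str):
        self.scratch_dir = scratch_dir

    def get_data(self, uri: str, local_folder: str, is_multipart: bool = False, batch_size: int = 0) -> str:
        kind = "directory" if is_multipart else "file"
        return f"{kind} {uri} -> {local_folder} (batch_size={batch_size})"


file_access = FakeFileAccess("/tmp/scratch")

# Bind the method and its arguments directly, with no helper function.
downloader = partial(
    file_access.get_data, "s3://sample-path/directory", "/tmp/dir", is_multipart=True, batch_size=8
)

# Pickling the partial also pickles the instance the method is bound to,
# so the restored callable points at a copy of file_access.
restored = pickle.loads(pickle.dumps(downloader))

if __name__ == "__main__":
    print(restored())
```

One consequence of this design is that any state held by the bound object travels with the pickled callable, so that object needs to be cheap and safe to serialize.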
flytekit/types/file/file.py
Outdated
@@ -732,12 +734,14 @@ async def async_to_python_value(
  # For the remote case, return an FlyteFile object that can download
  local_path = ctx.file_access.get_random_local_path(uri)
+ _downloader = partial(_flyte_file_downloader, ctx.file_access, uri, local_path)
Same here regarding partialing ctx.file_access.get_data directly.
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
6d17e5c to dddde88
Code Review Agent Run #be429e
Actionable Suggestions - 2
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
flytekit/types/directory/types.py
Outdated
TypeEngine.register(FlyteDirToMultipartBlobTransformer())
Consider adding error handling for the case where file_access_provider.get_data() fails. The function should handle potential exceptions and provide appropriate error messages.
Code suggestion
Check the AI-generated fix before applying
- TypeEngine.register(FlyteDirToMultipartBlobTransformer())
+ def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int):
+     try:
+         return file_access_provider.get_data(uri, local_folder, is_multipart=True, batch_size=batch_size)
+     except Exception as e:
+         raise TypeTransformerFailedError(
+             f"Failed to download directory from {uri} to {local_folder}: {str(e)}"
+         )
Code Review Run #be429e
flytekit/types/directory/types.py
Outdated
def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int):
    return file_access_provider.get_data(uri, local_folder, is_multipart=True, batch_size=batch_size)
Consider adding type hints for the return value of the _flyte_directory_downloader function to improve code clarity and maintainability.
Code suggestion
Check the AI-generated fix before applying
def _flyte_directory_downloader(file_access_provider: FileAccessProvider, uri: str, local_folder: str, batch_size: int) -> Any:
Code Review Run #be429e
Code Review Agent Run #5c2427
Actionable Suggestions - 0
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Code Review Agent Run #d95336
Actionable Suggestions - 1
flytekit/types/file/file.py
Outdated
  expected_format = FlyteFilePathTransformer.get_format(expected_python_type)
  ff = FlyteFile.__class_getitem__(expected_format)(
-     path=local_path, downloader=lambda: self.downloader(ctx=ctx, remote_path=uri, local_path=local_path)
+     path=local_path,
+     downloader=partial(self.downloader, ctx.file_access, remote_path=uri, local_path=local_path),
Consider updating the downloader partial function call to include type hints for better code maintainability and IDE support. The ctx.file_access parameter type could be explicitly specified as FileAccessProvider.
Code suggestion
Check the AI-generated fix before applying
- downloader=partial(self.downloader, ctx.file_access, remote_path=uri, local_path=local_path),
+ downloader=partial(self.downloader, typing.cast(FileAccessProvider, ctx.file_access), remote_path=uri, local_path=local_path),
Code Review Run #d95336
flytekit/types/file/file.py
Outdated
  return ff

  @staticmethod
  def downloader(
-     ctx: FlyteContext, remote_path: typing.Union[str, os.PathLike], local_path: typing.Union[str, os.PathLike]
+     file_access_provider: FileAccessProvider,
This breaks backward compatibility. Is it safe to change the signature here?
As far as I can tell this staticmethod was added in a PR 4 days ago: https://github.com/flyteorg/flytekit/pull/2991/files
And the only call site was in this module itself
I'd rather not have this be part of the public API. Can this be renamed to _remote_downloader?
we can also revert this to the older implementation with the pure partial
That works for me.
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Code Review Agent Run #0644db
Actionable Suggestions - 0
* make _downloader function in FlyteFile/Directory pickleable
* make FlyteFile and Directory pickleable
* remove unnecessary helper functions
* fix lint
* use partials instead of lambda
* fix lint
* remove unneeded helper function
* update FlyteFilePathTransformer.downloader method
* remove downloader staticmethod
* fix lint

Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Signed-off-by: Shuying Liang <shuying.liang@gmail.com>
Tracking issue
flyteorg/flyte#6144
Why are the changes needed?
Because distributed training with the Elastic plugin fails when running a task with FlyteDirectory or FlyteFile inputs.
What changes were proposed in this pull request?
Define _downloader as a module-level function that's made into a partial when converting FlyteFile/Directory literals into python values.
How was this patch tested?
Unit tests were updated to test for round-trip pickling.
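As a rough illustration of what a round-trip pickling check exercises, the sketch below uses a made-up container rather than flytekit's classes. `RemoteDirectory` and `fetch_directory` are hypothetical, and the choice to exclude the callable from equality is an assumption of this sketch, not a statement about how flytekit compares objects. It assumes the script is run directly so the class and function can be found by name.

```python
import pickle
from dataclasses import dataclass, field
from functools import partial
from typing import Callable


def fetch_directory(uri: str, local_folder: str) -> str:
    """Hypothetical stand-in for a download call."""
    return f"{uri} -> {local_folder}"


@dataclass
class RemoteDirectory:
    """Hypothetical container holding a URI and a deferred download callback."""

    uri: str
    # partial objects compare by identity, so the callable is left out of ==.
    downloader: Callable[[], str] = field(compare=False, repr=False)


original = RemoteDirectory(
    uri="s3://sample-path/directory",
    downloader=partial(fetch_directory, "s3://sample-path/directory", "/tmp/directory"),
)

# Round trip: serialize, deserialize, then compare the two objects.
restored = pickle.loads(pickle.dumps(original))

if __name__ == "__main__":
    print(restored == original)
    print(restored.downloader())
```

The restored callback is a distinct object from the original one, so a check that compares fields and then calls the callback is more informative than comparing the callables directly.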
Check all the applicable boxes
[ ] I updated the documentation accordingly.
Related PRs
Docs link
Summary by Bito
Implementation of pickleable downloaders for FlyteFile and FlyteDirectory by replacing lambda functions with functools.partial. This critical fix enables distributed training with the Elastic plugin by ensuring proper serialization of file handling objects. Changes include restructuring downloader implementation, moving FileAccessProvider import, and streamlining file access through context object.
Unit tests added: True
Estimated effort to review (1-5, lower is better): 2