From 93ae3469de660f09c287d97b214ae26a39ddfddf Mon Sep 17 00:00:00 2001 From: Mamoru Miura Date: Fri, 22 Mar 2024 16:39:13 +0900 Subject: [PATCH 1/4] fix: adapt with feather and parquet in S3 --- gokart/file_processor.py | 9 +++++---- test/test_target.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/gokart/file_processor.py b/gokart/file_processor.py index 67f8bbe5..1e08c3be 100644 --- a/gokart/file_processor.py +++ b/gokart/file_processor.py @@ -2,6 +2,7 @@ import pickle import xml.etree.ElementTree as ET from abc import abstractmethod +from io import BytesIO from logging import getLogger import luigi @@ -203,11 +204,11 @@ def __init__(self, engine='pyarrow', compression=None): super(ParquetFileProcessor, self).__init__() def format(self): - return None + return luigi.format.Nop def load(self, file): # MEMO: read_parquet only supports a filepath as string (not a file handle) - return pd.read_parquet(file.name) + return pd.read_parquet(BytesIO(file.read())) def dump(self, obj, file): assert isinstance(obj, (pd.DataFrame)), f'requires pd.DataFrame, but {type(obj)} is passed.' @@ -222,10 +223,10 @@ def __init__(self, store_index_in_feather: bool): self.INDEX_COLUMN_PREFIX = '__feather_gokart_index__' def format(self): - return None + return luigi.format.Nop def load(self, file): - loaded_df = pd.read_feather(file.name) + loaded_df = pd.read_feather(BytesIO(file.read())) if self._store_index_in_feather: if any(col.startswith(self.INDEX_COLUMN_PREFIX) for col in loaded_df.columns): diff --git a/test/test_target.py b/test/test_target.py index 8b1332c0..afa0f9de 100644 --- a/test/test_target.py +++ b/test/test_target.py @@ -208,6 +208,34 @@ def test_last_modified_time_without_file(self): with self.assertRaises(FileNotFoundError): target.last_modification_time() + @mock_s3 + def test_save_on_s3_feather(self): + conn = boto3.resource('s3', region_name='us-east-1') + conn.create_bucket(Bucket='test') + + obj = pd.DataFrame(dict(a=[1, 2], b=[3, 4])) + file_path = os.path.join('s3://test/', 'test.feather') + + target = make_target(file_path=file_path, unique_id=None) + target.dump(obj) + loaded = target.load() + + pd.testing.assert_frame_equal(loaded, obj) + + @mock_s3 + def test_save_on_s3_parquet(self): + conn = boto3.resource('s3', region_name='us-east-1') + conn.create_bucket(Bucket='test') + + obj = pd.DataFrame(dict(a=[1, 2], b=[3, 4])) + file_path = os.path.join('s3://test/', 'test.parquet') + + target = make_target(file_path=file_path, unique_id=None) + target.dump(obj) + loaded = target.load() + + pd.testing.assert_frame_equal(loaded, obj) + class ModelTargetTest(unittest.TestCase): def tearDown(self): From 7abbc331d5f5a0f04f1e688e2dcc00ee076a0bb6 Mon Sep 17 00:00:00 2001 From: Mamoru Miura Date: Wed, 3 Apr 2024 15:23:55 +0900 Subject: [PATCH 2/4] docs: add fixme comment --- gokart/file_processor.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/gokart/file_processor.py b/gokart/file_processor.py index 1e08c3be..e8ca7178 100644 --- a/gokart/file_processor.py +++ b/gokart/file_processor.py @@ -207,7 +207,10 @@ def format(self): return luigi.format.Nop def load(self, file): - # MEMO: read_parquet only supports a filepath as string (not a file handle) + # FIXME(mamo3gr): enable streaming (chunked) read. + # pandas.read_parquet accepts file-like object + # but file (luigi.contrib.s3.ReadableS3File) should have 'tell' method, + # which is needed for pandas to read a file in chunks. return pd.read_parquet(BytesIO(file.read())) def dump(self, obj, file): @@ -226,6 +229,10 @@ def format(self): return luigi.format.Nop def load(self, file): + # FIXME(mamo3gr): enable streaming (chunked) read. + # pandas.read_feather accepts file-like object + # but file (luigi.contrib.s3.ReadableS3File) should have 'tell' method, + # which is needed for pandas to read a file in chunks. loaded_df = pd.read_feather(BytesIO(file.read())) if self._store_index_in_feather: From 65bb5b0cd73e8250c6969d2f8a485368ccead2b5 Mon Sep 17 00:00:00 2001 From: Mamoru Miura Date: Wed, 3 Apr 2024 16:36:02 +0900 Subject: [PATCH 3/4] feat: read the whole file only if it is not buffered --- gokart/file_processor.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/gokart/file_processor.py b/gokart/file_processor.py index e8ca7178..675be799 100644 --- a/gokart/file_processor.py +++ b/gokart/file_processor.py @@ -207,11 +207,14 @@ def format(self): return luigi.format.Nop def load(self, file): - # FIXME(mamo3gr): enable streaming (chunked) read. + # FIXME(mamo3gr): enable streaming (chunked) read with S3. # pandas.read_parquet accepts file-like object # but file (luigi.contrib.s3.ReadableS3File) should have 'tell' method, # which is needed for pandas to read a file in chunks. - return pd.read_parquet(BytesIO(file.read())) + if ObjectStorage.is_buffered_reader(file): + return pd.read_parquet(_ChunkedLargeFileReader(file)) + else: + return pd.read_parquet(BytesIO(file.read())) def dump(self, obj, file): assert isinstance(obj, (pd.DataFrame)), f'requires pd.DataFrame, but {type(obj)} is passed.' @@ -229,11 +232,14 @@ def format(self): return luigi.format.Nop def load(self, file): - # FIXME(mamo3gr): enable streaming (chunked) read. + # FIXME(mamo3gr): enable streaming (chunked) read with S3. # pandas.read_feather accepts file-like object # but file (luigi.contrib.s3.ReadableS3File) should have 'tell' method, # which is needed for pandas to read a file in chunks. - loaded_df = pd.read_feather(BytesIO(file.read())) + if ObjectStorage.is_buffered_reader(file): + loaded_df = pd.read_feather(_ChunkedLargeFileReader(file)) + else: + loaded_df = pd.read_feather(BytesIO(file.read())) if self._store_index_in_feather: if any(col.startswith(self.INDEX_COLUMN_PREFIX) for col in loaded_df.columns): From 0a6880f1b9e6239bf92748235cc5ee62e05543cc Mon Sep 17 00:00:00 2001 From: Mamoru Miura Date: Wed, 3 Apr 2024 17:41:28 +0900 Subject: [PATCH 4/4] fix: revert the way for loading feather/parquet when the target is not on S3 --- gokart/file_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gokart/file_processor.py b/gokart/file_processor.py index 675be799..56510b7f 100644 --- a/gokart/file_processor.py +++ b/gokart/file_processor.py @@ -212,7 +212,7 @@ def load(self, file): # but file (luigi.contrib.s3.ReadableS3File) should have 'tell' method, # which is needed for pandas to read a file in chunks. if ObjectStorage.is_buffered_reader(file): - return pd.read_parquet(_ChunkedLargeFileReader(file)) + return pd.read_parquet(file.name) else: return pd.read_parquet(BytesIO(file.read())) @@ -237,7 +237,7 @@ def load(self, file): # but file (luigi.contrib.s3.ReadableS3File) should have 'tell' method, # which is needed for pandas to read a file in chunks. if ObjectStorage.is_buffered_reader(file): - loaded_df = pd.read_feather(_ChunkedLargeFileReader(file)) + loaded_df = pd.read_feather(file.name) else: loaded_df = pd.read_feather(BytesIO(file.read()))