
Commit

Merge pull request Avaiga#1474 from Avaiga/feature/Avaiga#1196-datanode-download-upload-api

Feature/Avaiga#1196 - Add download and upload API for file-based data nodes
trgiangdo authored Jul 12, 2024
2 parents 0782ef0 + 552c1e2 commit 380b29d
Showing 15 changed files with 835 additions and 85 deletions.
59 changes: 58 additions & 1 deletion taipy/core/data/_file_datanode_mixin.py
@@ -14,11 +14,13 @@
import shutil
from datetime import datetime
from os.path import isfile
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional

from taipy.config.config import Config
from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._reload import _self_reload
from ..reason import InvalidUploadFile, ReasonCollection, UploadFileCanNotBeRead
from .data_node import DataNode
from .data_node_id import Edit

@@ -34,6 +36,8 @@ class _FileDataNodeMixin(object):
_DEFAULT_PATH_KEY = "default_path"
_IS_GENERATED_KEY = "is_generated"

__logger = _TaipyLogger._get_logger()

def __init__(self, properties: Dict) -> None:
self._path: str = properties.get(self._PATH_KEY, properties.get(self._DEFAULT_PATH_KEY))
self._is_generated: bool = properties.get(self._IS_GENERATED_KEY, self._path is None)
@@ -92,3 +96,56 @@ def _migrate_path(self, storage_type, old_path) -> str:
if os.path.exists(old_path):
shutil.move(old_path, new_path)
return new_path

def _get_downloadable_path(self) -> str:
"""Get the downloadable path of the file data of the data node.
Returns:
The downloadable path of the file data of the data node if it exists, otherwise an empty string.
"""
if os.path.exists(self.path) and isfile(self._path):
return self.path

return ""

def _upload(self, path: str, upload_checker: Optional[Callable[[str, Any], bool]] = None) -> ReasonCollection:
"""Upload a file data to the data node.
Parameters:
path (str): The path of the file to upload to the data node.
upload_checker (Optional[Callable[[str, Any], bool]]): A function to check if the upload is allowed.
The function takes the title of the upload data and the data itself as arguments and returns
True if the upload is allowed, otherwise False.
Returns:
True if the upload was successful, otherwise False.
"""
from ._data_manager_factory import _DataManagerFactory

reason_collection = ReasonCollection()

upload_path = pathlib.Path(path)

try:
upload_data = self._read_from_path(str(upload_path))
except Exception as err:
self.__logger.error(f"Error while uploading {upload_path.name} to data node {self.id}:") # type: ignore[attr-defined]
self.__logger.error(f"Error: {err}")
reason_collection._add_reason(self.id, UploadFileCanNotBeRead(upload_path.name, self.id)) # type: ignore[attr-defined]
return reason_collection

if upload_checker is not None:
if not upload_checker(upload_path.name, upload_data):
reason_collection._add_reason(self.id, InvalidUploadFile(upload_path.name, self.id)) # type: ignore[attr-defined]
return reason_collection

shutil.copy(upload_path, self.path)

self.track_edit(timestamp=datetime.now()) # type: ignore[attr-defined]
self.unlock_edit() # type: ignore[attr-defined]
_DataManagerFactory._build_manager()._set(self) # type: ignore[arg-type]

return reason_collection

def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
raise NotImplementedError
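
Taken together, these methods define the new file-transfer surface for file-based data nodes: _get_downloadable_path exposes the node's file for download, and _upload validates a candidate file with the node's own reader before copying it over the node's path. Below is a minimal usage sketch, not part of the diff: the config id, paths, and checker logic are illustrative assumptions, both methods are internal underscore-prefixed helpers normally reached through higher-level Taipy APIs, and create_global_data_node is assumed available as in recent Taipy releases.

import pandas as pd

import taipy as tp
from taipy.config import Config, Scope

# Illustrative config: a globally scoped CSV data node (pandas exposed type by default).
csv_cfg = Config.configure_csv_data_node(id="my_csv", default_path="data/my.csv", scope=Scope.GLOBAL)

def upload_checker(file_name: str, data: pd.DataFrame) -> bool:
    # Reject empty files and files missing the expected column.
    return not data.empty and "value" in data.columns

if __name__ == "__main__":
    tp.Core().run()
    dn = tp.create_global_data_node(csv_cfg)

    # Upload: the candidate file is parsed with the node's own reader, checked,
    # and only then copied over the node's path.
    reasons = dn._upload("incoming/new_batch.csv", upload_checker=upload_checker)
    if not reasons:  # a ReasonCollection evaluates falsy when it holds blocking reasons
        print("Upload rejected:", reasons)

    # Download: the node's file path if the file exists, otherwise an empty string.
    download_path = dn._get_downloadable_path()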
11 changes: 6 additions & 5 deletions taipy/core/data/_tabular_datanode_mixin.py
@@ -29,15 +29,16 @@ class _TabularDataNodeMixin(object):
_VALID_STRING_EXPOSED_TYPES = [_EXPOSED_TYPE_PANDAS, _EXPOSED_TYPE_NUMPY]

def __init__(self, **kwargs) -> None:
self._decoder: Union[Callable[[List[Any]], Any], Callable[[Dict[Any, Any]], Any]]
self._decoder: Union[Callable, Any]
self.custom_document = kwargs.get(self._EXPOSED_TYPE_PROPERTY)
if kwargs.get(self._HAS_HEADER_PROPERTY, True):
self._decoder = self._default_decoder_with_header
else:
self._decoder = self._default_decoder_without_header

custom_decoder = getattr(self.custom_document, "decode", None)
if callable(custom_decoder):
self._decoder = custom_decoder
elif kwargs.get(self._HAS_HEADER_PROPERTY, True):
self._decoder = self._default_decoder_with_header
else:
self._decoder = self._default_decoder_without_header

self._encoder = self._default_encoder
custom_encoder = getattr(self.custom_document, "encode", None)
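
With this reordering, a custom document class's own decode/encode methods now take precedence over the header-based default codecs instead of being overwritten by them. A sketch of such a class follows (the Row type and its fields are illustrative); since decode and encode are looked up on the class itself via getattr, they must be callable without an instance:

# Illustrative custom document class for a tabular data node (exposed_type=Row).
class Row:
    def __init__(self, date: str, value: float) -> None:
        self.date = date
        self.value = value

    @staticmethod
    def decode(record) -> "Row":
        # Invoked for each record read from the file.
        return Row(record["date"], float(record["value"]))

    @staticmethod
    def encode(row: "Row") -> dict:
        # Invoked for each object before it is written back.
        return {"date": row.date, "value": row.value}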
43 changes: 25 additions & 18 deletions taipy/core/data/csv.py
@@ -137,41 +137,48 @@ def storage_type(cls) -> str:
return cls.__STORAGE_TYPE

def _read(self):
return self._read_from_path()

def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
if path is None:
path = self._path

properties = self.properties
if properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_PANDAS:
return self._read_as_pandas_dataframe()
return self._read_as_pandas_dataframe(path=path)
if properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_NUMPY:
return self._read_as_numpy()
return self._read_as()
return self._read_as_numpy(path=path)
return self._read_as(path=path)

def _read_as(self):
def _read_as(self, path: str):
properties = self.properties
with open(self._path, encoding=properties[self.__ENCODING_KEY]) as csvFile:
with open(path, encoding=properties[self.__ENCODING_KEY]) as csvFile:
if properties[self._HAS_HEADER_PROPERTY]:
reader = csv.DictReader(csvFile)
else:
reader = csv.reader(csvFile)
reader_with_header = csv.DictReader(csvFile)
return [self._decoder(line) for line in reader_with_header]

return [self._decoder(line) for line in reader]
reader_without_header = csv.reader(csvFile)
return [self._decoder(line) for line in reader_without_header]

def _read_as_numpy(self) -> np.ndarray:
return self._read_as_pandas_dataframe().to_numpy()
def _read_as_numpy(self, path: str) -> np.ndarray:
return self._read_as_pandas_dataframe(path=path).to_numpy()

def _read_as_pandas_dataframe(
self, usecols: Optional[List[int]] = None, column_names: Optional[List[str]] = None
self,
path: str,
usecols: Optional[List[int]] = None,
column_names: Optional[List[str]] = None,
) -> pd.DataFrame:
try:
properties = self.properties
if properties[self._HAS_HEADER_PROPERTY]:
if column_names:
return pd.read_csv(self._path, encoding=properties[self.__ENCODING_KEY])[column_names]
return pd.read_csv(self._path, encoding=properties[self.__ENCODING_KEY])
return pd.read_csv(path, encoding=properties[self.__ENCODING_KEY])[column_names]
return pd.read_csv(path, encoding=properties[self.__ENCODING_KEY])
else:
if usecols:
return pd.read_csv(
self._path, encoding=properties[self.__ENCODING_KEY], header=None, usecols=usecols
)
return pd.read_csv(self._path, encoding=properties[self.__ENCODING_KEY], header=None)
return pd.read_csv(path, encoding=properties[self.__ENCODING_KEY], header=None, usecols=usecols)
return pd.read_csv(path, encoding=properties[self.__ENCODING_KEY], header=None)
except pd.errors.EmptyDataError:
return pd.DataFrame()
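
Threading an explicit path through every reader means a candidate file can be parsed with the node's configured exposed type, encoding, and header settings without touching the file the node owns, which is exactly how _upload validates uploads before copying them over. A sketch under that assumption (internal API; names are illustrative):

# Sketch: preview a candidate file using the node's own CSV settings.
# _read_from_path is internal, so this mirrors what _upload does internally
# rather than documenting a public contract.
def preview_candidate(csv_dn, candidate_path: str):
    # With the default pandas exposed type this returns a DataFrame.
    return csv_dn._read_from_path(candidate_path)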

46 changes: 28 additions & 18 deletions taipy/core/data/excel.py
@@ -10,7 +10,7 @@
# specific language governing permissions and limitations under the License.

from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, List, Optional, Set, Union

import numpy as np
import pandas as pd
@@ -150,39 +150,45 @@ def _check_exposed_type(exposed_type):
_TabularDataNodeMixin._check_exposed_type(t)

def _read(self):
return self._read_from_path()

def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
if path is None:
path = self._path

exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
if exposed_type == self._EXPOSED_TYPE_PANDAS:
return self._read_as_pandas_dataframe()
return self._read_as_pandas_dataframe(path=path)
if exposed_type == self._EXPOSED_TYPE_NUMPY:
return self._read_as_numpy()
return self._read_as()
return self._read_as_numpy(path=path)
return self._read_as(path=path)

def _read_sheet_with_exposed_type(
self, sheet_exposed_type: str, sheet_name: str
self, path: str, sheet_exposed_type: str, sheet_name: str
) -> Optional[Union[np.ndarray, pd.DataFrame]]:
if sheet_exposed_type == self._EXPOSED_TYPE_NUMPY:
return self._read_as_pandas_dataframe(sheet_name).to_numpy() # type: ignore
return self._read_as_numpy(path, sheet_name)
elif sheet_exposed_type == self._EXPOSED_TYPE_PANDAS:
return self._read_as_pandas_dataframe(sheet_name)
return self._read_as_pandas_dataframe(path, sheet_name)
return None

def _read_as(self):
def _read_as(self, path: str):
try:
properties = self.properties
excel_file = load_workbook(self._path)
excel_file = load_workbook(path)
exposed_type = properties[self._EXPOSED_TYPE_PROPERTY]
work_books = {}
sheet_names = excel_file.sheetnames

user_provided_sheet_names = properties.get(self.__SHEET_NAME_PROPERTY) or []
if not isinstance(user_provided_sheet_names, (List, Set, Tuple)):
if not isinstance(user_provided_sheet_names, (list, set, tuple)):
user_provided_sheet_names = [user_provided_sheet_names]

provided_sheet_names = user_provided_sheet_names or sheet_names

for sheet_name in provided_sheet_names:
if sheet_name not in sheet_names:
raise NonExistingExcelSheet(sheet_name, self._path)
raise NonExistingExcelSheet(sheet_name, path)

if isinstance(exposed_type, List):
if len(provided_sheet_names) != len(exposed_type):
@@ -201,7 +207,7 @@ def _read_as(self):
sheet_exposed_type = exposed_type[i]

if isinstance(sheet_exposed_type, str):
sheet_data = self._read_sheet_with_exposed_type(sheet_exposed_type, sheet_name)
sheet_data = self._read_sheet_with_exposed_type(path, sheet_exposed_type, sheet_name)
if sheet_data is not None:
work_books[sheet_name] = sheet_data
continue
@@ -223,14 +229,16 @@

return work_books

def _read_as_numpy(self):
sheets = self._read_as_pandas_dataframe()
def _read_as_numpy(self, path: str, sheet_names=None):
sheets = self._read_as_pandas_dataframe(path=path, sheet_names=sheet_names)
if isinstance(sheets, dict):
return {sheet_name: df.to_numpy() for sheet_name, df in sheets.items()}
return sheets.to_numpy()

def _do_read_excel(self, sheet_names, kwargs) -> Union[Dict[Union[int, str], pd.DataFrame], pd.DataFrame]:
return pd.read_excel(self._path, sheet_name=sheet_names, **kwargs)
def _do_read_excel(
self, path: str, sheet_names, kwargs
) -> Union[Dict[Union[int, str], pd.DataFrame], pd.DataFrame]:
return pd.read_excel(path, sheet_name=sheet_names, **kwargs)

def __get_sheet_names_and_header(self, sheet_names):
kwargs = {}
@@ -241,10 +249,12 @@ def __get_sheet_names_and_header(self, sheet_names):
kwargs["header"] = None
return sheet_names, kwargs

def _read_as_pandas_dataframe(self, sheet_names=None) -> Union[Dict[Union[int, str], pd.DataFrame], pd.DataFrame]:
def _read_as_pandas_dataframe(
self, path: str, sheet_names=None
) -> Union[Dict[Union[int, str], pd.DataFrame], pd.DataFrame]:
sheet_names, kwargs = self.__get_sheet_names_and_header(sheet_names)
try:
return self._do_read_excel(sheet_names, kwargs)
return self._do_read_excel(path, sheet_names, kwargs)
except pd.errors.EmptyDataError:
return pd.DataFrame()
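
The explicit path also propagates through the per-sheet dispatch, so a workbook can mix exposed types per sheet whichever file is being read. For reference, a config along these lines (id, path, and sheet names are illustrative) exercises the _read_sheet_with_exposed_type branch above:

from taipy.config import Config

# One exposed type per sheet; the lengths must match or the node raises
# (see the length check in _read_as above).
excel_cfg = Config.configure_excel_data_node(
    id="sales_workbook",
    default_path="data/sales.xlsx",
    sheet_name=["orders", "returns"],
    exposed_type=["pandas", "numpy"],  # orders -> DataFrame, returns -> ndarray
)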

8 changes: 7 additions & 1 deletion taipy/core/data/json.py
@@ -150,7 +150,13 @@ def decoder(self, decoder: json.JSONDecoder):
self.properties[self._DECODER_KEY] = decoder

def _read(self):
with open(self._path, "r", encoding=self.properties[self.__ENCODING_KEY]) as f:
return self._read_from_path()

def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
if path is None:
path = self._path

with open(path, "r", encoding=self.properties[self.__ENCODING_KEY]) as f:
return json.load(f, cls=self._decoder)

def _append(self, data: Any):
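
Because _read_from_path honors the node's configured decoder exactly as _read does, a custom decoder now applies uniformly to normal reads and to upload validation. A sketch of such a decoder, wired through the decoder property shown above (the __dt__ marker convention is an illustrative assumption):

import json
from datetime import datetime

class DatetimeDecoder(json.JSONDecoder):
    # Turns objects stored as {"__dt__": "<ISO 8601 string>"} back into datetimes.
    def __init__(self, *args, **kwargs):
        super().__init__(object_hook=self._object_hook, *args, **kwargs)

    @staticmethod
    def _object_hook(obj):
        if "__dt__" in obj:
            return datetime.fromisoformat(obj["__dt__"])
        return obj

# json_dn.decoder = DatetimeDecoder  # assumes an existing JSON data node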
67 changes: 34 additions & 33 deletions taipy/core/data/parquet.py
@@ -11,7 +11,7 @@

from datetime import datetime, timedelta
from os.path import isdir, isfile
from typing import Any, Dict, List, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set

import numpy as np
import pandas as pd
@@ -181,18 +181,43 @@ def storage_type(cls) -> str:
return cls.__STORAGE_TYPE

def _read(self):
return self.read_with_kwargs()
return self._read_from_path()

def _read_as(self, read_kwargs: Dict):
def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
if path is None:
path = self._path

# return None if data was never written
if not self.last_edit_date:
self._DataNode__logger.warning(
f"Data node {self.id} from config {self.config_id} is being read but has never been written."
)
return None

kwargs = self.properties[self.__READ_KWARGS_PROPERTY]
kwargs.update(
{
self.__ENGINE_PROPERTY: self.properties[self.__ENGINE_PROPERTY],
}
)
kwargs.update(read_kwargs)

if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_PANDAS:
return self._read_as_pandas_dataframe(path, kwargs)
if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_NUMPY:
return self._read_as_numpy(path, kwargs)
return self._read_as(path, kwargs)

def _read_as(self, path: str, read_kwargs: Dict):
custom_class = self.properties[self._EXPOSED_TYPE_PROPERTY]
list_of_dicts = self._read_as_pandas_dataframe(read_kwargs).to_dict(orient="records")
list_of_dicts = self._read_as_pandas_dataframe(path, read_kwargs).to_dict(orient="records")
return [custom_class(**dct) for dct in list_of_dicts]

def _read_as_numpy(self, read_kwargs: Dict) -> np.ndarray:
return self._read_as_pandas_dataframe(read_kwargs).to_numpy()
def _read_as_numpy(self, path: str, read_kwargs: Dict) -> np.ndarray:
return self._read_as_pandas_dataframe(path, read_kwargs).to_numpy()

def _read_as_pandas_dataframe(self, read_kwargs: Dict) -> pd.DataFrame:
return pd.read_parquet(self._path, **read_kwargs)
def _read_as_pandas_dataframe(self, path: str, read_kwargs: Dict) -> pd.DataFrame:
return pd.read_parquet(path, **read_kwargs)

def _append(self, data: Any):
self.write_with_kwargs(data, engine="fastparquet", append=True)
@@ -237,28 +262,4 @@ def read_with_kwargs(self, **read_kwargs):
**read_kwargs (dict[str, any]): The keyword arguments passed to the function
`pandas.read_parquet()`.
"""
# return None if data was never written
if not self.last_edit_date:
self._DataNode__logger.warning(
f"Data node {self.id} from config {self.config_id} is being read but has never been written."
)
return None

properties = self.properties
exposed_type = properties[self._EXPOSED_TYPE_PROPERTY]
kwargs = properties[self.__READ_KWARGS_PROPERTY]
kwargs.update(
{
self.__ENGINE_PROPERTY: properties[self.__ENGINE_PROPERTY],
}
)
kwargs.update(read_kwargs)

return self._do_read_with_kwargs(exposed_type, kwargs)

def _do_read_with_kwargs(self, exposed_type, read_kwargs) -> Union[pd.DataFrame, np.ndarray, List]:
if exposed_type == self._EXPOSED_TYPE_PANDAS:
return self._read_as_pandas_dataframe(read_kwargs)
if exposed_type == self._EXPOSED_TYPE_NUMPY:
return self._read_as_numpy(read_kwargs)
return self._read_as(read_kwargs)
return self._read_from_path(**read_kwargs)
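
read_with_kwargs is now a thin wrapper over _read_from_path, so the precedence is unchanged: call-site kwargs override the node-level read_kwargs property, which overrides the configured engine. A usage sketch (the node, columns, and filter are illustrative; the filters argument assumes the pyarrow engine):

# Sketch: column pruning and row filtering forwarded to pandas.read_parquet.
df = parquet_dn.read_with_kwargs(
    columns=["date", "value"],    # overrides any node-level "columns" kwarg
    filters=[("value", ">", 0)],  # pyarrow-style row filter
)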