From 16bbeb6413b7b0b662e5df9b90474c5db93e345e Mon Sep 17 00:00:00 2001
From: Nok Lam Chan
Date: Fri, 24 Mar 2023 15:09:42 +0000
Subject: [PATCH] Update SparkDataSet's doc and add support for `fsspec` (#128)

* update notes

Signed-off-by: Nok Chan

* Simplify logic

Signed-off-by: Nok Chan

* Revert "Simplify logic"

This reverts commit 734902ac3fce534e47d0b94df57b4c31efb8b96e.

Signed-off-by: Nok Chan

* Refactoring

Signed-off-by: Nok Chan

* more refactoring

Signed-off-by: Nok Chan

* refactor

Signed-off-by: Nok Chan

* update datasets

Signed-off-by: Nok Chan

* fix prefix

Signed-off-by: Nok Chan

* Fix bug and tests

Signed-off-by: Nok Chan

* update release notes

Signed-off-by: Nok Chan

* Apply comments

---------

Signed-off-by: Nok Chan
---
 kedro-datasets/RELEASE.md                     |  1 +
 .../kedro_datasets/spark/spark_dataset.py     | 64 ++++++++-----------
 .../tests/spark/test_spark_dataset.py         |  8 ---
 3 files changed, 27 insertions(+), 46 deletions(-)

diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md
index baa83bd85..42cb04465 100644
--- a/kedro-datasets/RELEASE.md
+++ b/kedro-datasets/RELEASE.md
@@ -20,6 +20,7 @@
 | ------------------------------------ | -------------------------------------------------------------------------- | ----------------------------- |
 | `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lightning-fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |
 | `snowflake.SnowparkTableDataSet` | Work with [Snowpark](https://www.snowflake.com/en/data-cloud/snowpark/) DataFrames from tables in Snowflake. | `kedro_datasets.snowflake` |
+* Use `fsspec` in `SparkDataSet` to support more filesystems.
 
 ## Bug fixes and other changes
 * Add `mssql` backend to the `SQLQueryDataSet` DataSet using `pyodbc` library.
diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index d366eae08..aad2a5a90 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -123,15 +123,6 @@ def _deployed_on_databricks() -> bool:
     return "DATABRICKS_RUNTIME_VERSION" in os.environ
 
 
-def _path_has_dbfs_prefix(path: str) -> bool:
-    """Check if a file path has a valid dbfs prefix.
-
-    Args:
-        path: File path to check.
-    """
-    return path.startswith("/dbfs/")
-
-
 class KedroHdfsInsecureClient(InsecureClient):
     """Subclasses ``hdfs.InsecureClient`` and implements ``hdfs_exists``
     and ``hdfs_glob`` methods required by ``SparkDataSet``"""
@@ -287,27 +278,29 @@ def __init__(  # pylint: disable=too-many-arguments
         """
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
+        path = PurePosixPath(filepath)
         exists_function = None
         glob_function = None
 
-        if fs_prefix in ("s3a://", "s3n://"):
-            if fs_prefix == "s3n://":
-                warn(
-                    "'s3n' filesystem has now been deprecated by Spark, "
-                    "please consider switching to 's3a'",
-                    DeprecationWarning,
-                )
+        if not filepath.startswith("/dbfs/") and _deployed_on_databricks():
+            logger.warning(
+                "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
+                "filepath is a known source of error. You must add this prefix to %s",
+                filepath,
+            )
+        if fs_prefix and fs_prefix in ("s3a://",):
             _s3 = S3FileSystem(**credentials)
             exists_function = _s3.exists
+            # Ensure the cache is not used, so the latest version is retrieved correctly.
             glob_function = partial(_s3.glob, refresh=True)
-            path = PurePosixPath(filepath)
-        elif fs_prefix == "hdfs://" and version:
-            warn(
-                f"HDFS filesystem support for versioned {self.__class__.__name__} is "
-                f"in beta and uses 'hdfs.client.InsecureClient', please use with "
-                f"caution"
-            )
+        elif fs_prefix == "hdfs://":
+            if version:
+                warn(
+                    f"HDFS filesystem support for versioned {self.__class__.__name__} is "
+                    f"in beta and uses 'hdfs.client.InsecureClient', please use with "
+                    f"caution"
+                )
 
             # default namenode address
             credentials.setdefault("url", "http://localhost:9870")
@@ -316,21 +309,18 @@ def __init__(  # pylint: disable=too-many-arguments
             _hdfs_client = KedroHdfsInsecureClient(**credentials)
             exists_function = _hdfs_client.hdfs_exists
             glob_function = _hdfs_client.hdfs_glob  # type: ignore
-            path = PurePosixPath(filepath)
+        elif filepath.startswith("/dbfs/"):
+            # DBFS adds the prefix to the Spark path by default
+            # See https://github.com/kedro-org/kedro-plugins/issues/117
+            dbutils = _get_dbutils(self._get_spark())
+            if dbutils:
+                glob_function = partial(_dbfs_glob, dbutils=dbutils)
+                exists_function = partial(_dbfs_exists, dbutils=dbutils)
         else:
-            path = PurePosixPath(filepath)
-            if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
-                logger.warning(
-                    "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-                    "filepath is a known source of error. You must add this prefix to %s",
-                    filepath,
-                )
-        if filepath.startswith("/dbfs"):
-            dbutils = _get_dbutils(self._get_spark())
-            if dbutils:
-                glob_function = partial(_dbfs_glob, dbutils=dbutils)
-                exists_function = partial(_dbfs_exists, dbutils=dbutils)
+            fs = fsspec.filesystem(fs_prefix.strip("://"), **credentials)
+            exists_function = fs.exists
+            glob_function = fs.glob
 
         super().__init__(
             filepath=path,
@@ -359,7 +349,6 @@ def __init__(  # pylint: disable=too-many-arguments
 
     @staticmethod
     def _load_schema_from_file(schema: Dict[str, Any]) -> StructType:
-
         filepath = schema.get("filepath")
         if not filepath:
             raise DataSetError(
@@ -375,7 +364,6 @@ def _load_schema_from_file(schema: Dict[str, Any]) -> StructType:
 
         # Open schema file
         with file_system.open(load_path) as fs_file:
-
             try:
                 return StructType.fromJson(json.loads(fs_file.read()))
             except Exception as exc:
diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 74c5ee2bf..9452b007d 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -800,14 +800,6 @@ def test_prevent_overwrite(self, mocker, versioned_dataset_s3):
 
         mocked_spark_df.write.save.assert_not_called()
 
-    def test_s3n_warning(self, version):
-        pattern = (
-            "'s3n' filesystem has now been deprecated by Spark, "
-            "please consider switching to 's3a'"
-        )
-        with pytest.warns(DeprecationWarning, match=pattern):
-            SparkDataSet(filepath=f"s3n://{BUCKET_NAME}/{FILENAME}", version=version)
-
     def test_repr(self, versioned_dataset_s3, version):
         assert "filepath=s3a://" in str(versioned_dataset_s3)
         assert f"version=Version(load=None, save='{version.save}')" in str(
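
For reviewers: a minimal sketch of what the new fallback branch does, under two
assumptions that are not part of this patch: `split_filepath` below is a
stand-in for the dataset's private `_split_filepath` helper, and the `gs://`
example only resolves if an fsspec implementation for that protocol (here
gcsfs) is installed.

    import fsspec

    def split_filepath(filepath):
        # Assumed mirror of `_split_filepath`: separate the protocol prefix,
        # e.g. "gs://", from the rest of the path.
        split = filepath.split("://", 1)
        if len(split) == 2:
            return split[0] + "://", split[1]
        return "", split[0]

    fs_prefix, path = split_filepath("gs://my-bucket/data/cars.parquet")
    # `strip("://")` reduces "gs://" to "gs"; an empty prefix makes fsspec
    # fall back to its default "file" protocol, so local paths keep working.
    fs = fsspec.filesystem(fs_prefix.strip("://"))
    print(fs.exists(path))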
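In practice, this means the dataset's `exists()` and versioning checks now work
for any fsspec-compatible location, not only S3, HDFS and DBFS. A hypothetical
usage with a placeholder bucket; note that Spark itself still needs the
matching Hadoop connector configured to actually read the file:

    from kedro_datasets.spark import SparkDataSet

    dataset = SparkDataSet(
        filepath="gs://my-bucket/data/cars.parquet",  # placeholder path
        file_format="parquet",
    )
    if dataset.exists():  # existence check resolved through fsspec
        spark_df = dataset.load()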
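On the Databricks warning: the same DBFS file is visible at two paths, the
`/dbfs/...` FUSE mount used by Python file APIs and the `dbfs:/...` scheme used
by Spark, which is why the dataset asks for the `/dbfs/` prefix and strips it
again before handing the path to Spark (via `_strip_dbfs_prefix` in this
module). A toy illustration with a made-up mount point:

    fuse_path = "/dbfs/mnt/data/cars.parquet"  # what dbutils and open() see
    spark_path = fuse_path[len("/dbfs"):]      # what Spark actually reads
    assert spark_path == "/mnt/data/cars.parquet"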