diff --git a/ci/build_wheel_cudf.sh b/ci/build_wheel_cudf.sh index fef4416a366..ae4eb0d5c66 100755 --- a/ci/build_wheel_cudf.sh +++ b/ci/build_wheel_cudf.sh @@ -23,6 +23,7 @@ export PIP_CONSTRAINT="/tmp/constraints.txt" python -m auditwheel repair \ --exclude libcudf.so \ --exclude libnvcomp.so \ + --exclude libkvikio.so \ -w ${package_dir}/final_dist \ ${package_dir}/dist/* diff --git a/ci/build_wheel_libcudf.sh b/ci/build_wheel_libcudf.sh index b3d6778ea04..c010eb40ded 100755 --- a/ci/build_wheel_libcudf.sh +++ b/ci/build_wheel_libcudf.sh @@ -15,6 +15,13 @@ rapids-dependency-file-generator \ --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION};cuda_suffixed=true" \ | tee /tmp/requirements-build.txt + +# Download wheel from +LIBKVIKIO_CHANNEL=$( + RAPIDS_PY_WHEEL_NAME=libkvikio_cu12 rapids-get-pr-wheel-artifact kvikio 527 cpp +) +echo ${LIBKVIKIO_CHANNEL}/libkvikio_*.whl >> /tmp/requirements-build.txt + rapids-logger "Installing build requirements" python -m pip install \ -v \ @@ -25,7 +32,7 @@ python -m pip install \ # 0 really means "add --no-build-isolation" (ref: https://github.com/pypa/pip/issues/5735) export PIP_NO_BUILD_ISOLATION=0 -export SKBUILD_CMAKE_ARGS="-DUSE_NVCOMP_RUNTIME_WHEEL=ON" +export SKBUILD_CMAKE_ARGS="-DUSE_NVCOMP_RUNTIME_WHEEL=ON;-DUSE_LIBKVIKIO_RUNTIME_WHEEL=ON" ./ci/build_wheel.sh "${package_name}" "${package_dir}" RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})" @@ -33,6 +40,7 @@ RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})" mkdir -p ${package_dir}/final_dist python -m auditwheel repair \ --exclude libnvcomp.so.4 \ + --exclude libkvikio.so \ -w ${package_dir}/final_dist \ ${package_dir}/dist/* diff --git a/ci/build_wheel_pylibcudf.sh b/ci/build_wheel_pylibcudf.sh index 839d98846fe..c4a89f20f5f 100755 --- a/ci/build_wheel_pylibcudf.sh +++ b/ci/build_wheel_pylibcudf.sh @@ -21,6 +21,7 @@ export PIP_CONSTRAINT="/tmp/constraints.txt" python -m auditwheel repair 
\ --exclude libcudf.so \ --exclude libnvcomp.so \ + --exclude libkvikio.so \ -w ${package_dir}/final_dist \ ${package_dir}/dist/* diff --git a/ci/cudf_pandas_scripts/pandas-tests/run.sh b/ci/cudf_pandas_scripts/pandas-tests/run.sh index e5cd4436a3a..639a873193e 100755 --- a/ci/cudf_pandas_scripts/pandas-tests/run.sh +++ b/ci/cudf_pandas_scripts/pandas-tests/run.sh @@ -17,8 +17,16 @@ RAPIDS_PY_WHEEL_NAME="cudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from RAPIDS_PY_WHEEL_NAME="libcudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 cpp ./dist RAPIDS_PY_WHEEL_NAME="pylibcudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 python ./dist +# Download the libkvikio wheel built in kvikio PR 527 +LIBKVIKIO_CHANNEL=$( +RAPIDS_PY_WHEEL_NAME=libkvikio_${RAPIDS_PY_CUDA_SUFFIX} rapids-get-pr-wheel-artifact kvikio 527 cpp +) +echo ${LIBKVIKIO_CHANNEL}/libkvikio_*.whl >> /tmp/requirements-build.txt + # echo to expand wildcard before adding `[extra]` requires for pip python -m pip install \ + -v \ + -r /tmp/requirements-build.txt \ "$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test,pandas-tests]" \ "$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \ "$(echo ./dist/pylibcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" diff --git a/ci/cudf_pandas_scripts/run_tests.sh b/ci/cudf_pandas_scripts/run_tests.sh index 61361fffb07..37089815410 100755 --- a/ci/cudf_pandas_scripts/run_tests.sh +++ b/ci/cudf_pandas_scripts/run_tests.sh @@ -57,8 +57,15 @@ else # generate constraints (possibly pinning to oldest support versions of dependencies) rapids-generate-pip-constraints test_python_cudf_pandas ./constraints.txt + # Download the libkvikio wheel built in kvikio PR 527 + LIBKVIKIO_CHANNEL=$( + RAPIDS_PY_WHEEL_NAME=libkvikio_${RAPIDS_PY_CUDA_SUFFIX} rapids-get-pr-wheel-artifact kvikio 527 cpp
+ ) + echo ${LIBKVIKIO_CHANNEL}/libkvikio_*.whl >> /tmp/requirements-build.txt + python -m pip install \ -v \ + -r /tmp/requirements-build.txt \ --constraint ./constraints.txt \ "$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test,cudf-pandas-tests]" \ "$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \ diff --git a/ci/test_wheel_cudf.sh b/ci/test_wheel_cudf.sh index ce12744c9e3..ac5acf9089d 100755 --- a/ci/test_wheel_cudf.sh +++ b/ci/test_wheel_cudf.sh @@ -15,9 +15,16 @@ rapids-logger "Install cudf, pylibcudf, and test requirements" # generate constraints (possibly pinning to oldest support versions of dependencies) rapids-generate-pip-constraints py_test_cudf ./constraints.txt +# Download the libkvikio wheel built in kvikio PR 527 +LIBKVIKIO_CHANNEL=$( + RAPIDS_PY_WHEEL_NAME=libkvikio_${RAPIDS_PY_CUDA_SUFFIX} rapids-get-pr-wheel-artifact kvikio 527 cpp +) +echo ${LIBKVIKIO_CHANNEL}/libkvikio_*.whl >> /tmp/requirements-build.txt + # echo to expand wildcard before adding `[extra]` requires for pip python -m pip install \ -v \ + -r /tmp/requirements-build.txt \ --constraint ./constraints.txt \ "$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test]" \ "$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \ diff --git a/ci/test_wheel_cudf_polars.sh b/ci/test_wheel_cudf_polars.sh index 2884757e46b..340c29eae4c 100755 --- a/ci/test_wheel_cudf_polars.sh +++ b/ci/test_wheel_cudf_polars.sh @@ -33,9 +33,16 @@ rapids-logger "Installing cudf_polars and its dependencies" # generate constraints (possibly pinning to oldest support versions of dependencies) rapids-generate-pip-constraints py_test_cudf_polars ./constraints.txt +# Download the libkvikio wheel built in kvikio PR 527 +LIBKVIKIO_CHANNEL=$( + RAPIDS_PY_WHEEL_NAME=libkvikio_${RAPIDS_PY_CUDA_SUFFIX} rapids-get-pr-wheel-artifact kvikio 527 cpp
+) +echo ${LIBKVIKIO_CHANNEL}/libkvikio_*.whl >> /tmp/requirements-build.txt + # echo to expand wildcard before adding `[test]` requires for pip python -m pip install \ -v \ + -r /tmp/requirements-build.txt \ --constraint ./constraints.txt \ "$(echo ./dist/cudf_polars_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test]" \ "$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \ diff --git a/ci/test_wheel_dask_cudf.sh b/ci/test_wheel_dask_cudf.sh index e15949f4bdb..93c8262fc32 100755 --- a/ci/test_wheel_dask_cudf.sh +++ b/ci/test_wheel_dask_cudf.sh @@ -16,9 +16,16 @@ rapids-logger "Install dask_cudf, cudf, pylibcudf, and test requirements" # generate constraints (possibly pinning to oldest support versions of dependencies) rapids-generate-pip-constraints py_test_dask_cudf ./constraints.txt +# Download wheel from +LIBKVIKIO_CHANNEL=$( +RAPIDS_PY_WHEEL_NAME=libkvikio_${RAPIDS_PY_CUDA_SUFFIX} rapids-get-pr-wheel-artifact kvikio 527 cpp # also python? +) +echo ${LIBKVIKIO_CHANNEL}/libkvikio_*.whl >> /tmp/requirements-build.txt + # echo to expand wildcard before adding `[extra]` requires for pip python -m pip install \ -v \ + -r /tmp/requirements-build.txt \ --constraint ./constraints.txt \ "$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \ "$(echo ./dist/dask_cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test]" \ diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 15a4a270ce0..9ea39e692b6 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -26,6 +26,7 @@ #include #include +#include #include @@ -33,6 +34,7 @@ #include #include +#include #include namespace cudf { @@ -389,6 +391,86 @@ class user_datasource_wrapper : public datasource { datasource* const source; ///< A non-owning pointer to the user-implemented datasource }; +/** + * @brief Remote file source backed by KvikIO, which handles S3 filepaths seamlessly. 
+ */ +class remote_file_source : public datasource { + static std::unique_ptr create_s3_endpoint(char const* filepath) + { + auto [bucket_name, bucket_object] = kvikio::S3Endpoint::parse_s3_url(filepath); + return std::make_unique(bucket_name, bucket_object); + } + + public: + explicit remote_file_source(char const* filepath) : _kvikio_file{create_s3_endpoint(filepath)} {} + + ~remote_file_source() override = default; + + [[nodiscard]] bool supports_device_read() const override { return true; } + + [[nodiscard]] bool is_device_read_preferred(size_t size) const override { return true; } + + [[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); } + + std::future device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override + { + CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); + + auto const read_size = std::min(size, this->size() - offset); + return _kvikio_file.pread(dst, read_size, offset); + } + + size_t device_read(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override + { + return device_read_async(offset, size, dst, stream).get(); + } + + std::unique_ptr device_read(size_t offset, + size_t size, + rmm::cuda_stream_view stream) override + { + rmm::device_buffer out_data(size, stream); + size_t read = device_read(offset, size, reinterpret_cast(out_data.data()), stream); + out_data.resize(read, stream); + return datasource::buffer::create(std::move(out_data)); + } + + size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto const read_size = std::min(size, this->size() - offset); + return _kvikio_file.pread(dst, read_size, offset).get(); + } + + std::unique_ptr host_read(size_t offset, size_t size) override + { + auto const count = std::min(size, this->size() - offset); + std::vector h_data(count); + this->host_read(offset, count, h_data.data()); + return datasource::buffer::create(std::move(h_data)); + } + + 
/** + * @brief Is `url` referring to a remote file supported by KvikIO? + * + * For now, only S3 urls (urls starting with "s3://") are supported. + */ + static bool is_supported_remote_url(std::string const& url) + { + // Regular expression to match "s3://" + std::regex pattern{R"(^s3://)", std::regex_constants::icase}; + return std::regex_search(url, pattern); + } + + private: + kvikio::RemoteHandle _kvikio_file; +}; + } // namespace std::unique_ptr datasource::create(std::string const& filepath, @@ -403,8 +485,9 @@ std::unique_ptr datasource::create(std::string const& filepath, CUDF_FAIL("Invalid LIBCUDF_MMAP_ENABLED value: " + policy); }(); - - if (use_memory_mapping) { + if (remote_file_source::is_supported_remote_url(filepath)) { + return std::make_unique(filepath.c_str()); + } else if (use_memory_mapping) { return std::make_unique(filepath.c_str(), offset, max_size_estimate); } else { // `file_source` reads the file directly, without memory mapping diff --git a/dependencies.yaml b/dependencies.yaml index 90255ca674c..a92f7258f4b 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -178,6 +178,7 @@ files: table: project includes: - depends_on_nvcomp + - depends_on_libkvikio py_build_pylibcudf: output: pyproject pyproject_dir: python/pylibcudf diff --git a/python/cudf/cudf/options.py b/python/cudf/cudf/options.py index df7bbe22a61..e206c8bca08 100644 --- a/python/cudf/cudf/options.py +++ b/python/cudf/cudf/options.py @@ -351,6 +351,22 @@ def _integer_and_none_validator(val): _make_contains_validator([False, True]), ) +_register_option( + "kvikio_remote_io", + _env_get_bool("CUDF_KVIKIO_REMOTE_IO", False), + textwrap.dedent( + """ + Whether to use KvikIO's remote IO backend or not. + \tWARN: this is experimental and may be removed at any time + \twithout warning or deprecation period. + \tSet KVIKIO_NTHREADS (default is 8) to change the number of + \tconcurrent tcp connections, which is important for good performance. + \tValid values are True or False. 
Default is False. + """ + ), + _make_contains_validator([False, True]), +) + class option_context(ContextDecorator): """ diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 0958b68084d..afb82f75bcf 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -69,6 +69,7 @@ def s3_base(endpoint_ip, endpoint_port): # with an S3 endpoint on localhost endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/" + os.environ["AWS_ENDPOINT_URL"] = endpoint_uri server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port) server.start() @@ -105,6 +106,15 @@ def s3_context(s3_base, bucket, files=None): pass +@pytest.fixture( + params=[True, False], + ids=["kvikio=ON", "kvikio=OFF"], +) +def kvikio_remote_io(request): + with cudf.option_context("kvikio_remote_io", request.param): + yield request.param + + @pytest.fixture def pdf(scope="module"): df = pd.DataFrame() @@ -193,6 +203,7 @@ def test_write_csv(s3_base, s3so, pdf, chunksize): def test_read_parquet( s3_base, s3so, + kvikio_remote_io, pdf, bytes_per_thread, columns, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index d636f36f282..aecb7ae7c5c 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -16,6 +16,7 @@ import pandas as pd from fsspec.core import expand_paths_if_needed, get_fs_token_paths +import cudf from cudf.api.types import is_list_like from cudf.core._compat import PANDAS_LT_300 from cudf.utils.docutils import docfmt_partial @@ -1624,6 +1625,16 @@ def _maybe_expand_directories(paths, glob_pattern, fs): return expanded_paths +def _use_kvikio_remote_io(fs) -> bool: + """Whether `kvikio_remote_io` is enabled and `fs` refers to a S3 file""" + + try: + from s3fs.core import S3FileSystem + except ImportError: + return False + return cudf.get_option("kvikio_remote_io") and isinstance(fs, S3FileSystem) + + @doc_get_reader_filepath_or_buffer() def 
get_reader_filepath_or_buffer( path_or_data, @@ -1649,17 +1660,17 @@ def get_reader_filepath_or_buffer( ) ] if not input_sources: - raise ValueError("Empty input source list: {input_sources}.") + raise ValueError(f"Empty input source list: {input_sources}.") filepaths_or_buffers = [] string_paths = [isinstance(source, str) for source in input_sources] if any(string_paths): - # Sources are all strings. Thes strings are typically + # Sources are all strings. The strings are typically # file paths, but they may also be raw text strings. # Don't allow a mix of source types if not all(string_paths): - raise ValueError("Invalid input source list: {input_sources}.") + raise ValueError(f"Invalid input source list: {input_sources}.") # Make sure we define a filesystem (if possible) paths = input_sources @@ -1712,11 +1723,17 @@ def get_reader_filepath_or_buffer( raise FileNotFoundError( f"{input_sources} could not be resolved to any files" ) - filepaths_or_buffers = _prefetch_remote_buffers( - paths, - fs, - **(prefetch_options or {}), - ) + + # If `kvikio_remote_io` is enabled and `fs` refers to a S3 file, + # we create S3 URLs and let them pass-through to libcudf. + if _use_kvikio_remote_io(fs): + filepaths_or_buffers = [f"s3://{fpath}" for fpath in paths] + else: + filepaths_or_buffers = _prefetch_remote_buffers( + paths, + fs, + **(prefetch_options or {}), + ) else: raw_text_input = True diff --git a/python/libcudf/CMakeLists.txt b/python/libcudf/CMakeLists.txt index 5f9a04d3cee..bedc3dafcab 100644 --- a/python/libcudf/CMakeLists.txt +++ b/python/libcudf/CMakeLists.txt @@ -23,6 +23,9 @@ project( ) option(USE_NVCOMP_RUNTIME_WHEEL "Use the nvcomp wheel at runtime instead of the system library" OFF) +option(USE_LIBKVIKIO_RUNTIME_WHEEL + "Use the libkvikio wheel at runtime instead of the system library" OFF +) # Check if cudf is already available. 
If so, it is the user's responsibility to ensure that the # CMake package is also available at build time of the Python cudf package. @@ -58,3 +61,12 @@ if(USE_NVCOMP_RUNTIME_WHEEL) APPEND ) endif() + +if(USE_LIBKVIKIO_RUNTIME_WHEEL) + set(rpaths "$ORIGIN/../../libkvikio/lib64") + set_property( + TARGET cudf + PROPERTY INSTALL_RPATH ${rpaths} + APPEND + ) +endif() diff --git a/python/libcudf/pyproject.toml b/python/libcudf/pyproject.toml index c6d9ae56467..62726bb0df4 100644 --- a/python/libcudf/pyproject.toml +++ b/python/libcudf/pyproject.toml @@ -38,6 +38,7 @@ classifiers = [ "Environment :: GPU :: NVIDIA CUDA", ] dependencies = [ + "libkvikio==24.12.*,>=0.0.0a0", "nvidia-nvcomp==4.1.0.6", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 563a02761da..86aa684b518 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -20,6 +20,7 @@ import codecs import errno import io import os +import re from pylibcudf.libcudf.io.json import \ json_recovery_mode_t as JSONRecoveryMode # no-cython-lint @@ -143,6 +144,8 @@ cdef class SourceInfo: Mixing different types of sources will raise a `ValueError`. 
""" + # Regular expression that match remote file paths supported by libcudf + _is_remote_file_pattern = re.compile(r"^s3://", re.IGNORECASE) def __init__(self, list sources): if not sources: @@ -157,11 +160,10 @@ cdef class SourceInfo: for src in sources: if not isinstance(src, (os.PathLike, str)): raise ValueError("All sources must be of the same type!") - if not os.path.isfile(src): - raise FileNotFoundError(errno.ENOENT, - os.strerror(errno.ENOENT), - src) - + if not (os.path.isfile(src) or self._is_remote_file_pattern.match(src)): + raise FileNotFoundError( + errno.ENOENT, os.strerror(errno.ENOENT), src + ) c_files.push_back( str(src).encode()) self.c_obj = move(source_info(c_files))