Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IGNORE] kvikio-remote-io-ci-debugging #17195

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/build_wheel_cudf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export PIP_CONSTRAINT="/tmp/constraints.txt"
python -m auditwheel repair \
--exclude libcudf.so \
--exclude libnvcomp.so \
--exclude libkvikio.so \
-w ${package_dir}/final_dist \
${package_dir}/dist/*

Expand Down
10 changes: 9 additions & 1 deletion ci/build_wheel_libcudf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ rapids-dependency-file-generator \
--matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION};cuda_suffixed=true" \
| tee /tmp/requirements-build.txt


# Download wheel from <https://github.com/rapidsai/kvikio/pull/527>
# NOTE(review): the wheel name is pinned to cu12 here; RAPIDS_PY_CUDA_SUFFIX
# appears to be set only after the build step in this script — confirm that
# hard-coding the suffix is intended.
LIBKVIKIO_CHANNEL=$(
  RAPIDS_PY_WHEEL_NAME="libkvikio_cu12" rapids-get-pr-wheel-artifact kvikio 527 cpp
)
# Quote the directory so it is never word-split or glob-expanded; the wildcard
# itself stays unquoted so the shell still expands it to the wheel file name.
echo "${LIBKVIKIO_CHANNEL}"/libkvikio_*.whl >> /tmp/requirements-build.txt

rapids-logger "Installing build requirements"
python -m pip install \
-v \
Expand All @@ -25,14 +32,15 @@ python -m pip install \
# 0 really means "add --no-build-isolation" (ref: https://github.com/pypa/pip/issues/5735)
export PIP_NO_BUILD_ISOLATION=0

export SKBUILD_CMAKE_ARGS="-DUSE_NVCOMP_RUNTIME_WHEEL=ON"
export SKBUILD_CMAKE_ARGS="-DUSE_NVCOMP_RUNTIME_WHEEL=ON;-DUSE_LIBKVIKIO_RUNTIME_WHEEL=ON"
./ci/build_wheel.sh "${package_name}" "${package_dir}"

RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"

mkdir -p ${package_dir}/final_dist
python -m auditwheel repair \
--exclude libnvcomp.so.4 \
--exclude libkvikio.so \
-w ${package_dir}/final_dist \
${package_dir}/dist/*

Expand Down
1 change: 1 addition & 0 deletions ci/build_wheel_pylibcudf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export PIP_CONSTRAINT="/tmp/constraints.txt"
python -m auditwheel repair \
--exclude libcudf.so \
--exclude libnvcomp.so \
--exclude libkvikio.so \
-w ${package_dir}/final_dist \
${package_dir}/dist/*

Expand Down
8 changes: 8 additions & 0 deletions ci/cudf_pandas_scripts/pandas-tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@ RAPIDS_PY_WHEEL_NAME="cudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from
RAPIDS_PY_WHEEL_NAME="libcudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 cpp ./dist
RAPIDS_PY_WHEEL_NAME="pylibcudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 python ./dist

# Download wheel from <https://github.com/rapidsai/kvikio/pull/527>
LIBKVIKIO_CHANNEL=$(
  RAPIDS_PY_WHEEL_NAME="libkvikio_${RAPIDS_PY_CUDA_SUFFIX}" rapids-get-pr-wheel-artifact kvikio 527 cpp # also python?
)
# Quote the directory so it is never word-split or glob-expanded; the wildcard
# itself stays unquoted so the shell still expands it to the wheel file name.
echo "${LIBKVIKIO_CHANNEL}"/libkvikio_*.whl >> /tmp/requirements-build.txt

# echo to expand wildcard before adding `[extra]` requires for pip
python -m pip install \
-v \
-r /tmp/requirements-build.txt \
"$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test,pandas-tests]" \
"$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \
"$(echo ./dist/pylibcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)"
Expand Down
7 changes: 7 additions & 0 deletions ci/cudf_pandas_scripts/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,15 @@ else
# generate constraints (possibly pinning to oldest support versions of dependencies)
rapids-generate-pip-constraints test_python_cudf_pandas ./constraints.txt

# Download wheel from <https://github.com/rapidsai/kvikio/pull/527>
LIBKVIKIO_CHANNEL=$(
  RAPIDS_PY_WHEEL_NAME="libkvikio_${RAPIDS_PY_CUDA_SUFFIX}" rapids-get-pr-wheel-artifact kvikio 527 cpp # also python?
)
# Quote the directory so it is never word-split or glob-expanded; the wildcard
# itself stays unquoted so the shell still expands it to the wheel file name.
echo "${LIBKVIKIO_CHANNEL}"/libkvikio_*.whl >> /tmp/requirements-build.txt

python -m pip install \
-v \
-r /tmp/requirements-build.txt \
--constraint ./constraints.txt \
"$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test,cudf-pandas-tests]" \
"$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \
Expand Down
7 changes: 7 additions & 0 deletions ci/test_wheel_cudf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@ rapids-logger "Install cudf, pylibcudf, and test requirements"
# generate constraints (possibly pinning to oldest support versions of dependencies)
rapids-generate-pip-constraints py_test_cudf ./constraints.txt

# Download wheel from <https://github.com/rapidsai/kvikio/pull/527>
LIBKVIKIO_CHANNEL=$(
  RAPIDS_PY_WHEEL_NAME="libkvikio_${RAPIDS_PY_CUDA_SUFFIX}" rapids-get-pr-wheel-artifact kvikio 527 cpp # also python?
)
# Quote the directory so it is never word-split or glob-expanded; the wildcard
# itself stays unquoted so the shell still expands it to the wheel file name.
echo "${LIBKVIKIO_CHANNEL}"/libkvikio_*.whl >> /tmp/requirements-build.txt

# echo to expand wildcard before adding `[extra]` requires for pip
python -m pip install \
-v \
-r /tmp/requirements-build.txt \
--constraint ./constraints.txt \
"$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test]" \
"$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \
Expand Down
7 changes: 7 additions & 0 deletions ci/test_wheel_cudf_polars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ rapids-logger "Installing cudf_polars and its dependencies"
# generate constraints (possibly pinning to oldest support versions of dependencies)
rapids-generate-pip-constraints py_test_cudf_polars ./constraints.txt

# Download wheel from <https://github.com/rapidsai/kvikio/pull/527>
LIBKVIKIO_CHANNEL=$(
  RAPIDS_PY_WHEEL_NAME="libkvikio_${RAPIDS_PY_CUDA_SUFFIX}" rapids-get-pr-wheel-artifact kvikio 527 cpp # also python?
)
# Quote the directory so it is never word-split or glob-expanded; the wildcard
# itself stays unquoted so the shell still expands it to the wheel file name.
echo "${LIBKVIKIO_CHANNEL}"/libkvikio_*.whl >> /tmp/requirements-build.txt

# echo to expand wildcard before adding `[test]` requires for pip
python -m pip install \
-v \
-r /tmp/requirements-build.txt \
--constraint ./constraints.txt \
"$(echo ./dist/cudf_polars_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test]" \
"$(echo ./dist/libcudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \
Expand Down
7 changes: 7 additions & 0 deletions ci/test_wheel_dask_cudf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ rapids-logger "Install dask_cudf, cudf, pylibcudf, and test requirements"
# generate constraints (possibly pinning to oldest support versions of dependencies)
rapids-generate-pip-constraints py_test_dask_cudf ./constraints.txt

# Download wheel from <https://github.com/rapidsai/kvikio/pull/527>
LIBKVIKIO_CHANNEL=$(
  RAPIDS_PY_WHEEL_NAME="libkvikio_${RAPIDS_PY_CUDA_SUFFIX}" rapids-get-pr-wheel-artifact kvikio 527 cpp # also python?
)
# Quote the directory so it is never word-split or glob-expanded; the wildcard
# itself stays unquoted so the shell still expands it to the wheel file name.
echo "${LIBKVIKIO_CHANNEL}"/libkvikio_*.whl >> /tmp/requirements-build.txt

# echo to expand wildcard before adding `[extra]` requires for pip
python -m pip install \
-v \
-r /tmp/requirements-build.txt \
--constraint ./constraints.txt \
"$(echo ./dist/cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)" \
"$(echo ./dist/dask_cudf_${RAPIDS_PY_CUDA_SUFFIX}*.whl)[test]" \
Expand Down
87 changes: 85 additions & 2 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
#include <cudf/utilities/span.hpp>

#include <kvikio/file_handle.hpp>
#include <kvikio/remote_handle.hpp>

#include <rmm/device_buffer.hpp>

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <regex>
#include <vector>

namespace cudf {
Expand Down Expand Up @@ -389,6 +391,86 @@ class user_datasource_wrapper : public datasource {
datasource* const source; ///< A non-owning pointer to the user-implemented datasource
};

/**
 * @brief Remote file source backed by KvikIO, which handles S3 filepaths seamlessly.
 *
 * Every read is forwarded to a `kvikio::RemoteHandle` wrapping a `kvikio::S3Endpoint`
 * that is built from the URL passed to the constructor.
 */
class remote_file_source : public datasource {
  /**
   * @brief Build the S3 endpoint for `filepath`.
   *
   * @param filepath URL handed to `kvikio::S3Endpoint::parse_s3_url` to obtain the bucket
   * name and the object name
   * @return Owning pointer to the newly created endpoint
   */
  static std::unique_ptr<kvikio::S3Endpoint> create_s3_endpoint(char const* filepath)
  {
    auto [bucket_name, bucket_object] = kvikio::S3Endpoint::parse_s3_url(filepath);
    return std::make_unique<kvikio::S3Endpoint>(bucket_name, bucket_object);
  }

  /**
   * @brief Limit a requested read of `size` bytes at `offset` to what the file still holds.
   *
   * NOTE(review): assumes `offset <= size()`. A larger offset wraps the unsigned
   * subtraction, in which case `size` is returned unchanged — confirm callers never
   * pass an offset past the end of the file.
   *
   * @param offset Byte offset at which the read starts
   * @param size Number of bytes requested
   * @return The smaller of `size` and `size() - offset`
   */
  [[nodiscard]] size_t clamp_read_size(size_t offset, size_t size) const
  {
    return std::min(size, this->size() - offset);
  }

 public:
  /**
   * @brief Open the remote file referred to by `filepath`.
   *
   * @param filepath S3 URL of the file
   */
  explicit remote_file_source(char const* filepath) : _kvikio_file{create_s3_endpoint(filepath)} {}

  ~remote_file_source() override = default;

  /// Device reads are always reported as available for this source.
  [[nodiscard]] bool supports_device_read() const override { return true; }

  /// Device reads are reported as preferred regardless of the read size.
  [[nodiscard]] bool is_device_read_preferred([[maybe_unused]] size_t size) const override
  {
    return true;
  }

  /// Size of the remote file in bytes, as reported by the KvikIO handle.
  [[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); }

  /**
   * @brief Start a read into `dst` and return a future for the number of bytes read.
   *
   * @param offset Byte offset in the file at which the read starts
   * @param size Number of bytes requested; limited to the bytes left after `offset`
   * @param dst Destination buffer
   * @param stream Not used by this implementation; the read is issued through KvikIO's `pread`
   * @return Future holding the number of bytes read
   */
  std::future<size_t> device_read_async(size_t offset,
                                        size_t size,
                                        uint8_t* dst,
                                        [[maybe_unused]] rmm::cuda_stream_view stream) override
  {
    CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

    return _kvikio_file.pread(dst, clamp_read_size(offset, size), offset);
  }

  /**
   * @brief Blocking variant of `device_read_async`.
   *
   * @return Number of bytes read
   */
  size_t device_read(size_t offset,
                     size_t size,
                     uint8_t* dst,
                     rmm::cuda_stream_view stream) override
  {
    return device_read_async(offset, size, dst, stream).get();
  }

  /**
   * @brief Read into a freshly allocated device buffer.
   *
   * The buffer is allocated with the requested `size` and then resized to the number of
   * bytes actually read.
   *
   * @return Buffer owning the bytes that were read
   */
  std::unique_ptr<datasource::buffer> device_read(size_t offset,
                                                  size_t size,
                                                  rmm::cuda_stream_view stream) override
  {
    rmm::device_buffer out_data(size, stream);
    size_t const bytes_read =
      device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
    out_data.resize(bytes_read, stream);
    return datasource::buffer::create(std::move(out_data));
  }

  /**
   * @brief Blocking read into the caller-provided buffer `dst`.
   *
   * @param offset Byte offset in the file at which the read starts
   * @param size Number of bytes requested; limited to the bytes left after `offset`
   * @param dst Destination buffer
   * @return Number of bytes read
   */
  size_t host_read(size_t offset, size_t size, uint8_t* dst) override
  {
    return _kvikio_file.pread(dst, clamp_read_size(offset, size), offset).get();
  }

  /**
   * @brief Blocking read into a freshly allocated host vector.
   *
   * @return Buffer owning the bytes that were read
   */
  std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
  {
    auto const count = clamp_read_size(offset, size);
    std::vector<uint8_t> h_data(count);
    this->host_read(offset, count, h_data.data());
    return datasource::buffer::create(std::move(h_data));
  }

  /**
   * @brief Is `url` referring to a remote file supported by KvikIO?
   *
   * For now, only S3 urls (urls starting with "s3://") are supported.
   * The scheme is matched case-insensitively.
   */
  static bool is_supported_remote_url(std::string const& url)
  {
    // Regular expression to match "s3://". Built once and reused: constructing a
    // std::regex is costly and the pattern never changes between calls.
    static std::regex const pattern{R"(^s3://)", std::regex_constants::icase};
    return std::regex_search(url, pattern);
  }

 private:
  kvikio::RemoteHandle _kvikio_file;  ///< KvikIO handle through which all reads are issued
};

} // namespace

std::unique_ptr<datasource> datasource::create(std::string const& filepath,
Expand All @@ -403,8 +485,9 @@ std::unique_ptr<datasource> datasource::create(std::string const& filepath,

CUDF_FAIL("Invalid LIBCUDF_MMAP_ENABLED value: " + policy);
}();

if (use_memory_mapping) {
if (remote_file_source::is_supported_remote_url(filepath)) {
return std::make_unique<remote_file_source>(filepath.c_str());
} else if (use_memory_mapping) {
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, max_size_estimate);
} else {
// `file_source` reads the file directly, without memory mapping
Expand Down
1 change: 1 addition & 0 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ files:
table: project
includes:
- depends_on_nvcomp
- depends_on_libkvikio
py_build_pylibcudf:
output: pyproject
pyproject_dir: python/pylibcudf
Expand Down
16 changes: 16 additions & 0 deletions python/cudf/cudf/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,22 @@ def _integer_and_none_validator(val):
_make_contains_validator([False, True]),
)

# Register the experimental `kvikio_remote_io` option.
# The initial value comes from `_env_get_bool("CUDF_KVIKIO_REMOTE_IO", False)`
# (presumably the environment variable with False as the fallback — confirm
# against `_env_get_bool`). The validator is built from [False, True], so
# presumably only those two values are accepted.
_register_option(
"kvikio_remote_io",
_env_get_bool("CUDF_KVIKIO_REMOTE_IO", False),
textwrap.dedent(
"""
Whether to use KvikIO's remote IO backend or not.
\tWARN: this is experimental and may be removed at any time
\twithout warning or deprecation period.
\tSet KVIKIO_NTHREADS (default is 8) to change the number of
\tconcurrent tcp connections, which is important for good performance.
\tValid values are True or False. Default is False.
"""
),
_make_contains_validator([False, True]),
)


class option_context(ContextDecorator):
"""
Expand Down
11 changes: 11 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def s3_base(endpoint_ip, endpoint_port):
# with an S3 endpoint on localhost

endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/"
os.environ["AWS_ENDPOINT_URL"] = endpoint_uri

server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port)
server.start()
Expand Down Expand Up @@ -105,6 +106,15 @@ def s3_context(s3_base, bucket, files=None):
pass


@pytest.fixture(
    params=[True, False],
    ids=["kvikio=ON", "kvikio=OFF"],
)
def kvikio_remote_io(request):
    """Run the requesting test with ``kvikio_remote_io`` both on and off.

    The option is set through ``cudf.option_context`` for the duration of
    the test, and the value in effect is yielded to the test.
    """
    enabled = request.param
    with cudf.option_context("kvikio_remote_io", enabled):
        yield enabled


@pytest.fixture
def pdf(scope="module"):
df = pd.DataFrame()
Expand Down Expand Up @@ -193,6 +203,7 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):
def test_read_parquet(
s3_base,
s3so,
kvikio_remote_io,
pdf,
bytes_per_thread,
columns,
Expand Down
33 changes: 25 additions & 8 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pandas as pd
from fsspec.core import expand_paths_if_needed, get_fs_token_paths

import cudf
from cudf.api.types import is_list_like
from cudf.core._compat import PANDAS_LT_300
from cudf.utils.docutils import docfmt_partial
Expand Down Expand Up @@ -1624,6 +1625,16 @@ def _maybe_expand_directories(paths, glob_pattern, fs):
return expanded_paths


def _use_kvikio_remote_io(fs) -> bool:
    """Whether `kvikio_remote_io` is enabled and `fs` refers to a S3 file.

    Parameters
    ----------
    fs
        Filesystem object to inspect.

    Returns
    -------
    bool
        True only when the ``kvikio_remote_io`` option is enabled and
        `fs` is an instance of ``s3fs.core.S3FileSystem``. False when
        the option is disabled or ``s3fs`` cannot be imported.
    """
    # Check the option first so `s3fs` is never imported while the
    # feature is switched off.
    if not cudf.get_option("kvikio_remote_io"):
        return False

    try:
        from s3fs.core import S3FileSystem
    except ImportError:
        # Without s3fs there is no S3FileSystem class for `fs` to be.
        return False
    return isinstance(fs, S3FileSystem)


@doc_get_reader_filepath_or_buffer()
def get_reader_filepath_or_buffer(
path_or_data,
Expand All @@ -1649,17 +1660,17 @@ def get_reader_filepath_or_buffer(
)
]
if not input_sources:
raise ValueError("Empty input source list: {input_sources}.")
raise ValueError(f"Empty input source list: {input_sources}.")

filepaths_or_buffers = []
string_paths = [isinstance(source, str) for source in input_sources]
if any(string_paths):
# Sources are all strings. Thes strings are typically
# Sources are all strings. The strings are typically
# file paths, but they may also be raw text strings.

# Don't allow a mix of source types
if not all(string_paths):
raise ValueError("Invalid input source list: {input_sources}.")
raise ValueError(f"Invalid input source list: {input_sources}.")

# Make sure we define a filesystem (if possible)
paths = input_sources
Expand Down Expand Up @@ -1712,11 +1723,17 @@ def get_reader_filepath_or_buffer(
raise FileNotFoundError(
f"{input_sources} could not be resolved to any files"
)
filepaths_or_buffers = _prefetch_remote_buffers(
paths,
fs,
**(prefetch_options or {}),
)

# If `kvikio_remote_io` is enabled and `fs` refers to a S3 file,
# we create S3 URLs and let them pass-through to libcudf.
if _use_kvikio_remote_io(fs):
filepaths_or_buffers = [f"s3://{fpath}" for fpath in paths]
else:
filepaths_or_buffers = _prefetch_remote_buffers(
paths,
fs,
**(prefetch_options or {}),
)
else:
raw_text_input = True

Expand Down
Loading
Loading