Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-0.41' into python-block…
Browse files Browse the repository at this point in the history
…ing-progress-improvements
  • Loading branch information
pentschev committed Nov 5, 2024
2 parents 1be1476 + 7069174 commit c317f82
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ repos:
- --fix
- --rapids-version=24.12
- repo: https://github.com/rapidsai/dependency-file-generator
rev: v1.13.11
rev: v1.16.0
hooks:
- id: rapids-dependency-file-generator
args: ["--clean"]
Expand Down
4 changes: 4 additions & 0 deletions ci/build_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ rapids-print-env

rapids-logger "Begin C++ and Python builds"

sccache --zero-stats

rapids-conda-retry mambabuild \
conda/recipes/ucxx

sccache --show-adv-stats

rapids-upload-conda-to-s3 cpp
35 changes: 10 additions & 25 deletions ci/build_wheel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,18 @@ package_dir=$2
source rapids-configure-sccache
source rapids-date-string

RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"

rapids-generate-version > ./VERSION

if [[ ${package_name} == "distributed-ucxx" ]]; then
python -m pip wheel "${package_dir}/" -w "${package_dir}/dist" -vvv --no-deps --disable-pip-version-check

RAPIDS_PY_WHEEL_NAME="distributed_ucxx_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 python ${package_dir}/dist
elif [[ ${package_name} == "libucxx" ]]; then
SKBUILD_CMAKE_ARGS="-DUCXX_ENABLE_RMM=ON" \
python -m pip wheel "${package_dir}"/ -w "${package_dir}"/dist -vvv --no-deps --disable-pip-version-check

python -m auditwheel repair -w ${package_dir}/final_dist --exclude "libucp.so.0" ${package_dir}/dist/*

RAPIDS_PY_WHEEL_NAME="libucxx_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 cpp ${package_dir}/final_dist
elif [[ ${package_name} == "ucxx" ]]; then
CPP_WHEELHOUSE=$(RAPIDS_PY_WHEEL_NAME="libucxx_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 cpp /tmp/libucxx_dist)
echo "libucxx-${RAPIDS_PY_CUDA_SUFFIX} @ file://$(echo ${CPP_WHEELHOUSE}/libucxx_*.whl)" > "${package_dir}/constraints.txt"
cd "${package_dir}"

PIP_CONSTRAINT="${package_dir}/constraints.txt" \
SKBUILD_CMAKE_ARGS="-DFIND_UCXX_CPP=ON;-DCMAKE_INSTALL_LIBDIR=ucxx/lib64;-DCMAKE_INSTALL_INCLUDEDIR=ucxx/include" \
python -m pip wheel "${package_dir}"/ -w "${package_dir}"/dist -vvv --no-deps --disable-pip-version-check
sccache --zero-stats

python -m auditwheel repair -w ${package_dir}/final_dist --exclude "libucp.so.0" --exclude "libucxx.so" ${package_dir}/dist/*
rapids-logger "Building '${package_name}' wheel"
python -m pip wheel \
-w dist \
-v \
--no-deps \
--disable-pip-version-check \
.

RAPIDS_PY_WHEEL_NAME="ucxx_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 python ${package_dir}/final_dist
else
echo "Unknown package '${package_name}'"
exit 1
fi
sccache --show-adv-stats
6 changes: 5 additions & 1 deletion ci/build_wheel_distributed_ucxx.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@ set -euo pipefail

package_dir="python/distributed-ucxx"

./ci/build_wheel.sh distributed-ucxx ${package_dir}
RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"

./ci/build_wheel.sh distributed-ucxx "${package_dir}"

RAPIDS_PY_WHEEL_NAME="distributed_ucxx_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 python "${package_dir}/dist"
34 changes: 33 additions & 1 deletion ci/build_wheel_libucxx.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,38 @@

set -euo pipefail

package_name="libucxx"
package_dir="python/libucxx"

./ci/build_wheel.sh libucxx ${package_dir}
RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"

rapids-logger "Generating build requirements"

rapids-dependency-file-generator \
--output requirements \
--file-key "py_build_${package_name}" \
--file-key "py_rapids_build_${package_name}" \
--matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION};cuda_suffixed=true" \
| tee /tmp/requirements-build.txt

rapids-logger "Installing build requirements"
python -m pip install \
-v \
--prefer-binary \
-r /tmp/requirements-build.txt

# build with '--no-build-isolation', for better sccache hit rate
# 0 really means "add --no-build-isolation" (ref: https://github.com/pypa/pip/issues/5735)
export PIP_NO_BUILD_ISOLATION=0

export SKBUILD_CMAKE_ARGS="-DUCXX_ENABLE_RMM=ON"

./ci/build_wheel.sh "${package_name}" "${package_dir}"

mkdir -p "${package_dir}/final_dist"
python -m auditwheel repair \
--exclude "libucp.so.0" \
-w "${package_dir}/final_dist" \
${package_dir}/dist/*

RAPIDS_PY_WHEEL_NAME="${package_name}_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 cpp "${package_dir}/final_dist"
25 changes: 24 additions & 1 deletion ci/build_wheel_ucxx.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,29 @@

set -euo pipefail

package_name="ucxx"
package_dir="python/ucxx"

./ci/build_wheel.sh ucxx ${package_dir}
RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"

# Downloads libucxx wheel from this current build,
# then ensures 'ucxx' wheel builds always use the 'libucxx' just built in the same CI run.
#
# Using env variable PIP_CONSTRAINT is necessary to ensure the constraints
# are used when creating the isolated build environment.
RAPIDS_PY_WHEEL_NAME="libucxx_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 cpp /tmp/libucxx_dist
echo "libucxx-${RAPIDS_PY_CUDA_SUFFIX} @ file://$(echo /tmp/libucxx_dist/libucxx_*.whl)" > /tmp/constraints.txt
export PIP_CONSTRAINT="/tmp/constraints.txt"

export SKBUILD_CMAKE_ARGS="-DFIND_UCXX_CPP=ON;-DCMAKE_INSTALL_LIBDIR=ucxx/lib64;-DCMAKE_INSTALL_INCLUDEDIR=ucxx/include"

./ci/build_wheel.sh "${package_name}" "${package_dir}"

mkdir -p "${package_dir}/final_dist"
python -m auditwheel repair \
--exclude "libucp.so.0" \
--exclude "libucxx.so" \
-w "${package_dir}/final_dist" \
${package_dir}/dist/*

RAPIDS_PY_WHEEL_NAME="${package_name}_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 python "${package_dir}/final_dist"
100 changes: 100 additions & 0 deletions python/ucxx/ucxx/benchmarks/backends/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

import asyncio
import warnings
from time import monotonic

from ucxx.benchmarks.backends.base import BaseClient, BaseServer


class AsyncioServer(BaseServer):
    """Echo server for the ``asyncio`` benchmark backend.

    For every connection, reads ``args.n_iter + args.n_warmup_iter`` messages
    of ``args.n_bytes`` bytes each and writes each one back to the peer, then
    stops serving.
    """

    has_cuda_support = False

    def __init__(self, args, queue):
        """Store the benchmark arguments and the queue used to publish the port.

        Parameters
        ----------
        args
            Benchmark arguments; this class reads ``port``, ``n_bytes``,
            ``n_iter`` and ``n_warmup_iter``.
        queue
            Object with a ``put`` method; receives the listening port.
        """
        self.args = args
        self.queue = queue
        # Task running ``serve_forever``; set by ``run`` and cancelled by
        # ``handle_stream`` once a connection has been fully served.
        self._serve_task = None

    async def _start_listener(self, port):
        """Start a server on the first bindable port in ``[10000, 60000)``.

        ``port`` is accepted for interface compatibility but is not used.

        Returns
        -------
        tuple
            ``(port, server)`` for the port that was successfully bound.

        Raises
        ------
        RuntimeError
            If no port in the range could be bound.
        """
        for i in range(10000, 60000):
            try:
                return i, await asyncio.start_server(self.handle_stream, "0.0.0.0", i)
            except OSError:
                continue
        raise RuntimeError("Could not start server")

    async def handle_stream(self, reader, writer):
        """Echo the expected number of messages, then cancel the serve task."""
        try:
            for _ in range(self.args.n_iter + self.args.n_warmup_iter):
                try:
                    # ``read(n)`` may return fewer than ``n`` bytes, which
                    # would desynchronize the iteration count from the number
                    # of messages; ``readexactly`` waits for a whole message.
                    recv_msg = await reader.readexactly(self.args.n_bytes)
                    writer.write(recv_msg)
                    await writer.drain()
                except (ConnectionResetError, asyncio.IncompleteReadError):
                    # Peer reset the connection or closed it mid-message.
                    break

            writer.close()
            await writer.wait_closed()
        finally:
            # Always stop the server, even if closing the writer failed,
            # otherwise ``run`` would wait on the serve task forever.
            self._serve_task.cancel()

    async def serve_forever(self):
        """Start listening, publish the port on the queue and serve."""
        if self.args.port is not None:
            port = self.args.port
            server = await asyncio.start_server(
                self.handle_stream, "0.0.0.0", self.args.port
            )
        else:
            port, server = await self._start_listener(None)

        self.queue.put(port)
        async with server:
            await server.serve_forever()

    async def run(self):
        """Run the server until ``handle_stream`` cancels the serve task."""
        self._serve_task = asyncio.create_task(self.serve_forever())

        try:
            await self._serve_task
        except asyncio.CancelledError:
            # Expected: cancellation is how ``handle_stream`` signals that
            # the benchmark connection has finished.
            pass


class AsyncioClient(BaseClient):
    """Client for the ``asyncio`` benchmark backend.

    Sends ``args.n_bytes``-byte messages to the server, waits for each one to
    be echoed back, and reports the per-iteration round-trip times.
    """

    has_cuda_support = False

    def __init__(self, args, queue, server_address, port):
        """Store the benchmark arguments, result queue and server endpoint.

        Parameters
        ----------
        args
            Benchmark arguments; this class reads ``n_bytes``, ``n_iter``,
            ``n_warmup_iter`` and ``reuse_alloc``.
        queue
            Object with a ``put`` method; receives the list of timings.
        server_address
            Host to connect to.
        port
            Port to connect to.
        """
        self.args = args
        self.queue = queue
        self.server_address = server_address
        self.port = port

    async def run(self):
        """Run the benchmark loop and put the list of timings on the queue.

        Timings (in seconds, from ``time.monotonic``) are recorded only for
        iterations at or after ``args.n_warmup_iter``.
        """
        reader, writer = await asyncio.open_connection(
            self.server_address, self.port, limit=1024**3
        )

        try:
            if self.args.reuse_alloc:
                warnings.warn(
                    "Reuse allocation not supported by 'asyncio' backend, it will be "
                    "ignored."
                )

            send_msg = b"x" * self.args.n_bytes

            times = []
            for i in range(self.args.n_iter + self.args.n_warmup_iter):
                start = monotonic()

                try:
                    writer.write(send_msg)
                    await writer.drain()
                    # ``read(n)`` may return after a partial message, which
                    # would time less than a full round trip; ``readexactly``
                    # waits for the whole echoed message.
                    await reader.readexactly(self.args.n_bytes)
                except (ConnectionResetError, asyncio.IncompleteReadError):
                    # Server reset the connection or closed it mid-message.
                    break

                stop = monotonic()
                if i >= self.args.n_warmup_iter:
                    times.append(stop - start)
            self.queue.put(times)
        finally:
            # Release the connection even if the loop raised unexpectedly.
            writer.close()
            await writer.wait_closed()
102 changes: 102 additions & 0 deletions python/ucxx/ucxx/benchmarks/backends/socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

import socket
import threading
from time import monotonic

import numpy as np

from ucxx.benchmarks.backends.base import BaseClient, BaseServer


class SocketServer(BaseServer):
    """Echo server for the blocking ``socket`` benchmark backend.

    Accepts a single client and echoes back
    ``args.n_iter + args.n_warmup_iter`` messages of ``args.n_bytes`` bytes.
    """

    has_cuda_support = False

    def __init__(self, args, queue):
        """Store the benchmark arguments and queue, and create the TCP socket.

        Parameters
        ----------
        args
            Benchmark arguments; this class reads ``port``, ``n_bytes``,
            ``n_iter``, ``n_warmup_iter`` and ``reuse_alloc``.
        queue
            Object with a ``put`` method; receives the listening port.
        """
        self.args = args
        self.queue = queue
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    @staticmethod
    def _recv_exact(sock, buffer):
        """Receive from ``sock`` until ``buffer`` is completely filled.

        ``buffer`` is expected to be a writable buffer of single-byte items
        (the callers pass the ``data`` of a ``u1`` NumPy array).

        Raises
        ------
        ConnectionError
            If the peer closes the connection before the buffer is full.
        """
        view = memoryview(buffer)
        total = view.nbytes
        received = 0
        while received < total:
            # A single ``recv_into`` may deliver only part of the message.
            chunk = sock.recv_into(view[received:])
            if chunk == 0:
                raise ConnectionError(
                    "Connection closed before the full message was received"
                )
            received += chunk

    def _start_listener(self, server, port):
        """Bind ``server`` and start listening.

        If ``port`` is ``None``, the first bindable port in
        ``[10000, 60000)`` is used.

        Returns
        -------
        int
            The port the socket was bound to.

        Raises
        ------
        RuntimeError
            If ``port`` is ``None`` and no port in the range could be bound.
        """
        host = ""
        if port is not None:
            server.bind((host, port))
        else:
            for candidate in range(10000, 60000):
                try:
                    server.bind((host, candidate))
                except OSError:
                    continue
                else:
                    port = candidate
                    break
            else:
                # Without this, an unbound socket would start listening and
                # ``None`` would be published as the port.
                raise RuntimeError("Could not start server")

        server.listen()
        return port

    def handle_client(self, client_socket):
        """Echo the expected number of messages, then close ``client_socket``."""
        args = self.args

        if args.reuse_alloc:
            recv_msg = np.zeros(args.n_bytes, dtype="u1")
            assert recv_msg.nbytes == args.n_bytes

        try:
            for _ in range(args.n_iter + args.n_warmup_iter):
                if not args.reuse_alloc:
                    recv_msg = np.zeros(args.n_bytes, dtype="u1")

                try:
                    # Wait for the whole message before echoing it; echoing
                    # after a partial receive would send stale buffer bytes.
                    self._recv_exact(client_socket, recv_msg.data)
                    client_socket.sendall(recv_msg.data)
                except socket.error as e:
                    print(e)
                    break
        finally:
            client_socket.close()

    def run(self):
        """Listen, publish the port, and serve one client in a new thread."""
        try:
            port = self._start_listener(self.server, self.args.port)
            self.queue.put(port)

            client_socket, _ = self.server.accept()
            threading.Thread(
                target=self.handle_client, args=(client_socket,)
            ).start()
        finally:
            # Only one client is served, so the listening socket is no
            # longer needed once the connection is accepted (or setup failed).
            self.server.close()


class SocketClient(BaseClient):
    """Client for the blocking ``socket`` benchmark backend.

    Sends ``args.n_bytes``-byte messages to the server, waits for each one to
    be echoed back, and reports the per-iteration round-trip times.
    """

    has_cuda_support = False

    def __init__(self, args, queue, server_address, port):
        """Store the benchmark arguments and endpoint, and create the socket.

        Parameters
        ----------
        args
            Benchmark arguments; this class reads ``n_bytes``, ``n_iter``,
            ``n_warmup_iter`` and ``reuse_alloc``.
        queue
            Object with a ``put`` method; receives the list of timings.
        server_address
            Host to connect to.
        port
            Port to connect to.
        """
        self.args = args
        self.queue = queue
        self.server_address = server_address
        self.port = port
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    @staticmethod
    def _recv_exact(sock, buffer):
        """Receive from ``sock`` until ``buffer`` is completely filled.

        ``buffer`` is expected to be a writable buffer of single-byte items
        (the caller passes the ``data`` of a ``u1`` NumPy array).

        Raises
        ------
        ConnectionError
            If the peer closes the connection before the buffer is full.
        """
        view = memoryview(buffer)
        total = view.nbytes
        received = 0
        while received < total:
            # A single ``recv_into`` may deliver only part of the message.
            chunk = sock.recv_into(view[received:])
            if chunk == 0:
                raise ConnectionError(
                    "Connection closed before the full message was received"
                )
            received += chunk

    def run(self) -> bool:
        """Run the benchmark loop and put the list of timings on the queue.

        Timings (in seconds, from ``time.monotonic``) are recorded only for
        iterations at or after ``args.n_warmup_iter``.

        NOTE(review): the ``bool`` return annotation is kept for interface
        stability, but nothing is returned.
        """
        try:
            self.client.connect((self.server_address, self.port))
            send_msg = np.arange(self.args.n_bytes, dtype="u1")
            assert send_msg.nbytes == self.args.n_bytes

            if self.args.reuse_alloc:
                recv_msg = np.zeros(self.args.n_bytes, dtype="u1")
                assert recv_msg.nbytes == self.args.n_bytes

            times = []
            for i in range(self.args.n_iter + self.args.n_warmup_iter):
                start = monotonic()

                if not self.args.reuse_alloc:
                    recv_msg = np.zeros(self.args.n_bytes, dtype="u1")

                self.client.sendall(send_msg.data)
                # Wait for the whole echoed message so that the timing covers
                # a full round trip rather than the first partial chunk.
                self._recv_exact(self.client, recv_msg.data)

                stop = monotonic()
                if i >= self.args.n_warmup_iter:
                    times.append(stop - start)

            self.queue.put(times)
        finally:
            # The socket is created in ``__init__`` and used only here.
            self.client.close()
Loading

0 comments on commit c317f82

Please sign in to comment.