Skip to content

Commit

Permalink
Extend Python benchmarks backends (#309)
Browse files Browse the repository at this point in the history
Extend Python benchmarks with `socket` and `asyncio.Stream{Reader,Writer}` APIs. This should help us better understand the overhead of UCXX compared to Python's internal implementations, and thus help us improve potential suboptimal code in our implementation.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #309
  • Loading branch information
pentschev authored Nov 4, 2024
1 parent 922b3a4 commit 7069174
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 6 deletions.
100 changes: 100 additions & 0 deletions python/ucxx/ucxx/benchmarks/backends/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

import asyncio
import warnings
from time import monotonic

from ucxx.benchmarks.backends.base import BaseClient, BaseServer


class AsyncioServer(BaseServer):
has_cuda_support = False

def __init__(self, args, queue):
self.args = args
self.queue = queue
self._serve_task = None

async def _start_listener(self, port):
for i in range(10000, 60000):
try:
return i, await asyncio.start_server(self.handle_stream, "0.0.0.0", i)
except OSError:
continue
raise Exception("Could not start server")

async def handle_stream(self, reader, writer):
for i in range(self.args.n_iter + self.args.n_warmup_iter):
try:
recv_msg = await reader.read(self.args.n_bytes)
writer.write(recv_msg)
await writer.drain()
except ConnectionResetError:
break

writer.close()
await writer.wait_closed()

self._serve_task.cancel()

async def serve_forever(self):
if self.args.port is not None:
port, server = self.args.port, await asyncio.start_server(
self.handle_stream, "0.0.0.0", self.args.port
)
else:
port, server = await self._start_listener(None)

self.queue.put(port)
async with server:
await server.serve_forever()

async def run(self):
self._serve_task = asyncio.create_task(self.serve_forever())

try:
await self._serve_task
except asyncio.CancelledError:
pass


class AsyncioClient(BaseClient):
has_cuda_support = False

def __init__(self, args, queue, server_address, port):
self.args = args
self.queue = queue
self.server_address = server_address
self.port = port

async def run(self):
reader, writer = await asyncio.open_connection(
self.server_address, self.port, limit=1024**3
)

if self.args.reuse_alloc:
warnings.warn(
"Reuse allocation not supported by 'asyncio' backend, it will be "
"ignored."
)

send_msg = ("x" * self.args.n_bytes).encode()

times = []
for i in range(self.args.n_iter + self.args.n_warmup_iter):
start = monotonic()

try:
writer.write(send_msg)
await writer.drain()
await reader.read(self.args.n_bytes)
except ConnectionResetError:
break

stop = monotonic()
if i >= self.args.n_warmup_iter:
times.append(stop - start)
self.queue.put(times)
writer.close()
await writer.wait_closed()
102 changes: 102 additions & 0 deletions python/ucxx/ucxx/benchmarks/backends/socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

import socket
import threading
from time import monotonic

import numpy as np

from ucxx.benchmarks.backends.base import BaseClient, BaseServer


class SocketServer(BaseServer):
has_cuda_support = False

def __init__(self, args, queue):
self.args = args
self.queue = queue
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def _start_listener(self, server, port):
host = ""
if port is not None:
server.bind((host, port))
else:
for i in range(10000, 60000):
try:
server.bind((host, i))
except OSError:
continue
else:
port = i
break

server.listen()
return port

def handle_client(self, client_socket):
args = self.args

if args.reuse_alloc:
recv_msg = np.zeros(args.n_bytes, dtype="u1")
assert recv_msg.nbytes == args.n_bytes

for _ in range(args.n_iter + args.n_warmup_iter):
if not args.reuse_alloc:
recv_msg = np.zeros(args.n_bytes, dtype="u1")

try:
client_socket.recv_into(recv_msg.data)
client_socket.sendall(recv_msg.data)
except socket.error as e:
print(e)
break

client_socket.close()
return

def run(self):
port = self._start_listener(self.server, self.args.port)
self.queue.put(port)

client_socket, addr = self.server.accept()
threading.Thread(target=self.handle_client, args=(client_socket,)).start()

self.server.close()


class SocketClient(BaseClient):
has_cuda_support = False

def __init__(self, args, queue, server_address, port):
self.args = args
self.queue = queue
self.server_address = server_address
self.port = port
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def run(self) -> bool:
self.client.connect((self.server_address, self.port))
send_msg = np.arange(self.args.n_bytes, dtype="u1")
assert send_msg.nbytes == self.args.n_bytes

if self.args.reuse_alloc:
recv_msg = np.zeros(self.args.n_bytes, dtype="u1")
assert recv_msg.nbytes == self.args.n_bytes

times = []
for i in range(self.args.n_iter + self.args.n_warmup_iter):
start = monotonic()

if not self.args.reuse_alloc:
recv_msg = np.zeros(self.args.n_bytes, dtype="u1")

self.client.sendall(send_msg.data)
self.client.recv_into(recv_msg.data)

stop = monotonic()
if i >= self.args.n_warmup_iter:
times.append(stop - start)

self.queue.put(times)
24 changes: 18 additions & 6 deletions python/ucxx/ucxx/benchmarks/send_recv.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import ucxx
from ucxx._lib_async.utils import get_event_loop
from ucxx.benchmarks.backends.asyncio import AsyncioClient, AsyncioServer
from ucxx.benchmarks.backends.socket import SocketClient, SocketServer
from ucxx.benchmarks.backends.ucxx_async import (
UCXPyAsyncClient,
UCXPyAsyncServer,
Expand All @@ -30,13 +32,22 @@ def _get_backend_implementation(backend):
return {"client": UCXPyAsyncClient, "server": UCXPyAsyncServer}
elif backend == "ucxx-core":
return {"client": UCXPyCoreClient, "server": UCXPyCoreServer}
elif backend == "asyncio":
return {"client": AsyncioClient, "server": AsyncioServer}
elif backend == "socket":
return {"client": SocketClient, "server": SocketServer}
elif backend == "tornado":
from ucxx.benchmarks.backends.tornado import (
TornadoClient,
TornadoServer,
)
try:
import tornado # noqa: F401
except ImportError as e:
raise e
else:
from ucxx.benchmarks.backends.tornado import (
TornadoClient,
TornadoServer,
)

return {"client": TornadoClient, "server": TornadoServer}
return {"client": TornadoClient, "server": TornadoServer}

raise ValueError(f"Unknown backend {backend}")

Expand Down Expand Up @@ -95,6 +106,7 @@ def client(queue, port, server_address, args):
print_key_value(key="Number of buffers", value=f"{args.n_buffers}")
print_key_value(key="Object type", value=f"{args.object_type}")
print_key_value(key="Reuse allocation", value=f"{args.reuse_alloc}")
print_key_value(key="Backend", value=f"{args.backend}")
client.print_backend_specific_config()
print_separator(separator="=")
if args.object_type == "numpy":
Expand Down Expand Up @@ -289,7 +301,7 @@ def parse_args():
default="ucxx-async",
type=str,
help="Backend Library (-l) to use, options are: 'ucxx-async' (default), "
"'ucxx-core' and 'tornado'.",
"'ucxx-core', 'asyncio', 'socket' and 'tornado'.",
)
parser.add_argument(
"--progress-mode",
Expand Down

0 comments on commit 7069174

Please sign in to comment.