diff --git a/python/ucxx/ucxx/benchmarks/backends/asyncio.py b/python/ucxx/ucxx/benchmarks/backends/asyncio.py new file mode 100644 index 00000000..45ae5dc6 --- /dev/null +++ b/python/ucxx/ucxx/benchmarks/backends/asyncio.py @@ -0,0 +1,100 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: BSD-3-Clause + +import asyncio +import warnings +from time import monotonic + +from ucxx.benchmarks.backends.base import BaseClient, BaseServer + + +class AsyncioServer(BaseServer): + has_cuda_support = False + + def __init__(self, args, queue): + self.args = args + self.queue = queue + self._serve_task = None + + async def _start_listener(self, port): + for i in range(10000, 60000): + try: + return i, await asyncio.start_server(self.handle_stream, "0.0.0.0", i) + except OSError: + continue + raise Exception("Could not start server") + + async def handle_stream(self, reader, writer): + for i in range(self.args.n_iter + self.args.n_warmup_iter): + try: + recv_msg = await reader.read(self.args.n_bytes) + writer.write(recv_msg) + await writer.drain() + except ConnectionResetError: + break + + writer.close() + await writer.wait_closed() + + self._serve_task.cancel() + + async def serve_forever(self): + if self.args.port is not None: + port, server = self.args.port, await asyncio.start_server( + self.handle_stream, "0.0.0.0", self.args.port + ) + else: + port, server = await self._start_listener(None) + + self.queue.put(port) + async with server: + await server.serve_forever() + + async def run(self): + self._serve_task = asyncio.create_task(self.serve_forever()) + + try: + await self._serve_task + except asyncio.CancelledError: + pass + + +class AsyncioClient(BaseClient): + has_cuda_support = False + + def __init__(self, args, queue, server_address, port): + self.args = args + self.queue = queue + self.server_address = server_address + self.port = port + + async def run(self): + reader, writer = await asyncio.open_connection( + self.server_address, self.port, limit=1024**3 + ) + + if self.args.reuse_alloc: + warnings.warn( + "Reuse allocation not supported by 'asyncio' backend, it will be " + "ignored." + ) + + send_msg = ("x" * self.args.n_bytes).encode() + + times = [] + for i in range(self.args.n_iter + self.args.n_warmup_iter): + start = monotonic() + + try: + writer.write(send_msg) + await writer.drain() + await reader.read(self.args.n_bytes) + except ConnectionResetError: + break + + stop = monotonic() + if i >= self.args.n_warmup_iter: + times.append(stop - start) + self.queue.put(times) + writer.close() + await writer.wait_closed() diff --git a/python/ucxx/ucxx/benchmarks/backends/socket.py b/python/ucxx/ucxx/benchmarks/backends/socket.py new file mode 100644 index 00000000..b6cf75d0 --- /dev/null +++ b/python/ucxx/ucxx/benchmarks/backends/socket.py @@ -0,0 +1,102 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: BSD-3-Clause + +import socket +import threading +from time import monotonic + +import numpy as np + +from ucxx.benchmarks.backends.base import BaseClient, BaseServer + + +class SocketServer(BaseServer): + has_cuda_support = False + + def __init__(self, args, queue): + self.args = args + self.queue = queue + self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def _start_listener(self, server, port): + host = "" + if port is not None: + server.bind((host, port)) + else: + for i in range(10000, 60000): + try: + server.bind((host, i)) + except OSError: + continue + else: + port = i + break + + server.listen() + return port + + def handle_client(self, client_socket): + args = self.args + + if args.reuse_alloc: + recv_msg = np.zeros(args.n_bytes, dtype="u1") + assert recv_msg.nbytes == args.n_bytes + + for _ in range(args.n_iter + args.n_warmup_iter): + if not args.reuse_alloc: + recv_msg = np.zeros(args.n_bytes, dtype="u1") + + try: + client_socket.recv_into(recv_msg.data) + client_socket.sendall(recv_msg.data) + except socket.error as e: + print(e) + break + + client_socket.close() + return + + def run(self): + port = self._start_listener(self.server, self.args.port) + self.queue.put(port) + + client_socket, addr = self.server.accept() + threading.Thread(target=self.handle_client, args=(client_socket,)).start() + + self.server.close() + + +class SocketClient(BaseClient): + has_cuda_support = False + + def __init__(self, args, queue, server_address, port): + self.args = args + self.queue = queue + self.server_address = server_address + self.port = port + self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def run(self) -> bool: + self.client.connect((self.server_address, self.port)) + send_msg = np.arange(self.args.n_bytes, dtype="u1") + assert send_msg.nbytes == self.args.n_bytes + + if self.args.reuse_alloc: + recv_msg = np.zeros(self.args.n_bytes, dtype="u1") + assert recv_msg.nbytes == self.args.n_bytes + + times = [] + for i in range(self.args.n_iter + self.args.n_warmup_iter): + start = monotonic() + + if not self.args.reuse_alloc: + recv_msg = np.zeros(self.args.n_bytes, dtype="u1") + + self.client.sendall(send_msg.data) + self.client.recv_into(recv_msg.data) + + stop = monotonic() + if i >= self.args.n_warmup_iter: + times.append(stop - start) + + self.queue.put(times) diff --git a/python/ucxx/ucxx/benchmarks/send_recv.py b/python/ucxx/ucxx/benchmarks/send_recv.py index ba07e8f8..572cd008 100644 --- a/python/ucxx/ucxx/benchmarks/send_recv.py +++ b/python/ucxx/ucxx/benchmarks/send_recv.py @@ -10,6 +10,8 @@ import ucxx from ucxx._lib_async.utils import get_event_loop +from ucxx.benchmarks.backends.asyncio import AsyncioClient, AsyncioServer +from ucxx.benchmarks.backends.socket import SocketClient, SocketServer from ucxx.benchmarks.backends.ucxx_async import ( UCXPyAsyncClient, UCXPyAsyncServer, @@ -30,13 +32,22 @@ def _get_backend_implementation(backend): return {"client": UCXPyAsyncClient, "server": UCXPyAsyncServer} elif backend == "ucxx-core": return {"client": UCXPyCoreClient, "server": UCXPyCoreServer} + elif backend == "asyncio": + return {"client": AsyncioClient, "server": AsyncioServer} + elif backend == "socket": + return {"client": SocketClient, "server": SocketServer} elif backend == "tornado": - from ucxx.benchmarks.backends.tornado import ( - TornadoClient, - TornadoServer, - ) + try: + import tornado # noqa: F401 + except ImportError as e: + raise e + else: + from ucxx.benchmarks.backends.tornado import ( + TornadoClient, + TornadoServer, + ) - return {"client": TornadoClient, "server": TornadoServer} + return {"client": TornadoClient, "server": TornadoServer} raise ValueError(f"Unknown backend {backend}") @@ -95,6 +106,7 @@ def client(queue, port, server_address, args): print_key_value(key="Number of buffers", value=f"{args.n_buffers}") print_key_value(key="Object type", value=f"{args.object_type}") print_key_value(key="Reuse allocation", value=f"{args.reuse_alloc}") + print_key_value(key="Backend", value=f"{args.backend}") client.print_backend_specific_config() print_separator(separator="=") if args.object_type == "numpy": @@ -289,7 +301,7 @@ def parse_args(): default="ucxx-async", type=str, help="Backend Library (-l) to use, options are: 'ucxx-async' (default), " - "'ucxx-core' and 'tornado'.", + "'ucxx-core', 'asyncio', 'socket' and 'tornado'.", ) parser.add_argument( "--progress-mode",