diff --git a/python/ucxx/ucxx/_lib_async/exchange_peer_info.py b/python/ucxx/ucxx/_lib_async/exchange_peer_info.py index 7594cb53..5995605e 100644 --- a/python/ucxx/ucxx/_lib_async/exchange_peer_info.py +++ b/python/ucxx/ucxx/_lib_async/exchange_peer_info.py @@ -19,21 +19,21 @@ async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener, stream_timeo # Pack peer information incl. a checksum fmt = "QQQ" my_info = struct.pack(fmt, msg_tag, ctrl_tag, hash64bits(msg_tag, ctrl_tag)) + peer_info = bytearray(len(my_info)) my_info_arr = Array(my_info) + peer_info_arr = Array(peer_info) # Send/recv peer information. Notice, we force an `await` between the two # streaming calls (see ) if listener is True: - req = endpoint.am_send(my_info_arr) + req = endpoint.stream_send(my_info_arr) await asyncio.wait_for(req.wait(), timeout=stream_timeout) - req = endpoint.am_recv() + req = endpoint.stream_recv(peer_info_arr) await asyncio.wait_for(req.wait(), timeout=stream_timeout) - peer_info = req.recv_buffer else: - req = endpoint.am_recv() + req = endpoint.stream_recv(peer_info_arr) await asyncio.wait_for(req.wait(), timeout=stream_timeout) - peer_info = req.recv_buffer - req = endpoint.am_send(my_info_arr) + req = endpoint.stream_send(my_info_arr) await asyncio.wait_for(req.wait(), timeout=stream_timeout) # Unpacking and sanity check of the peer information