diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index e54e5d81..72a8d92f 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -40,6 +40,11 @@ jobs: cat out.log grep -Fxq "PASS" out.log + - name: Run TCP tests + working-directory: tests + run: | + make TCP=1 + fpga_sim: name: FPGA queue simulation runs-on: ubuntu-latest diff --git a/examples/tcp/test.py b/examples/tcp/test.py index b7d36556..f885df40 100755 --- a/examples/tcp/test.py +++ b/examples/tcp/test.py @@ -10,14 +10,14 @@ from switchboard import PySbPacket, PySbTx, PySbRx, start_tcp_bridge -def main(rxq='rx.q', txq='tx.q'): +def main(txq='tx.q', rxq='rx.q'): # create queues - tx = PySbTx(rxq, fresh=True) - rx = PySbRx(txq, fresh=True) + tx = PySbTx(txq, fresh=True) + rx = PySbRx(rxq, fresh=True) # start TCP bridges - start_tcp_bridge(mode='server', rx=rxq) - start_tcp_bridge(mode='client', tx=txq) + start_tcp_bridge(inputs=[txq]) + start_tcp_bridge(outputs=[('*', rxq)]) # form packet to be sent into the simulation. note that the arguments # to the constructor are all optional, and can all be specified later diff --git a/switchboard/sbtcp.py b/switchboard/sbtcp.py index 5d4e4bd5..b6b0ac8b 100755 --- a/switchboard/sbtcp.py +++ b/switchboard/sbtcp.py @@ -12,7 +12,6 @@ # https://stackoverflow.com/a/16745561 import time -import errno import socket import argparse import numpy as np @@ -21,155 +20,77 @@ SB_PACKET_SIZE_BYTES = 60 -def conn_closed(conn): - """ - Check if connection is closed by peeking into the read buffer. - """ - try: - buf = conn.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT) - if len(buf) == 0: - # Read of zero means connection was closed. - return True - except socket.error as e: - if e.errno != errno.EAGAIN and e.errno != errno.EWOULDBLOCK: - # Some other error, re-raise since this was not expected. - raise e +def tcp2sb(outputs, conn): + while True: + # receive data from TCP + data_rx_from_tcp = bytes([]) - # Connection seems to be alive. - return False + while len(data_rx_from_tcp) < SB_PACKET_SIZE_BYTES: + b = conn.recv(SB_PACKET_SIZE_BYTES - len(data_rx_from_tcp)) + if len(b) == 0: + # connection is not alive anymore + return -def run_tcp_bridge(sbrx, sbtx, conn, should_yield=True): - """ - Sends packets received from PySbRx "sbrx" to TCP connection "conn", - and sends packets received from "conn" to PySbTx "sbtx". Runs - continuously until the connection breaks. - - The optional argument "should_yield" indicates if the bridge - should yield when there is no activity, or when congestion is - detected. The default is that yielding does occur, so that - the bridge doesn't use unnecessary CPU resources. However, - this will increase latency as compared to the case where - there is no explicit yielding. - """ + data_rx_from_tcp += b + + # convert to a switchboard packet + p = bytes2sb(data_rx_from_tcp) + + # figure out which queue this packet is going to + for rule, output in outputs: + if rule_matches(rule, p.destination): + output.send(p) + break + else: + raise Exception(f"No rule for destination {p.destination}") - # set connection to non-blocking, allowing SB and TCP - # send/recv operations to be fully interleaved - conn.setblocking(False) - # packets in progress of being sent/received +def sb2tcp(inputs, conn): tcp_data_to_send = bytes([]) - data_rx_from_tcp = bytes([]) - # continue bridging until the connection isn't alive anymore while True: - ############# - # SB -> TCP # - ############# - - sb2tcp_votes_to_yield = False - - if sbrx is not None: - # get a new packet if needed - if tcp_data_to_send == bytes([]): - p = sbrx.recv(blocking=False) - if p is not None: - # convert the packet to a bytes object - tcp_data_to_send = sb2bytes(p) - else: - # no data to pass on to TCP, so indicate - # that we may want to yield to other threads - sb2tcp_votes_to_yield = True - - # if there is a new packet to send along try to send a chunk of it - # in a non-blocking fashion - if tcp_data_to_send != bytes([]): - try: - n = conn.send(tcp_data_to_send) - except socket.error as e: - err = e.args[0] - if err == errno.EAGAIN or err == errno.EWOULDBLOCK: - # couldn't send anything over TCP, so indicate - # that we may want to yield to other threads - sb2tcp_votes_to_yield = True - else: - raise - else: - if n == 0: - # connection is not alive anymore - break - else: - tcp_data_to_send = tcp_data_to_send[n:] - if conn_closed(conn): + # get a switchboard packet + while True: + # select input and queue its next run as last + sbrx = inputs.pop(0) + inputs.append(sbrx) + + # try to receive a packet from this input + p = sbrx.recv(blocking=False) + + if p is not None: break - else: - # there is no channel for receiving SB packets - sb2tcp_votes_to_yield = True - - ############# - # TCP -> SB # - ############# - - tcp2sb_votes_to_yield = False - - # receive data from TCP and send it to a SB queue - if sbtx is not None: - # receive more data from TCP if needed - if len(data_rx_from_tcp) != SB_PACKET_SIZE_BYTES: - try: - b = conn.recv(SB_PACKET_SIZE_BYTES) - except socket.error as e: - err = e.args[0] - if err == errno.EAGAIN or err == errno.EWOULDBLOCK: - # couldn't receive anything over TCP, so indicate - # that we may want to yield to other threads - tcp2sb_votes_to_yield = True - else: - raise - else: - if len(b) == 0: - # connection is not alive anymore - break - else: - data_rx_from_tcp += b - - # try to send a Switchboard packet to a queue if we have one to send - if len(data_rx_from_tcp) == SB_PACKET_SIZE_BYTES: - if sbtx.send(bytes2sb(data_rx_from_tcp), blocking=False): - data_rx_from_tcp = bytes([]) - else: - # couldn't send a packet to a Switchboard queue, so indicate - # that we may want to yield to other threads - tcp2sb_votes_to_yield = True - else: - # there is no channel for transmitting SB packets - tcp2sb_votes_to_yield = True - ############ - # yielding # - ############ + # convert the switchboard packet to bytes + tcp_data_to_send = sb2bytes(p) + + # send the packet out over TCP + while len(tcp_data_to_send) > 0: + n = conn.send(tcp_data_to_send) - # yield if nothing is happening or it looks like we're blocked - # due to backpressure + if n == 0: + # connection is not alive anymore + return - if sb2tcp_votes_to_yield and tcp2sb_votes_to_yield and should_yield: - time.sleep(0) + tcp_data_to_send = tcp_data_to_send[n:] -def run_client(sbrx, sbtx, host, port, quiet=False, should_yield=True): +def run_client(inputs, host, port, quiet=False, max_rate=None): """ Connect to a server, retrying until a connection is made. """ # initialize TX/RX if needed - sbrx = convert_to_queue(sbrx, 'sbrx', PySbRx) - sbtx = convert_to_queue(sbtx, 'sbtx', PySbTx) + inputs = [convert_to_queue(q=input, cls=PySbRx, max_rate=max_rate) + for input in inputs] if not quiet: print('Waiting for server', end='', flush=True) while True: try: conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) conn.connect((host, port)) break except ConnectionRefusedError: @@ -181,17 +102,17 @@ def run_client(sbrx, sbtx, host, port, quiet=False, should_yield=True): print('Done.') # communicate with the server - run_tcp_bridge(sbrx=sbrx, sbtx=sbtx, conn=conn, should_yield=should_yield) + sb2tcp(inputs=inputs, conn=conn) -def run_server(sbrx, sbtx, host, port, quiet=False, should_yield=True, run_once=False): +def run_server(outputs, host, port=0, quiet=False, max_rate=None, run_once=False): """ Accepts client connections in a loop until Ctrl-C is pressed. """ - # initialize TX/RX if needed - sbrx = convert_to_queue(sbrx, 'sbrx', PySbRx) - sbtx = convert_to_queue(sbtx, 'sbtx', PySbTx) + # initialize TX objects if needed + outputs = [(rule, convert_to_queue(q=output, cls=PySbTx, max_rate=max_rate)) + for rule, output in outputs] # create the server socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -210,27 +131,10 @@ def run_server(sbrx, sbtx, host, port, quiet=False, should_yield=True, run_once= print('Done.') # communicate with that client - run_tcp_bridge(sbrx=sbrx, sbtx=sbtx, conn=conn, should_yield=should_yield) - if (run_once): - break - - -def main(): - # parse command-line arguments - - parser = get_parser() - args = parser.parse_args() + tcp2sb(outputs=outputs, conn=conn) - # main logic - - if args.mode == 'server': - run_server(sbrx=args.rx, sbtx=args.tx, host=args.host, port=args.port, - quiet=args.q, should_yield=(not args.noyield), run_once=args.run_once) - elif args.mode == 'client': - run_client(sbrx=args.rx, sbtx=args.tx, host=args.host, port=args.port, - quiet=args.q, should_yield=(not args.noyield)) - else: - raise ValueError(f"Invalid mode: {args.mode}") + if run_once: + break def sb2bytes(p): @@ -248,34 +152,73 @@ def bytes2sb(b): return PySbPacket(arr[0], arr[1], arr[2:].view(np.uint8)) -def convert_to_queue(q, name, cls): - if isinstance(q, cls) or (q is None): +def convert_to_queue(q, cls, max_rate=None): + if isinstance(q, cls): # note that None is passed through return q elif isinstance(q, str): - if q == "": - return None - else: - return cls(q) + # TODO: pass through max_rate + return cls(q) + else: + raise TypeError(f'{q} must be a string or {cls.__name__}; got {type(q)}') + + +def rule_matches(rule, addr): + if rule == '*': + return True + elif isinstance(rule, int): + return addr == rule + elif isinstance(rule, range): + return rule.start <= addr < rule.stop + elif isinstance(rule, (list, tuple)): + # return True if any subrules match + for subrule in rule: + if rule_matches(subrule, addr): + return True + + # otherwise return False + return False else: - raise TypeError(f'{name} must be a string or {cls.__name__}; got {type(q)}') + raise Exception(f'Unsupported rule type: {type(rule)}') + + +def parse_rule(rule): + subrules = rule.split(',') + + retval = [] + + for subrule in subrules: + if subrule == '*': + retval.append('*') + elif '-' in subrule: + start, stop = subrule.split('-') + start = int(start) + stop = int(stop) + retval.append(range(start, stop + 1)) + else: + retval.append(int(subrule)) + + return retval + +def start_tcp_bridge(inputs=None, outputs=None, host='localhost', port=5555, + quiet=True, max_rate=None): -def start_tcp_bridge(mode, tx=None, rx=None, host='localhost', port=5555, quiet=True): kwargs = dict( - sbrx=rx, - sbtx=tx, host=host, port=port, - quiet=quiet + quiet=quiet, + max_rate=max_rate ) - if mode == 'server': + if outputs is not None: + kwargs['outputs'] = outputs target = run_server - elif mode == 'client': + elif inputs is not None: + kwargs['inputs'] = inputs target = run_client else: - raise ValueError(f"Invalid mode: {mode}") + raise Exception('Must specify "outputs" or "inputs" argument.') import multiprocessing @@ -288,28 +231,49 @@ def start_tcp_bridge(mode, tx=None, rx=None, host='localhost', port=5555, quiet= def get_parser(): parser = argparse.ArgumentParser() - parser.add_argument('--rx', type=str, default="", help="URI of the Switchboard queue used in" - " the SB -> TCP direction. Optional.") - parser.add_argument('--tx', type=str, default="", help="URI of the Switchboard queue used in" - " the TCP -> SB direction. Optional.") - parser.add_argument('--mode', type=str, required=True, choices=["server", "client"], - help="Indicates if this program should act as a TCP server or client. In each pair" - " of TCP bridge programs, one must be a server and the other must be a client." - " The server will run forever, accepting a new client connection after" - " the previous client connection terminates. However, the client only runs") + parser.add_argument('--outputs', type=str, default=None, nargs='+', help="Space-separated" + " dictionary of queues to write to. For example, 0:a.q 1-2:b.q 3,5-7:c.q *:d.q means" + " that packets sent to destination 0 are routed to a.q, packets sent to destinations 1" + " or 2 are routed to b.q, packets sent to destinations 3, 5, 6, or 7 are routed to c.q," + " and all other packets are routed to d.q") + parser.add_argument('--inputs', type=str, default=None, nargs='+', help="Space-separated" + " list of queues to read from, for example a.q b.q c.q") parser.add_argument('--port', type=int, default=5555, help="TCP port used for" " sending and receiving packets.") parser.add_argument('--host', type=str, default="localhost", help="IP address or hostname" " used sending/receiving packets.") parser.add_argument('-q', action='store_true', help="Quiet mode: doesn't print anything.") - parser.add_argument('--noyield', action='store_true', help="Reduces latency by keeping the" - " CPU busy even when there is no packet activity, or when packets are blocked" - " due to backpressure.") - parser.add_argument('--run-once', action='store_true', - help="Process only one connection in server mode, then exit.") + parser.add_argument('--max-rate', type=float, default=None, help='Maximum rate at which' + ' queues are read or written.') + parser.add_argument('--run-once', action='store_true', help="Process only one connection" + " in server mode, then exit.") return parser +def main(): + # parse command-line arguments + + parser = get_parser() + args = parser.parse_args() + + # main logic + + if args.outputs is not None: + # parse the output mapping + outputs = [] + for output in args.outputs: + rule, output = output.split(':') + outputs.append((parse_rule(rule), output)) + + run_server(outputs=outputs, host=args.host, port=args.port, + quiet=args.q, max_rate=args.max_rate, run_once=args.run_once) + elif args.inputs is not None: + run_client(inputs=args.inputs, host=args.host, port=args.port, + quiet=args.q, max_rate=args.max_rate) + else: + raise ValueError("Must specify either --inputs or --outputs") + + if __name__ == "__main__": main() diff --git a/tests/Makefile b/tests/Makefile index bd465ffe..ec802adc 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -4,9 +4,15 @@ TOPDIR=.. include $(TOPDIR)/Rules.mk -MODE ?= queue +TESTS := hello bandwidth latency -test: torture +ifeq ($(TCP),1) + OPTIONS += --tcp +else + TESTS += torture +endif + +test: $(TESTS) TARGETS += hello.out TARGETS += bandwidth.out @@ -21,15 +27,15 @@ CPPFLAGS += -MMD .PHONY: hello hello: hello.out - ./test.py --test hello + ./test.py --test hello $(OPTIONS) .PHONY: bandwidth bandwidth: bandwidth.out - ./test.py --test bandwidth + ./test.py --test bandwidth $(OPTIONS) .PHONY: latency latency: latency.out - ./test.py --test latency + ./test.py --test latency $(OPTIONS) .PHONY: torture torture: torture.out diff --git a/tests/bandwidth.cc b/tests/bandwidth.cc index 6e02ecaa..addde9e0 100644 --- a/tests/bandwidth.cc +++ b/tests/bandwidth.cc @@ -23,12 +23,10 @@ int main(int argc, char* argv[]) { } } - const char* arg = "5555"; + const char* port = "queue-0"; if (arg_idx < argc) { - arg = argv[arg_idx++]; + port = argv[arg_idx++]; } - char port[128]; - sprintf(port, "queue-%s", arg); int iterations = 10000000; if (arg_idx < argc) { @@ -86,6 +84,11 @@ int main(int argc, char* argv[]) { // print output to make sure it is not optimized away printf("Output: %d\n", out); + // check output + if (out != iterations) { + throw std::runtime_error("MISMATCH"); + } + // stop measuring time taken std::chrono::steady_clock::time_point stop_time = std::chrono::steady_clock::now(); double t = @@ -94,7 +97,17 @@ int main(int argc, char* argv[]) { // print bandwidth double rate = (1.0 * iterations) / t; - printf("Rate: %0.3f MT/s\n", rate * 1e-6); + printf("Rate: "); + if (rate > 1e9) { + printf("%0.1f GT/s\n", rate * 1e-9); + } else if (rate > 1e6) { + printf("%0.1f MT/s\n", rate * 1e-6); + } else if (rate > 1e3) { + printf("%0.1f KT/s\n", rate * 1e-3); + } else { + printf("%0.1f T/s\n", rate); + } + printf("\n"); } return 0; diff --git a/tests/hello.cc b/tests/hello.cc index e8c8fc6d..23014008 100644 --- a/tests/hello.cc +++ b/tests/hello.cc @@ -2,6 +2,7 @@ // This code is licensed under Apache License 2.0 (see LICENSE for details) #include "switchboard.hpp" +#include #define NBYTES 32 @@ -29,12 +30,10 @@ int main(int argc, char* argv[]) { // determine the communication port to use - const char* arg = "5555"; + const char* port = "queue-0"; if (arg_idx < argc) { - arg = argv[arg_idx++]; + port = argv[arg_idx++]; } - char port[128]; - sprintf(port, "queue-%s", arg); if (is_tx) { SBTX tx; @@ -45,11 +44,7 @@ int main(int argc, char* argv[]) { p.destination = 0xbeefcafe; p.last = true; for (int i = 0; i < NBYTES; i++) { - p.data[i] = 0; - for (int j = 0; j < 2; j++) { - p.data[i] <<= 4; - p.data[i] |= (i + j) % 16; - } + p.data[i] = i; } // send packet @@ -64,6 +59,18 @@ int main(int argc, char* argv[]) { // print packet printf("%s\n", sb_packet_to_str(p, NBYTES).c_str()); + + // check destination + if (p.destination != 0xbeefcafe) { + throw std::runtime_error("MISMATCH"); + } + + // check data + for (int i = 0; i < NBYTES; i++) { + if (p.data[i] != i) { + throw std::runtime_error("MISMATCH"); + } + } } return 0; diff --git a/tests/latency.cc b/tests/latency.cc index 5ae82dd5..062fd849 100644 --- a/tests/latency.cc +++ b/tests/latency.cc @@ -21,20 +21,16 @@ int main(int argc, char* argv[]) { } // determine the RX port - const char* rx_arg = "5555"; + const char* rx_port = "queue-0"; if (arg_idx < argc) { - rx_arg = argv[arg_idx++]; + rx_port = argv[arg_idx++]; } - char rx_port[128]; - sprintf(rx_port, "queue-%s", rx_arg); // determine the TX port - const char* tx_arg = "5555"; + const char* tx_port = "queue-1"; if (arg_idx < argc) { - tx_arg = argv[arg_idx++]; + tx_port = argv[arg_idx++]; } - char tx_port[128]; - sprintf(tx_port, "queue-%s", tx_arg); int iterations = 10000000; if (arg_idx < argc) { @@ -73,10 +69,17 @@ int main(int argc, char* argv[]) { // print output to make sure it is not optimized away printf("Output: {"); for (int i = 0; i < 8; i++) { - printf("%0d", *((uint32_t*)(&p.data[4 * i]))); + // print next entry + uint32_t out = *((uint32_t*)(&p.data[4 * i])); + printf("%0d", out); if (i != 7) { printf(", "); } + + // check entry + if (out != (2 * iterations)) { + throw std::runtime_error("MISMATCH"); + } } printf("}\n"); @@ -86,8 +89,18 @@ int main(int argc, char* argv[]) { 1.0e-6 * (std::chrono::duration_cast(stop_time - start_time).count()); + printf("Latency: "); double latency = t / (1.0 * iterations); - printf("Latency: %0.1f ns\n", latency * 1.0e9); + if (latency < 1e-6) { + printf("%0.1f ns", latency * 1.0e9); + } else if (latency < 1e-3) { + printf("%0.1f us", latency * 1.0e6); + } else if (latency < 1) { + printf("%0.1f ms", latency * 1.0e3); + } else { + printf("%0.1f s", latency); + } + printf("\n"); } else { while (count < iterations) { // busy-loop for minimum latency diff --git a/tests/test.py b/tests/test.py index 77ed4ac5..bc816bc8 100755 --- a/tests/test.py +++ b/tests/test.py @@ -3,98 +3,93 @@ # Copyright (c) 2024 Zero ASIC Corporation # This code is licensed under Apache License 2.0 (see LICENSE for details) -import os +import time import atexit -import subprocess import argparse import itertools from pathlib import Path +from switchboard import binary_run, delete_queue, start_tcp_bridge THIS_DIR = Path(__file__).resolve().parent TOP_DIR = THIS_DIR.parent -SHMEM_DIR = THIS_DIR + +QUEUE_COUNTER = itertools.count(start=0) + + +def next_queue(): + queue = str(THIS_DIR / f'queue-{next(QUEUE_COUNTER)}') + delete_queue(queue) + atexit.register(lambda queue=queue: delete_queue(queue)) + return queue def main(): parser = argparse.ArgumentParser() parser.add_argument('--verbose', action='store_true') - parser.add_argument('--test', type=str, default='hello') + parser.add_argument('--test', type=str, default='hello', + choices=['hello', 'bandwidth', 'latency']) parser.add_argument('--iterations', type=int, default=None) + parser.add_argument('--tcp', action='store_true') args = parser.parse_args() # set defaults if args.iterations is None: if args.test == 'bandwidth': - args.iterations = 50000000 + args.iterations = 50000000 if not args.tcp else 40000 elif args.test == 'latency': - args.iterations = 5000000 - - # provide unique queue numbers - queue_counter = itertools.count(start=5555) - - # rx queue numbers - rx = [None, None] + args.iterations = 5000000 if not args.tcp else 5000 + # determine the number of paths if args.test in {'hello', 'bandwidth'}: - rx[1] = next(queue_counter) + num_paths = 1 elif args.test in {'latency'}: - rx[0] = next(queue_counter) - rx[1] = next(queue_counter) + num_paths = 2 else: raise Exception(f'Unknown test: {args.test}') - # split ports - tx = [None, None] - tx[0] = rx[1] - tx[1] = rx[0] + # create the paths + paths = [] + port = 5555 + for _ in range(num_paths): + a = next_queue() + + if args.tcp: + b = next_queue() + + print(f'Starting TCP bridge at port {port}... ', end='', flush=True) + + start_tcp_bridge(inputs=[a], port=port) + start_tcp_bridge(outputs=[('*', b)], port=port) - # clean up old queues if present - for port in set(rx + tx): - if port is not None: - filename = str(SHMEM_DIR / f'queue-{port}') - try: - os.remove(filename) - except OSError: - pass + time.sleep(2) + print('done') + + port += 1 + else: + b = a + + paths.append((a, b)) # run the specific test of interest if args.test == 'hello': hello = THIS_DIR / 'hello.out' - p = run_cmd(hello, 'rx', rx[1], auto_exit=False, verbose=args.verbose) - run_cmd(hello, 'tx', tx[0], verbose=args.verbose) - p.wait() + p = binary_run(hello, ['rx', paths[0][1]]) + binary_run(hello, ['tx', paths[0][0]]) + exit(p.wait()) elif args.test == 'bandwidth': bandwidth = THIS_DIR / 'bandwidth.out' - p = run_cmd(bandwidth, 'rx', rx[1], args.iterations, auto_exit=False, verbose=args.verbose) - run_cmd(bandwidth, 'tx', tx[0], args.iterations, verbose=args.verbose) - p.wait() + p = binary_run(bandwidth, ['rx', paths[0][1], args.iterations]) + binary_run(bandwidth, ['tx', paths[0][0], args.iterations]) + exit(p.wait()) elif args.test == 'latency': latency = THIS_DIR / 'latency.out' - run_cmd(latency, 'second', rx[0], tx[0], args.iterations, verbose=args.verbose) - p = run_cmd(latency, 'first', rx[1], tx[1], args.iterations, - auto_exit=False, verbose=args.verbose) - p.wait() + binary_run(latency, ['second', paths[0][1], paths[1][0], args.iterations]) + p = binary_run(latency, ['first', paths[1][1], paths[0][0], args.iterations]) + exit(p.wait()) else: raise Exception(f'Unknown test: {args.test}') -def run_cmd(path, *args, auto_exit=True, verbose=False): - cmd = [] - cmd += [path] - cmd += args - cmd = [str(elem) for elem in cmd] - - if verbose: - print(' '.join(cmd)) - - p = subprocess.Popen(cmd) - - if auto_exit: - atexit.register(p.terminate) - - return p - - if __name__ == '__main__': main()