diff --git a/.buildinfo b/.buildinfo new file mode 100644 index 00000000..44c89746 --- /dev/null +++ b/.buildinfo @@ -0,0 +1,4 @@ +# Sphinx build info version 1 +# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. +config: 15bcfc5e0d3550994f780fe622ea1282 +tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/.doctrees/environment.pickle b/.doctrees/environment.pickle new file mode 100644 index 00000000..fb7ec9e7 Binary files /dev/null and b/.doctrees/environment.pickle differ diff --git a/.doctrees/index.doctree b/.doctrees/index.doctree new file mode 100644 index 00000000..765ae10e Binary files /dev/null and b/.doctrees/index.doctree differ diff --git a/.doctrees/switchboard.bitvector.doctree b/.doctrees/switchboard.bitvector.doctree new file mode 100644 index 00000000..ccbcb839 Binary files /dev/null and b/.doctrees/switchboard.bitvector.doctree differ diff --git a/.doctrees/switchboard.doctree b/.doctrees/switchboard.doctree new file mode 100644 index 00000000..63a66068 Binary files /dev/null and b/.doctrees/switchboard.doctree differ diff --git a/.doctrees/switchboard.icarus.doctree b/.doctrees/switchboard.icarus.doctree new file mode 100644 index 00000000..b47a353f Binary files /dev/null and b/.doctrees/switchboard.icarus.doctree differ diff --git a/.doctrees/switchboard.loopback.doctree b/.doctrees/switchboard.loopback.doctree new file mode 100644 index 00000000..91f772c0 Binary files /dev/null and b/.doctrees/switchboard.loopback.doctree differ diff --git a/.doctrees/switchboard.sbdut.doctree b/.doctrees/switchboard.sbdut.doctree new file mode 100644 index 00000000..0d836c43 Binary files /dev/null and b/.doctrees/switchboard.sbdut.doctree differ diff --git a/.doctrees/switchboard.sbtcp.doctree b/.doctrees/switchboard.sbtcp.doctree new file mode 100644 index 00000000..95a07ec4 Binary files /dev/null and b/.doctrees/switchboard.sbtcp.doctree differ diff --git a/.doctrees/switchboard.uart_xactor.doctree b/.doctrees/switchboard.uart_xactor.doctree new file mode 100644 index 00000000..0926bfc1 Binary files /dev/null and b/.doctrees/switchboard.uart_xactor.doctree differ diff --git a/.doctrees/switchboard.umi.doctree b/.doctrees/switchboard.umi.doctree new file mode 100644 index 00000000..d9960bb1 Binary files /dev/null and b/.doctrees/switchboard.umi.doctree differ diff --git a/.doctrees/switchboard.util.doctree b/.doctrees/switchboard.util.doctree new file mode 100644 index 00000000..4d49fc5e Binary files /dev/null and b/.doctrees/switchboard.util.doctree differ diff --git a/.doctrees/switchboard.verilator.doctree b/.doctrees/switchboard.verilator.doctree new file mode 100644 index 00000000..2d0a0c49 Binary files /dev/null and b/.doctrees/switchboard.verilator.doctree differ diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 00000000..e69de29b diff --git a/_modules/index.html b/_modules/index.html new file mode 100644 index 00000000..eba14d1f --- /dev/null +++ b/_modules/index.html @@ -0,0 +1,112 @@ + + +
+ + +
+# Utilities for working with bit vectors using Verilog-style syntax, i.e. [MSB:LSB]
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+import numpy as np
+
+
+
+[docs]
+class BitVector:
+ def __init__(self, value: int = 0):
+ self.value = int(value)
+
+ def __int__(self):
+ return int(self.value)
+
+ def __str__(self):
+ return f'0x{self.value:x}'
+
+ def __setitem__(self, key, value):
+ if isinstance(key, slice):
+ if (key.start is None) and (key.stop is None) and (key.step is None):
+ self.value = value
+ return
+ else:
+ msb, lsb = slice_to_msb_lsb(key.start, key.stop, key.step)
+ else:
+ msb, lsb = slice_to_msb_lsb(key, key)
+
+ # generate mask with the right width
+ mask = (1 << (msb - lsb + 1)) - 1
+
+ # clear bit field using the mask
+ new_value = self.value & (~(mask << lsb))
+
+ # set bit field
+ new_value |= (value & mask) << lsb
+
+ # set the new value (done here instead of through
+ # incremental updates, to prevent the value from
+ # being partially updated in case of an exception
+ self.value = new_value
+
+ def __getitem__(self, key):
+ if isinstance(key, slice):
+ if (key.start is None) and (key.stop is None) and (key.step is None):
+ return self.value
+ else:
+ msb, lsb = slice_to_msb_lsb(key.start, key.stop, key.step)
+ else:
+ msb, lsb = slice_to_msb_lsb(key, key)
+
+ # generate mask with the right width
+ mask = (1 << (msb - lsb + 1)) - 1
+
+ # extract the value
+ return (self.value >> lsb) & mask
+
+
+[docs]
+ def tobytes(self, n=None):
+ # convert to a numpy byte array. if "n" is provided,
+ # pad result to be "n" bytes. will error out if "n"
+ # is less than the number of bytes needed to represent
+ # the current value.
+
+ value = self.value
+ bytes = []
+
+ while value != 0:
+ bytes.append(value & 0xff)
+ value >>= 8
+
+ if n is not None:
+ if len(bytes) < n:
+ bytes += [0] * (n - len(bytes))
+ elif len(bytes) > n:
+ raise ValueError('Number of bytes needed to represent the current value'
+ f' ({self.value}) is {len(bytes)}, but the argument n={n} is smaller.')
+
+ return np.array(bytes, dtype=np.uint8)
+
+
+
+[docs]
+ @staticmethod
+ def frombytes(arr):
+ value = 0
+
+ for i, elem in enumerate(arr):
+ if not (0 <= elem <= 255):
+ raise ValueError(f'Non-byte value detected at index {i}: {elem}')
+ value |= (int(elem) & 0xff) << (i * 8)
+
+ return BitVector(value)
+
+
+
+
+
+[docs]
+def slice_to_msb_lsb(start=None, stop=None, step=None):
+ # set defaults
+ if start is None:
+ start = 0
+ if stop is None:
+ stop = 0
+ if step is None:
+ step = 1
+
+ if step != 1:
+ raise ValueError('Only step=1 allowed for slice indexing.')
+
+ msb = start
+ lsb = stop
+
+ if msb < lsb:
+ raise ValueError('MSB must be greater than or equal to LSB')
+ if lsb < 0:
+ raise ValueError('Negative LSB is not allowed.')
+
+ return msb, lsb
+
+
+# Utilities for working with Icarus Verilog
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+# TODO: replace with SiliconCompiler functionality
+
+from typing import Union, List
+from pathlib import Path
+
+from .util import plusargs_to_args, binary_run
+from .switchboard import path as sb_path
+from subprocess import check_output, STDOUT, CalledProcessError
+
+
+
+[docs]
+def run(command: list, cwd: str = None) -> str:
+ return check_output(command, cwd=cwd, stderr=STDOUT).decode()
+
+
+
+
+[docs]
+def icarus_build_vpi(
+ cwd: str = None,
+ name: str = 'switchboard',
+ cincludes: List[str] = None,
+ ldflags: List[str] = None
+) -> str:
+ if cincludes is None:
+ cincludes = []
+
+ if ldflags is None:
+ ldflags = []
+
+ sbdir = sb_path()
+ incdirs = cincludes + [f'{sbdir}/cpp']
+
+ cmd = []
+ cmd += ['iverilog-vpi']
+ cmd += [f'-I{incdir}' for incdir in incdirs]
+ cmd += ldflags
+ cmd += [str(sbdir / 'vpi' / f'{name}_vpi.cc')]
+
+ try:
+ run(cmd, cwd)
+ except CalledProcessError as e:
+ print(e.output)
+ raise
+
+
+
+
+[docs]
+def icarus_find_vpi(cwd: Union[str, Path] = None, name: str = 'switchboard') -> Path:
+ path = Path(f'{name}_vpi.vpi')
+
+ if cwd is not None:
+ path = Path(cwd) / path
+
+ if path.exists():
+ return path
+ else:
+ return None
+
+
+
+
+[docs]
+def icarus_run(vvp, plusargs=None, modules=None, extra_args=None, **kwargs):
+ args = []
+
+ args += ['-n']
+
+ mdirs = set()
+
+ if modules is not None:
+ if not isinstance(modules, list):
+ raise TypeError('modules must be a list')
+ for module in modules:
+ mdirs.add(str(Path(module.resolve().parent)))
+ args += ['-m', Path(module).stem]
+
+ for mdir in mdirs:
+ args += [f'-M{mdir}']
+
+ args += [vvp]
+ args += plusargs_to_args(plusargs)
+
+ if extra_args is not None:
+ if not isinstance(modules, list):
+ raise TypeError('extra_args must be a list')
+ args += extra_args
+
+ return binary_run(bin='vvp', args=args, **kwargs)
+
+
+# Loopback test to check the behavior of blocks that split/merge UMI packets
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+from numbers import Integral
+from typing import Iterable, Iterator, Union
+
+try:
+ from tqdm import tqdm
+except ModuleNotFoundError:
+ tqdm = None
+
+from .umi import UmiTxRx, random_umi_packet
+
+
+
+[docs]
+def umi_loopback(
+ umi: UmiTxRx,
+ packets: Union[Integral, Iterable, Iterator] = 10,
+ **kwargs
+):
+ """
+ Performs a loopback test by sending packets into a block and checking that
+ the packets received back are equivalent under the UMI split/merge rules.
+
+ Parameters
+ ----------
+ umi: UmiTxRx
+ packets:
+ Can be a number, a list of packets, or a generator.
+
+ - If this is a number, it represents the number of packets to send,
+ which are generated with random_umi_packet. Any remaining arguments
+ are passed directly to random_umi_packet.
+ - If this is an iterable (list, tuple, etc.), then it represents a list
+ of packets to use for the test. This is helpful if you want to use a very
+ specific sequence of transactions.
+ - This can also be an iterator, which might be convenient if you want to
+ send a very large number of packets without having to store them
+ all in memory at once, e.g. (random_umi_packet() for _ in range(1000000))
+
+ Raises
+ ------
+ ValueError
+ If the number of packets is not positive or if the `packets` argument is empty
+ Exception
+ If a received packet does not match the corresponding transmitted packet
+
+ """
+
+ # input validation
+
+ if isinstance(packets, Integral):
+ if packets <= 0:
+ raise ValueError(f'The number of packets must be positive (got packets={packets}).')
+ else:
+ total = packets
+ packets = (random_umi_packet(**kwargs) for _ in range(packets))
+ elif isinstance(packets, Iterable):
+ if isinstance(packets, (list, tuple)):
+ total = len(packets)
+ else:
+ total = float('inf')
+ packets = iter(packets)
+ elif isinstance(packets, Iterator):
+ total = float('inf')
+ else:
+ raise TypeError(f'Unsupported type for packets: {type(packets)}')
+
+ tx_sets = [] # kept for debug purposes
+ tx_hist = []
+
+ tx_set = None
+ tx_partial = None
+
+ rx_set = None # kept for debug purposes
+ rx_partial = None
+
+ if tqdm is not None:
+ pbar = tqdm(total=total)
+ else:
+ pbar = None
+
+ # get the first element
+ try:
+ txp = next(packets)
+ if pbar is not None:
+ pbar.update(0)
+ except StopIteration:
+ raise ValueError('The argument "packets" is empty.')
+
+ while (txp is not None) or (len(tx_hist) > 0):
+ # send data
+ if txp is not None:
+ if umi.send(txp, blocking=False):
+ if tx_partial is not None:
+ if not tx_partial.merge(txp):
+ tx_hist.append(tx_partial)
+ tx_sets.append(tx_set)
+ tx_start_new = True
+ else:
+ tx_set.append(txp)
+ tx_start_new = False
+ else:
+ tx_start_new = True
+
+ if tx_start_new:
+ tx_partial = txp
+ tx_set = [txp]
+
+ try:
+ txp = next(packets)
+ except StopIteration:
+ txp = None
+
+ if txp is None:
+ # if this is the last packet, add it to the history
+ # even if the merge was successful
+ tx_hist.append(tx_partial)
+ tx_sets.append(tx_set)
+
+ # receive data
+ if len(tx_hist) > 0:
+ rxp = umi.recv(blocking=False)
+ if rxp is not None:
+ # try to merge into an existing partial packet
+ if rx_partial is not None:
+ if not rx_partial.merge(rxp):
+ print('=== Mismatch detected ===')
+ for i, p in enumerate(tx_sets[0]):
+ print(f'* TX[{i}] *')
+ print(p)
+ print('---')
+ for i, p in enumerate(rx_set):
+ print(f'* RX[{i}] *')
+ print(p)
+ print('=========================')
+ raise Exception('Mismatch!')
+ else:
+ rx_set.append(rxp)
+ else:
+ rx_partial = rxp
+ rx_set = [rxp]
+
+ # at this point it is guaranteed there is something in
+ # rx_partial, so compare it to the expected outbound packet
+ if rx_partial == tx_hist[0]:
+ tx_hist.pop(0)
+ tx_sets.pop(0)
+ rx_partial = None
+ rx_set = None
+
+ if pbar is not None:
+ pbar.update()
+
+ if pbar is not None:
+ pbar.close()
+
+
+# Build and simulation automation built on SiliconCompiler
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+"""Class inheriting from the SiliconCompiler Chip class that can be used for building a
+Switchboard-based testbench.
+
+This class is meant to be interacted with like a regular Chip object, but it has some parameters
+automatically configured to abstract away setup of files that are required by all Switchboard
+testbenches.
+"""
+
+import importlib
+import subprocess
+
+from copy import deepcopy
+from pathlib import Path
+from typing import List, Dict, Any
+
+from .switchboard import path as sb_path
+from .verilator import verilator_run
+from .icarus import icarus_build_vpi, icarus_find_vpi, icarus_run
+from .util import plusargs_to_args, binary_run, ProcessCollection
+from .xyce import xyce_flags
+from .ams import make_ams_spice_wrapper, make_ams_verilog_wrapper, parse_spice_subckts
+from .autowrap import (normalize_clocks, normalize_interfaces, normalize_resets, normalize_tieoffs,
+ normalize_parameters, create_intf_objs)
+from .cmdline import get_cmdline_args
+
+import siliconcompiler
+
+SB_DIR = sb_path()
+
+
+
+[docs]
+class SbDut(siliconcompiler.Chip):
+ def __init__(
+ self,
+ design: str = 'testbench',
+ tool: str = 'verilator',
+ default_main: bool = True,
+ trace: bool = True,
+ trace_type: str = 'vcd',
+ module: str = None,
+ fpga: bool = False,
+ xyce: bool = False,
+ frequency: float = 100e6,
+ period: float = None,
+ max_rate: float = -1,
+ start_delay: float = None,
+ timeunit: str = None,
+ timeprecision: str = None,
+ warnings: List[str] = None,
+ cmdline: bool = False,
+ fast: bool = False,
+ extra_args: dict = None,
+ autowrap: bool = False,
+ parameters=None,
+ interfaces=None,
+ clocks=None,
+ resets=None,
+ tieoffs=None,
+ buildroot=None,
+ builddir=None,
+ args=None,
+ subcomponent=False,
+ suffix=None,
+ threads=None
+ ):
+ """
+ Parameters
+ ----------
+ design: string
+ Name of the top level chip design module.
+
+ tool: string, optional
+ Which tool to use to compile simulator. Options are "verilator" or
+ "icarus".
+
+ default_main: bool, optional
+ If True, the default testbench.cc will be used and does not need to
+ be provided via the add() function
+
+ trace: bool, optional
+ If true, a waveform dump file will be produced using the file type
+ specified by `trace_type`.
+
+ trace_type: str, optional
+ File type for the waveform dump file. Defaults to vcd.
+
+ module: str, optional
+ module containing the siliconcompiler driver for this object
+
+ fpga: bool, optional
+ If True, compile using switchboard's library of modules for FPGA emulation,
+ rather than the modules for RTL simulation.
+
+ xyce: bool, optional
+ If True, compile for xyce co-simulation.
+
+ frequency: float, optional
+ If provided, the default frequency of the clock generated in the testbench,
+ in seconds.
+
+ period: float, optional
+ If provided, the default period of the clock generated in the testbench,
+ in seconds.
+
+ max_rate: float, optional
+ If provided, the maximum real-world rate that the simulation is allowed to run
+ at, in Hz. Can be useful to encourage time-sharing between many processes and
+ for performance modeling when latencies are large and/or variable.
+
+ start_delay: float, optional
+ If provided, the real-world time to delay before the first clock tick in the
+ simulation. Can be useful to make sure that programs start at approximately
+ the same time and to prevent simulations from stepping on each other's toes
+ when starting up.
+
+ warnings: List[str], optional
+ If provided, a list of tool-specific warnings to enable. If not provided, a default
+ set of warnings will be included. Warnings can be disabled by setting this argument
+ to an empty list.
+
+ cmdline: bool, optional
+ If True, accept configuration settings from the command line, such as "--trace",
+ "--tool TOOL", and "--fast".
+
+ fast: bool, optional
+ If True, the simulation binary will not be rebuilt if an existing one is found.
+ The setting here can be overridden when build() is called by setting its argument
+ with the same name.
+
+ extra_args: dict, optional
+ If provided and cmdline=True, a dictionary of additional command line arguments
+ to be made available. The keys of the dictionary are the arguments ("-n", "--test",
+ etc.) and the values are themselves dictionaries that contain keyword arguments
+ accepted by argparse ("action": "store_true", "default": 42, etc.)
+ """
+
+ # call the super constructor
+
+ if autowrap and (not subcomponent):
+ toplevel = 'testbench'
+ else:
+ toplevel = design
+
+ super().__init__(toplevel)
+
+ # parse command-line options if desired
+
+ if cmdline:
+ self.args = get_cmdline_args(tool=tool, trace=trace, trace_type=trace_type,
+ frequency=frequency, period=period, fast=fast, max_rate=max_rate,
+ start_delay=start_delay, threads=threads, extra_args=extra_args)
+ elif args is not None:
+ self.args = args
+ else:
+ self.args = None
+
+ if self.args is not None:
+ trace = self.args.trace
+ trace_type = self.args.trace_type
+ fast = self.args.fast
+ tool = self.args.tool
+ frequency = self.args.frequency
+ period = self.args.period
+ max_rate = self.args.max_rate
+ start_delay = self.args.start_delay
+ threads = self.args.threads
+
+ # input validation
+
+ if trace_type not in ('vcd', 'fst'):
+ raise ValueError('Invalid trace_type, expected one of "vcd" or "fst"')
+
+ # save settings
+
+ self.tool = tool
+ self.trace = trace
+ self.trace_type = trace_type
+ self.fpga = fpga
+ self.xyce = False # is set True by _configure_xyce
+ self.warnings = warnings
+ self.fast = fast
+
+ if (period is None) and (frequency is not None):
+ period = 1 / frequency
+ self.period = period
+ self.max_rate = max_rate
+ self.start_delay = start_delay
+
+ self.threads = threads
+
+ self.timeunit = timeunit
+ self.timeprecision = timeprecision
+
+ self.autowrap = autowrap
+
+ if (suffix is None) and subcomponent:
+ suffix = f'_unq_{design}'
+
+ self.suffix = suffix
+
+ if suffix is not None:
+ self.dut = f'{design}{suffix}'
+ else:
+ self.dut = design
+
+ self.parameters = normalize_parameters(parameters)
+ self.intf_defs = normalize_interfaces(interfaces)
+ self.clocks = normalize_clocks(clocks)
+ self.resets = normalize_resets(resets)
+ self.tieoffs = normalize_tieoffs(tieoffs)
+
+ # initialization
+
+ self.intfs = {}
+
+ # keep track of processes started
+ self.process_collection = ProcessCollection()
+
+ # simulator-agnostic settings
+
+ if builddir is None:
+ if buildroot is None:
+ buildroot = 'build'
+
+ buildroot = Path(buildroot).resolve()
+
+ if subcomponent:
+ # the subcomponent build flow is tool-agnostic, producing a single Verilog
+ # file as output, as opposed to a simulator binary
+ builddir = buildroot / metadata_str(design=design, parameters=parameters)
+ else:
+ builddir = buildroot / metadata_str(design=design, parameters=parameters,
+ tool=tool, trace=trace, trace_type=trace_type, threads=threads)
+
+ self.set('option', 'builddir', str(Path(builddir).resolve()))
+
+ self.set('option', 'clean', True) # preserve old behavior
+
+ if not subcomponent:
+ if fpga:
+ # library dirs
+ self.set('option', 'ydir', sb_path() / 'verilog' / 'fpga')
+ self.add('option', 'ydir', sb_path() / 'deps' / 'verilog-axi' / 'rtl')
+
+ # include dirs
+ self.set('option', 'idir', sb_path() / 'verilog' / 'fpga' / 'include')
+
+ for opt in ['ydir', 'idir']:
+ if not fpga:
+ self.set('option', opt, sb_path() / 'verilog' / 'sim')
+ self.add('option', opt, sb_path() / 'verilog' / 'common')
+
+ if trace:
+ self.set('tool', 'verilator', 'task', 'compile', 'var', 'trace', True)
+ self.add('option', 'define', 'SB_TRACE')
+
+ if self.trace_type == 'fst':
+ self.add('option', 'define', 'SB_TRACE_FST')
+
+ if tool == 'icarus':
+ self._configure_icarus()
+ else:
+ if module is None:
+ if tool == 'verilator':
+ module = 'siliconcompiler'
+ else:
+ raise ValueError('Must specify the "module" argument,'
+ ' which is the name of the module containing the'
+ ' SiliconCompiler driver for this simulator.')
+
+ self._configure_build(
+ module=module,
+ default_main=default_main,
+ fpga=fpga
+ )
+
+ if xyce:
+ self._configure_xyce()
+ else:
+ # special mode that produces a standalone Verilog netlist
+ # rather than building/running a simulation
+
+ flowname = 'package'
+
+ self.package_flow = siliconcompiler.Flow(flowname)
+
+ from siliconcompiler.tools.surelog import parse
+ self.package_flow.node(flowname, 'parse', parse)
+
+ from .sc.sed import remove
+ self.package_flow.node(flowname, 'remove', remove)
+
+ from .sc.morty import uniquify
+ self.package_flow.node(flowname, 'uniquify', uniquify)
+
+ self.package_flow.edge(flowname, 'parse', 'remove')
+ self.package_flow.edge(flowname, 'remove', 'uniquify')
+
+ self.use(self.package_flow)
+ self.set('option', 'flow', flowname)
+
+ def _configure_build(
+ self,
+ module: str,
+ default_main: bool = False,
+ fpga: bool = False
+ ):
+ if not fpga:
+ self.input(SB_DIR / 'dpi' / 'switchboard_dpi.cc')
+
+ if default_main and (self.tool == 'verilator'):
+ self.input(SB_DIR / 'verilator' / 'testbench.cc')
+
+ if fpga and (self.tool == 'verilator'):
+ self.set('tool', 'verilator', 'task', 'compile', 'file', 'config',
+ sb_path() / 'verilator' / 'config.vlt')
+ self.set('tool', 'verilator', 'task', 'compile', 'warningoff', 'TIMESCALEMOD')
+
+ # enable specific warnings that aren't included by default
+ if self.tool == 'verilator':
+ if self.warnings is None:
+ warnings = ['BLKSEQ']
+ else:
+ warnings = self.warnings
+
+ for warning in warnings:
+ self.set('tool', 'verilator', 'task', 'compile', 'option', f'-Wwarn-{warning}')
+
+ self.set('tool', self.tool, 'task', 'compile', 'var', 'cflags',
+ ['-Wno-unknown-warning-option'])
+ self.set('tool', self.tool, 'task', 'compile', 'dir', 'cincludes', [SB_DIR / 'cpp'])
+ self.set('tool', self.tool, 'task', 'compile', 'var', 'ldflags', ['-pthread'])
+
+ if self.trace and (self.tool == 'verilator'):
+ self.set('tool', 'verilator', 'task', 'compile', 'var', 'trace_type', self.trace_type)
+
+ if self.tool == 'verilator':
+ timeunit = self.timeunit
+ timeprecision = self.timeprecision
+
+ if (timeunit is not None) or (timeprecision is not None):
+ if timeunit is None:
+ timeunit = '1ps' # default from Verilator documentation
+
+ if timeprecision is None:
+ timeprecision = '1ps' # default from Verilator documentation
+
+ timescale = f'{timeunit}/{timeprecision}'
+ self.add('tool', 'verilator', 'task', 'compile', 'option', '--timescale')
+ self.add('tool', 'verilator', 'task', 'compile', 'option', timescale)
+
+ if (self.threads is not None) and (self.tool == 'verilator'):
+ self.add('tool', 'verilator', 'task', 'compile', 'option', '--threads')
+ self.add('tool', 'verilator', 'task', 'compile', 'option', str(self.threads))
+
+ self.set('option', 'libext', ['v', 'sv'])
+
+ # Set up flow that compiles RTL
+ # TODO: this will be built into SC
+ self.set('option', 'flow', 'simflow')
+
+ compile = importlib.import_module(f'{module}.tools.{self.tool}.compile')
+ self.node('simflow', 'compile', compile)
+
+ def _configure_icarus(self):
+ self.add('option', 'libext', 'sv')
+ self.set('tool', 'icarus', 'task', 'compile', 'var', 'verilog_generation', '2012')
+
+ # use dvflow to execute Icarus, but set steplist so we don't run sim
+ from siliconcompiler.flows import dvflow
+ self.use(dvflow)
+
+ self.set('option', 'flow', 'dvflow')
+ self.set('option', 'to', 'compile')
+
+ def _configure_xyce(self):
+ if self.xyce:
+ # already configured, so return early
+ return
+
+ self.add('option', 'define', 'SB_XYCE')
+
+ if self.tool != 'icarus':
+ self.input(SB_DIR / 'dpi' / 'xyce_dpi.cc')
+
+ xyce_c_includes, xyce_ld_flags = xyce_flags()
+
+ self.add('tool', self.tool, 'task', 'compile', 'dir', 'cincludes', xyce_c_includes)
+ self.add('tool', self.tool, 'task', 'compile', 'var', 'ldflags', xyce_ld_flags)
+
+ # indicate that build is configured for Xyce. for Icarus simulation, this flag is used
+ # to determine whether a VPI object should be built for Xyce
+ self.xyce = True
+
+
+[docs]
+ def find_sim(self):
+ if self.tool == 'icarus':
+ result_kind = 'vvp'
+ else:
+ result_kind = 'vexe'
+
+ return self.find_result(result_kind, step='compile')
+
+
+
+[docs]
+ def build(self, cwd: str = None, fast: bool = None):
+ """
+ Parameters
+ ---------
+ cwd: str, optional
+ Working directory for the simulation build
+
+ fast: bool, optional
+ If True, the simulation binary will not be rebuilt if an existing one
+ is found. Defaults to the value provided to the SbDut constructor,
+ which in turn defaults to False.
+ """
+
+ if fast is None:
+ fast = self.fast
+
+ if self.tool == 'icarus':
+ if (not fast) or (icarus_find_vpi(cwd, name='switchboard') is None):
+ icarus_build_vpi(cwd, name='switchboard')
+ if self.xyce and ((not fast) or (icarus_find_vpi(cwd, name='xyce') is None)):
+ cincludes, ldflags = xyce_flags()
+ icarus_build_vpi(cwd, name='xyce', cincludes=cincludes, ldflags=ldflags)
+
+ # if "fast" is set, then we can return early if the
+ # simulation binary already exists
+ if fast:
+ sim = self.find_sim()
+ if sim is not None:
+ return sim
+
+ # build the wrapper if needed
+ if self.autowrap:
+ from .autowrap import autowrap
+
+ filename = Path(self.get('option', 'builddir')).resolve() / 'testbench.sv'
+
+ filename.parent.mkdir(exist_ok=True, parents=True)
+
+ instance = f'{self.dut}_i'
+
+ autowrap(
+ instances={instance: self.dut},
+ parameters={instance: self.parameters},
+ interfaces={instance: self.intf_defs},
+ clocks={instance: self.clocks},
+ resets={instance: self.resets},
+ tieoffs={instance: self.tieoffs},
+ filename=filename
+ )
+
+ self.input(filename)
+
+ # if we get to this point, then we need to rebuild
+ # the simulation binary
+ self.run()
+
+ return self.find_sim()
+
+
+
+[docs]
+ def simulate(
+ self,
+ plusargs=None,
+ args=None,
+ extra_args=None,
+ cwd: str = None,
+ trace: bool = None,
+ period: float = None,
+ frequency: float = None,
+ max_rate: float = None,
+ start_delay: float = None,
+ run: str = None,
+ intf_objs: bool = True
+ ) -> subprocess.Popen:
+ """
+ Parameters
+ ----------
+ plusargs: str or list or tuple, optional
+ additional arguments to pass to simulator that must be preceded
+ with a +. These are listed after `args`.
+
+ args: str or list or tuple, optional
+ additional arguments to pass to simulator listed before `plusargs` and
+ `extra_args`
+
+ extra_args: str or list or tuple, optional
+ additional arguments to pass to simulator listed after `args` and
+ `plusargs`
+
+ cwd: str, optional
+ working directory where simulation binary is saved
+
+ trace: bool, optional
+ If true, a waveform dump file will be produced
+
+ period: float, optional
+ If provided, the period of the clock generated in the testbench,
+ in seconds.
+ """
+
+ # set up interfaces if needed
+
+ if max_rate is None:
+ max_rate = self.max_rate
+
+ if intf_objs:
+ self.intfs = create_intf_objs(self.intf_defs, max_rate=max_rate)
+
+ # set defaults
+
+ if plusargs is None:
+ plusargs = []
+ else:
+ plusargs = deepcopy(plusargs)
+
+ if args is None:
+ args = []
+
+ if extra_args is None:
+ extra_args = []
+
+ if trace is None:
+ trace = self.trace
+
+ if (period is None) and (frequency is not None):
+ period = 1 / frequency
+
+ if period is None:
+ period = self.period
+
+ if start_delay is None:
+ start_delay = self.start_delay
+
+ # build the simulation if necessary
+
+ sim = self.build(cwd=cwd, fast=True)
+
+ # enable tracing if desired. it's convenient to define +trace
+ # when running Icarus Verilog, even though it is not necessary,
+ # since logic in the testbench can use that flag to enable/disable
+ # waveform dumping in a simulator-agnostic manner.
+
+ if trace:
+ carefully_add_plusarg(key='trace', args=args, plusargs=plusargs)
+
+ if period is not None:
+ carefully_add_plusarg(key='period', value=period, args=args, plusargs=plusargs)
+
+ if max_rate is not None:
+ carefully_add_plusarg(key='max-rate', value=max_rate, args=args, plusargs=plusargs)
+
+ if start_delay is not None:
+ carefully_add_plusarg(
+ key='start-delay', value=start_delay, args=args, plusargs=plusargs)
+
+ # add plusargs that define queue connections
+
+ for name, value in self.intf_defs.items():
+ wire = value.get('wire', None)
+ uri = value.get('uri', None)
+
+ if (wire is not None) and (uri is not None):
+ plusargs += [(wire, uri)]
+
+ # run-specific configurations (if running the same simulator build multiple times
+ # in parallel)
+
+ if run is not None:
+ dumpfile = f'{run}.{self.trace_type}'
+ plusargs.append(('dumpfile', dumpfile))
+
+ # run the simulation
+
+ p = None
+
+ if self.tool == 'icarus':
+ names = ['switchboard']
+ modules = []
+
+ if self.xyce:
+ names.append('xyce')
+
+ for name in names:
+ vpi = icarus_find_vpi(cwd=cwd, name=name)
+ assert vpi is not None, f'Could not find VPI binary "{name}"'
+ modules.append(vpi)
+
+ # set the trace format
+ if self.trace_type == 'fst' and ('-fst' not in extra_args):
+ extra_args.append('-fst')
+
+ p = icarus_run(
+ sim,
+ plusargs=plusargs,
+ modules=modules,
+ extra_args=args + extra_args
+ )
+ else:
+ # make sure that the simulator was built with tracing enabled
+ if trace and not self.trace:
+ raise ValueError('Simulator was built without tracing enabled.'
+ ' Please set trace=True in the SbDut and try again.')
+
+ if self.tool == 'verilator':
+ p = verilator_run(
+ sim,
+ plusargs=plusargs,
+ args=args
+ )
+ else:
+ p = binary_run(
+ sim,
+ args=plusargs_to_args(plusargs) + args
+ )
+
+ # Add newly created Popen object to subprocess list
+ self.process_collection.add(p)
+
+ # return a Popen object that one can wait() on
+
+ return p
+
+
+
+[docs]
+ def terminate(
+ self,
+ stop_timeout=10,
+ use_sigint=False
+ ):
+ self.process_collection.terminate(stop_timeout=stop_timeout, use_sigint=use_sigint)
+
+
+
+[docs]
+ def input_analog(
+ self,
+ filename: str,
+ pins: List[Dict[str, Any]] = None,
+ name: str = None,
+ check_name: bool = True,
+ dir: str = None
+ ):
+ """
+ Specifies a SPICE subcircuit to be used in a mixed-signal simulation. This involves
+ providing the path to the SPICE file containing the subcircuit definition and describing
+ how real-valued outputs in the SPICE subcircuit should be converted to binary values in
+ the Verilog simulation (and vice versa for subcircuit inputs).
+
+ Each of these conversions is specified as an entry in the "pins" argument, which is a
+ list of dictionaries, each representing a single pin of the SPICE subcircuit. Each
+ dictionary may have the following keys:
+ * "name": name of the pin. Bus notation may be used, e.g. "myBus[7:0]". In that case,
+ it is expected that the SPICE subcircuit has pins corresponding to each bit in the bus,
+ e.g. "myBus[0]", "myBus[1]", etc.
+ * "type": direction of the pin. May be "input", "output", or "constant". If "constant",
+ then this pin will not show up the Verilog module definition to be instantiated in user
+ code. Instead, the SPICE subcircuit pin with that name will be tied off to a fixed
+ voltage specified in the "value" field (below).
+ * "vil": low voltage threshold, below which a real-number voltage from the SPICE
+ simulation is considered to be a logical "0".
+ * "vih": high voltage threshold, above which a real-number voltage from the SPICE
+ simulation is considered to be a logical "1".
+ * "vol": real-number voltage to pass to a SPICE subcircuit input when the digital value
+ driven is "0".
+ * "voh": real-number voltage to pass to a SPICE subcircuit input when the digital value
+ driven is "1".
+ * "tr": time taken in the SPICE simulation to transition from a logic "0" value to a
+ logic "1" value.
+ * "tf": time taken in the SPICE simulation to transition from a logic "1" value to a
+ logic "0" value.
+ * "initial": initial value of a SPICE subcircuit pin. Currently only implemented for
+ subcircuit outputs. This is sometimes helpful, because there is a slight delay between
+ t=0 and the time when the SPICE simulation reports values for its outputs. Specifying
+ "initial" for subcircuit outputs prevents the corresponding digital signals from being
+ driven to "X" at t=0.
+
+ Parameters
+ ----------
+ filename: str
+ The path of the SPICE file containing the subcircuit definition.
+ pins: List[Dict[str, Any]]
+ List of dictionaries, each describing a pin of the subcircuit.
+ name: str
+ Name of the SPICE subcircuit that will be instantiated in the mixed-signal simulation.
+ If not provided, Switchboard guesses that the name is filename stem. For example,
+ if filename="myCircuit.cir", then Switchboard will guess that the subcircuit name
+ is "myCircuit"
+ check_name: bool
+ If True (default), Switchboard parses the provided file to make sure that there
+ is a subcircuit definition matching the given name.
+ dir: str
+ Running a mixed-signal simulation involves creating SPICE and Verilog wrappers. This
+ argument specifies the directory where those wrappers should be written. If not
+ provided, defaults to the directory where filename is located.
+ """
+
+ # automatically configures for Xyce co-simulation if not already configured
+
+ self._configure_xyce()
+
+ # set defaults
+
+ if pins is None:
+ pins = []
+
+ if name is None:
+ # guess the name of the subcircuit from the filename
+ guessed = True
+ name = Path(filename).stem
+ else:
+ guessed = False
+
+ if check_name:
+ # make sure that a subcircuit matching the provided or guessed
+ # name exists in the file provided. this is not foolproof, since
+ # the SPICE parser is minimal and won't consider things like
+ # .INCLUDE. hence, this feature can be disabled by setting
+ # check_name=False
+
+ subckts = parse_spice_subckts(filename)
+
+ for subckt in subckts:
+ if name.lower() == name.lower():
+ break
+ else:
+ if guessed:
+ raise Exception(f'Inferred subckt named "{name}" from the filename,'
+ ' however a corresponding subckt definition was not found. Please'
+ ' specify a subckt name via the "name" argument.')
+ else:
+ raise Exception(f'Could not find a subckt definition for "{name}".')
+
+ if dir is None:
+ dir = Path(filename).resolve().parent
+
+ spice_wrapper = make_ams_spice_wrapper(
+ name=name,
+ filename=filename,
+ pins=pins,
+ dir=dir
+ )
+
+ verilog_wrapper = make_ams_verilog_wrapper(
+ name=name,
+ filename=spice_wrapper,
+ pins=pins,
+ dir=dir
+ )
+
+ self.input(verilog_wrapper)
+
+
+
+[docs]
+ def package(self, suffix=None, fast=None):
+ # set defaults
+
+ if suffix is None:
+ suffix = self.suffix
+
+ if fast is None:
+ fast = self.fast
+
+ # see if we can exit early
+
+ if fast:
+ package = self.find_package(suffix=suffix)
+
+ if package is not None:
+ return package
+
+ # if not, parse with surelog and postprocess with morty
+
+ if suffix:
+ self.set('tool', 'morty', 'task', 'uniquify', 'var', 'suffix', suffix)
+
+ self.set('tool', 'sed', 'task', 'remove', 'var', 'to_remove', '`resetall')
+
+ self.run()
+
+ # return the path to the output
+ return self.find_package(suffix=suffix)
+
+
+
+[docs]
+ def find_package(self, suffix=None):
+ if suffix is None:
+ return self.find_result('v', step='parse')
+ else:
+ return self.find_result('v', step='uniquify')
+
+
+
+
+
+[docs]
+def metadata_str(design: str, tool: str = None, trace: bool = False,
+ trace_type: str = None, threads: int = None, parameters: dict = None) -> Path:
+
+ opts = []
+
+ opts += [design]
+
+ if parameters is not None:
+ for k, v in parameters.items():
+ opts += [k, v]
+
+ if tool is not None:
+ opts += [tool]
+
+ if trace:
+ assert trace_type is not None
+ opts += [trace_type]
+
+ if threads is not None:
+ opts += ['threads', threads]
+
+ return '-'.join(str(opt) for opt in opts)
+
+
+
+
+[docs]
+def carefully_add_plusarg(key, args, plusargs, value=None):
+ for plusarg in plusargs:
+ if isinstance(plusarg, (list, tuple)):
+ if (len(plusarg) >= 1) and (key == plusarg[0]):
+ return
+ elif key == plusarg:
+ return
+
+ if f'+{key}' in args:
+ return
+
+ if any(elem.startswith(f'+{key}+') for elem in args):
+ return
+
+ if any(elem.startswith(f'+{key}=') for elem in args):
+ return
+
+ if value is None:
+ plusargs.append(key)
+ else:
+ plusargs.append((key, value))
+
+
+#!/usr/bin/env python3
+
+# Command-line tool that bridges Switchboard packets over TCP.
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+# reference for setting up Python TCP connections:
+# https://realpython.com/python-sockets/#echo-client-and-server
+
+# reference for non-blocking socket programming:
+# https://stackoverflow.com/a/16745561
+
+import time
+import socket
+import argparse
+import numpy as np
+
+from switchboard import PySbRx, PySbTx, PySbPacket
+
+SB_PACKET_SIZE_BYTES = 60
+
+
+
+[docs]
+def tcp2sb(outputs, conn):
+ while True:
+ # receive data from TCP
+ data_rx_from_tcp = bytes([])
+
+ while len(data_rx_from_tcp) < SB_PACKET_SIZE_BYTES:
+ b = conn.recv(SB_PACKET_SIZE_BYTES - len(data_rx_from_tcp))
+
+ if len(b) == 0:
+ # connection is not alive anymore
+ return
+
+ data_rx_from_tcp += b
+
+ # convert to a switchboard packet
+ p = bytes2sb(data_rx_from_tcp)
+
+ # figure out which queue this packet is going to
+ for rule, output in outputs:
+ if rule_matches(rule, p.destination):
+ output.send(p)
+ break
+ else:
+ raise Exception(f"No rule for destination {p.destination}")
+
+
+
+
+[docs]
+def sb2tcp(inputs, conn):
+ tcp_data_to_send = bytes([])
+
+ while True:
+ # get a switchboard packet
+ while True:
+ # select input and queue its next run as last
+ destination, sbrx = inputs.pop(0)
+ inputs.append((destination, sbrx))
+
+ # try to receive a packet from this input
+ p = sbrx.recv(blocking=False)
+
+ if p is not None:
+ if destination is not None:
+ p.destination = destination
+ break
+
+ # convert the switchboard packet to bytes
+ tcp_data_to_send = sb2bytes(p)
+
+ # send the packet out over TCP
+ while len(tcp_data_to_send) > 0:
+ n = conn.send(tcp_data_to_send)
+
+ if n == 0:
+ # connection is not alive anymore
+ return
+
+ tcp_data_to_send = tcp_data_to_send[n:]
+
+
+
+
+[docs]
+def run_client(host, port, quiet=False, max_rate=None, inputs=None, outputs=None, run_once=False):
+ """
+ Connect to a server, retrying until a connection is made.
+ """
+
+ # initialize PySbRx/PySbTx objects if needed
+
+ inputs, outputs = normalize_inputs_and_outputs(
+ inputs=inputs, outputs=outputs, max_rate=max_rate)
+
+ # connect to the server in a loop
+ while True:
+ if not quiet:
+ print(f'Waiting for server (host={host}, port={port})')
+ while True:
+ try:
+ conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ conn.connect((host, port))
+ break
+ except ConnectionRefusedError:
+ time.sleep(1)
+ if not quiet:
+ print(f'Connected to server (host={host}, port={port})')
+
+ # communicate with the server
+ if outputs is not None:
+ tcp2sb(outputs=outputs, conn=conn)
+ elif inputs is not None:
+ sb2tcp(inputs=inputs, conn=conn)
+
+ if run_once:
+ break
+
+
+
+
+[docs]
+def run_server(host, port=0, quiet=False, max_rate=None, run_once=False, outputs=None, inputs=None):
+ """
+ Accepts client connections in a loop until Ctrl-C is pressed.
+ """
+
+ # initialize PySbRx/PySbTx objects if needed
+
+ inputs, outputs = normalize_inputs_and_outputs(
+ inputs=inputs, outputs=outputs, max_rate=max_rate)
+
+ # create the server socket
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1) # allow port to be reused immediately
+ server_socket.bind((host, port))
+ server_socket.listen()
+
+ # accept client connections in a loop
+ while True:
+ # accept a client
+ if not quiet:
+ print(f'Waiting for client (host={host}, port={port})')
+ conn, _ = server_socket.accept()
+ if not quiet:
+ print(f'Connected to client (host={host}, port={port})')
+
+ # communicate with the client
+ if outputs is not None:
+ tcp2sb(outputs=outputs, conn=conn)
+ elif inputs is not None:
+ sb2tcp(inputs=inputs, conn=conn)
+
+ if run_once:
+ break
+
+
+
+
+[docs]
+def normalize_inputs_and_outputs(inputs, outputs, max_rate):
+ if outputs is not None:
+ assert inputs is None, 'Cannot specify both inputs and outputs'
+ outputs = normalize_outputs(outputs, max_rate)
+ else:
+ assert inputs is not None, 'Must specify either inputs or outputs'
+ inputs = normalize_inputs(inputs, max_rate)
+
+ return inputs, outputs
+
+
+
+
+[docs]
+def normalize_outputs(outputs, max_rate):
+ retval = []
+
+ for rule, output in outputs:
+ output = convert_to_queue(q=output, cls=PySbTx, max_rate=max_rate)
+ retval.append((rule, output))
+
+ return retval
+
+
+
+
+[docs]
+def normalize_inputs(inputs, max_rate):
+ retval = []
+
+ for input in inputs:
+ if not isinstance(input, (list, tuple)):
+ destination, input = None, input
+ else:
+ destination, input = input
+
+ input = convert_to_queue(q=input, cls=PySbRx, max_rate=max_rate)
+
+ retval.append((destination, input))
+
+ return retval
+
+
+
+
+[docs]
+def sb2bytes(p):
+ # construct a bytes object from a Switchboard packet
+ arr = np.concatenate((
+ np.array([p.destination, p.flags], dtype=np.uint32),
+ p.data.view(np.uint32)
+ ))
+ return arr.tobytes()
+
+
+
+
+[docs]
+def bytes2sb(b):
+ # construct a Switchboard packet from a bytes object
+ arr = np.frombuffer(b, dtype=np.uint32)
+ return PySbPacket(arr[0], arr[1], arr[2:].view(np.uint8))
+
+
+
+
+[docs]
+def convert_to_queue(q, cls, max_rate=None):
+ if isinstance(q, cls):
+ # note that None is passed through
+ return q
+ elif isinstance(q, str):
+ kwargs = {}
+
+ if max_rate is not None:
+ kwargs['max_rate'] = max_rate
+
+ return cls(q, **kwargs)
+ else:
+ raise TypeError(f'{q} must be a string or {cls.__name__}; got {type(q)}')
+
+
+
+
+[docs]
+def rule_matches(rule, addr):
+ if rule == '*':
+ return True
+ elif isinstance(rule, int):
+ return addr == rule
+ elif isinstance(rule, range):
+ return rule.start <= addr < rule.stop
+ elif isinstance(rule, (list, tuple)):
+ # return True if any subrules match
+ for subrule in rule:
+ if rule_matches(subrule, addr):
+ return True
+
+ # otherwise return False
+ return False
+ else:
+ raise Exception(f'Unsupported rule type: {type(rule)}')
+
+
+
+
+[docs]
+def parse_rule(rule):
+ subrules = rule.split(',')
+
+ retval = []
+
+ for subrule in subrules:
+ if subrule == '*':
+ retval.append('*')
+ elif '-' in subrule:
+ start, stop = subrule.split('-')
+ start = int(start)
+ stop = int(stop)
+ retval.append(range(start, stop + 1))
+ else:
+ retval.append(int(subrule))
+
+ return retval
+
+
+
+
+[docs]
+def start_tcp_bridge(inputs=None, outputs=None, host='localhost', port=5555,
+ quiet=True, max_rate=None, mode='auto', run_once=False):
+
+ kwargs = dict(
+ host=host,
+ port=port,
+ quiet=quiet,
+ max_rate=max_rate,
+ run_once=run_once
+ )
+
+ target = None
+
+ if mode == 'client':
+ target = run_client
+ elif mode == 'server':
+ target = run_server
+
+ if outputs is not None:
+ kwargs['outputs'] = outputs
+ if mode == 'auto':
+ target = run_server
+ elif inputs is not None:
+ kwargs['inputs'] = inputs
+ if mode == 'auto':
+ target = run_client
+ else:
+ raise Exception('Must specify "outputs" or "inputs" argument.')
+
+ assert target is not None, 'Could not determine whether to run the bridge as a client or server'
+
+ import multiprocessing
+
+ p = multiprocessing.Process(target=target, kwargs=kwargs, daemon=True)
+ p.start()
+
+ return p
+
+
+
+
+[docs]
+def get_parser():
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument('--outputs', type=str, default=None, nargs='+', help="Space-separated"
+ " dictionary of queues to write to. For example, 0:a.q 1-2:b.q 3,5-7:c.q *:d.q means"
+ " that packets sent to destination 0 are routed to a.q, packets sent to destinations 1"
+ " or 2 are routed to b.q, packets sent to destinations 3, 5, 6, or 7 are routed to c.q,"
+ " and all other packets are routed to d.q")
+ parser.add_argument('--inputs', type=str, default=None, nargs='+', help="Space-separated"
+ " list of queues to read from, for example a.q b.q c.q")
+ parser.add_argument('--port', type=int, default=5555, help="TCP port used for"
+ " sending and receiving packets.")
+ parser.add_argument('--host', type=str, default="localhost", help="IP address or hostname"
+ " used sending/receiving packets.")
+ parser.add_argument('-q', action='store_true', help="Quiet mode: doesn't print anything.")
+ parser.add_argument('--max-rate', type=float, default=None, help='Maximum rate at which'
+ ' queues are read or written.')
+ parser.add_argument('--run-once', action='store_true', help="Process only one connection"
+ " in server mode, then exit.")
+
+ return parser
+
+
+
+
+[docs]
+def main():
+ # parse command-line arguments
+
+ parser = get_parser()
+ args = parser.parse_args()
+
+ # main logic
+
+ if args.outputs is not None:
+ # parse the output mapping
+ outputs = []
+ for output in args.outputs:
+ rule, output = output.split(':')
+ outputs.append((parse_rule(rule), output))
+
+ run_server(outputs=outputs, host=args.host, port=args.port,
+ quiet=args.q, max_rate=args.max_rate, run_once=args.run_once)
+ elif args.inputs is not None:
+ run_client(inputs=args.inputs, host=args.host, port=args.port,
+ quiet=args.q, max_rate=args.max_rate)
+ else:
+ raise ValueError("Must specify either --inputs or --outputs")
+
+
+
+if __name__ == "__main__":
+ main()
+
+#!/usr/bin/env python3
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+"""
+UMI/UART Transactor.
+"""
+
+import numpy as np
+
+
+
+[docs]
+class uart_xactor:
+ REG_TX = 0
+ REG_RX = 4
+ REG_SR = 8
+
+ REGF_SR_RXEMPTY_MASK = 1 << 0
+ REGF_SR_RXFULL_MASK = 1 << 1
+ REGF_SR_TXEMPTY_MASK = 1 << 8
+ REGF_SR_TXFULL_MASK = 1 << 9
+
+ def __init__(self, umi, encoding='ascii'):
+ self.encoding = encoding
+ self.umi = umi
+
+
+[docs]
+ def read_byte(self):
+ c8 = None
+ while True:
+ sr = self.umi.read(self.REG_SR, np.uint32)
+ if (sr & self.REGF_SR_RXEMPTY_MASK) == 0:
+ rx = self.umi.read(self.REG_RX, np.uint32)
+ c8 = rx & 0xff
+ break
+ return bytes([c8])
+
+
+
+[docs]
+ def readline(self, size=-1, end='\n'):
+ line = ""
+
+ while size < 0 or len(line) < size:
+ c8 = self.read_byte()
+ if c8 == end.encode(self.encoding):
+ break
+ line += c8.decode(self.encoding)
+ return line
+
+
+
+[docs]
+ def write_byte(self, b):
+ while True:
+ sr = self.umi.read(self.REG_SR, np.uint32)
+ if (sr & self.REGF_SR_TXFULL_MASK) == 0:
+ self.umi.write(self.REG_TX, np.uint32(b))
+ break
+
+
+ # File-like ops
+
+
+
+ # On streaming UART, reading until EOF doesn't make sense
+ # So we default the size arg to 1
+
+[docs]
+ def read(self, size=1):
+ data = bytes(0)
+ while size < 0 or len(data) < size:
+ b = self.read_byte()
+ data += b
+ return data
+
+
+
+# Python interface for UMI reads, writes, and atomic operations
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+import random
+import numpy as np
+
+from numbers import Integral
+from typing import Iterable, Union, Dict
+
+from _switchboard import (PyUmi, PyUmiPacket, umi_pack, UmiCmd, UmiAtomic)
+from .gpio import UmiGpio
+
+# note: it was convenient to implement some of this in Python, rather
+# than have everything in C++, because it was easier to provide
+# flexibility with numpy types
+
+
+
+[docs]
+class UmiTxRx:
+ def __init__(self, tx_uri: str = None, rx_uri: str = None,
+ srcaddr: Union[int, Dict[str, int]] = 0, posted: bool = False,
+ max_bytes: int = None, fresh: bool = False, error: bool = True,
+ max_rate: float = -1):
+ """
+ Parameters
+ ----------
+ tx_uri: str, optional
+ Name of the switchboard queue that
+ write() and send() will send UMI packets to. Defaults to
+ None, meaning "unused".
+ rx_uri: str, optional
+ Name of the switchboard queue that
+ read() and recv() will receive UMI packets from. Defaults
+ to None, meaning "unused".
+ srcaddr: int, optional
+ Default srcaddr to use for reads,
+ ack'd writes, and atomics. Defaults to 0. Can also be
+ provided as a dictionary with separate defaults for each
+ type of transaction: srcaddr={'read': 0x1234, 'write':
+ 0x2345, 'atomic': 0x3456}. When the defaults are provided
+ with a dictionary, all keys are optional. Transactions
+ that are not specified in the dictionary will default
+ to a srcaddr of 0.
+ posted: bool, optional
+ If True, default to using posted
+ (i.e., non-ack'd) writes. This can be overridden on a
+ transaction-by-transaction basis. Defaults to False.
+ max_bytes: int, optional
+ Default maximum number of bytes
+ to use in each UMI transaction. Can be overridden on a
+ transaction-by-transaction basis. Defaults to 32 bytes.
+ fresh: bool, optional
+ If True, the queue specified by the uri parameter will get
+ cleared before executing the simulation.
+ error: bool, optional
+ If True, error out upon receiving an unexpected UMI response.
+ """
+
+ if tx_uri is None:
+ tx_uri = ""
+
+ if rx_uri is None:
+ rx_uri = ""
+
+ self.umi = PyUmi(tx_uri, rx_uri, fresh, max_rate=max_rate)
+
+ if srcaddr is not None:
+ # convert srcaddr default to a dictionary if necessary
+ if isinstance(srcaddr, int):
+ srcaddr = {
+ 'read': srcaddr,
+ 'write': srcaddr,
+ 'atomic': srcaddr
+ }
+
+ if isinstance(srcaddr, dict):
+ self.def_read_srcaddr = int(srcaddr.get('read', 0))
+ self.def_write_srcaddr = int(srcaddr.get('write', 0))
+ self.def_atomic_srcaddr = int(srcaddr.get('atomic', 0))
+ else:
+ raise ValueError(f'Unsupported default srcaddr specification: {srcaddr}')
+ else:
+ raise ValueError('Default value of "srcaddr" cannot be None.')
+
+ if posted is not None:
+ self.default_posted = bool(posted)
+ else:
+ raise ValueError('Default value of "posted" cannot be None.')
+
+ if max_bytes is None:
+ max_bytes = 32
+
+ self.default_max_bytes = max_bytes
+ self.default_error = error
+
+
+[docs]
+ def gpio(
+ self,
+ iwidth: int = 32,
+ owidth: int = 32,
+ init: int = 0,
+ dstaddr: int = 0,
+ srcaddr: int = 0,
+ posted: bool = False,
+ max_bytes: int = 32
+ ) -> UmiGpio:
+ """
+ Returns an object for communicating with umi_gpio modules.
+
+ Parameters
+ ----------
+ iwidth: int
+ Width of GPIO input (bits). Defaults to 32.
+ owidth: int
+ Width of GPIO output (bits). Defaults to 32.
+ init: int
+ Default value of GPIO output. Defaults to 0.
+ dstaddr: int
+ Base address of the GPIO device. Defaults to 0.
+ srcaddr: int
+ Source address to which responses should be routed. Defaults to 0.
+ posted: bool
+ Whether writes should be sent as posted. Defaults to False.
+ max_bytes: int
+ Maximum number of bytes in a single transaction to umi_gpio.
+
+ Returns
+ -------
+ UmiGpio
+ UmiGpio object with .i (input) and .o (output) attributes
+ """
+
+ return UmiGpio(
+ iwidth=iwidth,
+ owidth=owidth,
+ init=init,
+ dstaddr=dstaddr,
+ srcaddr=srcaddr,
+ posted=posted,
+ max_bytes=max_bytes,
+ umi=self
+ )
+
+
+
+[docs]
+ def init_queues(self, tx_uri: str = None, rx_uri: str = None, fresh: bool = False):
+ """
+ Parameters
+ ----------
+ tx_uri: str, optional
+ Name of the switchboard queue that
+ write() and send() will send UMI packets to. Defaults to
+ None, meaning "unused".
+ rx_uri: str, optional
+ Name of the switchboard queue that
+ read() and recv() will receive UMI packets from. Defaults
+ to None, meaning "unused".
+ fresh: bool, optional
+ If True, the queue specified by the uri parameter will get
+ cleared before executing the simulation.
+ """
+
+ if tx_uri is None:
+ tx_uri = ""
+
+ if rx_uri is None:
+ rx_uri = ""
+
+ self.umi.init(tx_uri, rx_uri, fresh)
+
+
+
+[docs]
+ def send(self, p, blocking=True) -> bool:
+ """
+ Sends (or tries to send if burst=False) a UMI transaction (PyUmiPacket)
+ Returns True if the packet was sent successfully, else False.
+
+ Parameters
+ ----------
+ p: PyUmiPacket
+ The UMI packet that will be sent
+ blocking: bool, optional
+ If True, the program will pause execution until a response to the write request
+ is received.
+
+ Returns
+ -------
+ bool
+ Returns true if the `p` was sent successfully
+ """
+
+ return self.umi.send(p, blocking)
+
+
+
+[docs]
+ def recv(self, blocking=True) -> PyUmiPacket:
+ """
+ Wait for and return a UMI packet if blocking=True, otherwise return a
+ UMI packet if one can be read immediately, and None otherwise.
+
+ Parameters
+ ----------
+ blocking: bool, optional
+ If True, the function will wait until a UMI packet can be read.
+ If False, a None type will be returned if no UMI packet can be read
+ immediately.
+
+ Returns
+ -------
+ PyUmiPacket
+ If `blocking` is True, a PyUmiPacket is always returned. If `blocking` is
+ False, a PyUmiPacket object will be returned if one can be read immediately.
+ Otherwise, a None type will be returned.
+ """
+
+ return self.umi.recv(blocking)
+
+
+
+[docs]
+ def write(self, addr, data, srcaddr=None, max_bytes=None,
+ posted=None, qos=0, prot=0, progressbar=False, check_alignment=True,
+ error=None):
+ """
+ Writes the provided data to the given 64-bit address.
+
+ Parameters
+ ----------
+ addr: int
+ 64-bit address that will be written to
+
+ data: np.uint8, np.uint16, np.uint32, np.uint64, or np.array
+ Can be either a numpy integer type (e.g., np.uint32) or an numpy
+ array of integer types (np.uint8, np.uin16, np.uint32, np.uint64, etc.).
+ The `data` input may contain more than "max_bytes", in which case
+ the write will automatically be split into multiple transactions.
+
+ srcaddr: int, optional
+ UMI source address used for the write transaction. This is sometimes needed to make
+ the write response gets routed to the right place.
+
+ max_bytes: int, optional
+ Indicates the maximum number of bytes that can be used for any individual UMI
+ transaction. If not specified, this defaults to the value of `max_bytes`
+ provided in the UmiTxRx constructor, which in turn defaults to 32.
+
+ posted: bool, optional
+ If True, a write response will be received.
+
+ qos: int, optional
+ 4-bit Quality of Service field in UMI Command
+
+ prot: int, optional
+ 2-bit protection mode field in UMI command
+
+ progressbar: bool, optional
+ If True, the number of packets written will be displayed via a progressbar
+ in the terminal.
+
+ check_alignment: bool, optional
+ If true, an exception will be raised if the `addr` parameter cannot be aligned based
+ on the size of the `data` parameter
+
+ error: bool, optional
+ If True, error out upon receiving an unexpected UMI response.
+ """
+
+ # set defaults
+
+ if max_bytes is None:
+ max_bytes = self.default_max_bytes
+
+ max_bytes = int(max_bytes)
+
+ if srcaddr is None:
+ srcaddr = self.def_write_srcaddr
+
+ srcaddr = int(srcaddr)
+
+ if posted is None:
+ posted = self.default_posted
+
+ posted = bool(posted)
+
+ if error is None:
+ error = self.default_error
+
+ error = bool(error)
+
+ # format the data to be written
+
+ if isinstance(data, np.ndarray):
+ if data.ndim == 0:
+ write_data = np.atleast_1d(data)
+ elif data.ndim == 1:
+ write_data = data
+ else:
+ raise ValueError(f'Can only write 1D arrays (got ndim={data.ndim})')
+
+ if not np.issubdtype(write_data.dtype, np.integer):
+ raise ValueError('Can only write integer dtypes such as uint8, uint16, etc.'
+ f' (got dtype "{data.dtype}")')
+ elif isinstance(data, np.integer):
+ write_data = np.array(data, ndmin=1)
+ else:
+ raise TypeError(f"Unknown data type: {type(data)}")
+
+ if check_alignment:
+ size = dtype2size(write_data.dtype)
+ if not addr_aligned(addr=addr, align=size):
+ raise ValueError(f'addr=0x{addr:x} misaligned for size={size}')
+
+ # perform write
+ self.umi.write(addr, write_data, srcaddr, max_bytes,
+ posted, qos, prot, progressbar, error)
+
+
+
+[docs]
+ def write_readback(self, addr, value, mask=None, srcaddr=None, dtype=None,
+ posted=True, write_srcaddr=None, check_alignment=True, error=None):
+ """
+ Writes the provided value to the given 64-bit address, and blocks
+ until that value is read back from the provided address.
+
+ Parameters
+ ----------
+ addr: int
+ The destination address to write to and read from
+
+ value: int, np.uint8, np.uint16, np.uint32, or np.uint64
+ The data written to `addr`
+
+ mask: int, optional
+ argument (optional) allows the user to mask off some bits
+ in the comparison of the data written vs. data read back. For example,
+ if a user only cares that bit "5" is written to "1", and does not care
+ about the value of other bits read back, they could use mask=1<<5.
+
+ srcaddr: int, optional
+ The UMI source address used for the read transaction. This is
+ sometimes needed to make sure that reads get routed to the right place.
+
+ dtype: np.uint8, np.uint16, np.uint32, or np.uint64, optional
+ If `value` is specified as plain integer, then dtype must be specified,
+ indicating a particular numpy integer type. This is so that the size of
+ the UMI transaction can be set appropriately.
+
+ posted: bool, optional
+ By default, the write is performed as a posted write, however it is
+ is possible to use an ack'd write by setting posted=False.
+
+ write_srcaddr: int, optional
+ If `posted`=True, write_srcaddr specifies the srcaddr used for that
+ transaction. If write_srcaddr is None, the default srcaddr for writes
+ will be used.
+
+ check_alignment: bool, optional
+ If true, an exception will be raised if the `addr` parameter cannot be aligned based
+ on the size of the `data` parameter
+
+ error: bool, optional
+ If True, error out upon receiving an unexpected UMI response.
+
+ Raises
+ ------
+ TypeError
+ If `value` is not an integer type, if `mask` is not an integer type
+ """
+
+ # set defaults
+
+ if srcaddr is None:
+ srcaddr = self.def_read_srcaddr
+
+ srcaddr = int(srcaddr)
+
+ if write_srcaddr is None:
+ write_srcaddr = self.def_write_srcaddr
+
+ write_srcaddr = int(write_srcaddr)
+
+ # convert value to a numpy datatype if it is not already
+ if not isinstance(value, np.integer):
+ if dtype is not None:
+ value = dtype(value)
+ else:
+ raise TypeError("Must provide value as a numpy integer type, or specify dtype.")
+
+ # set the mask to all ones if it is None
+ if mask is None:
+ nbits = (np.dtype(value.dtype).itemsize * 8)
+ mask = (1 << nbits) - 1
+
+ # convert mask to a numpy datatype if it is not already
+ if not isinstance(mask, np.integer):
+ if dtype is not None:
+ mask = dtype(mask)
+ else:
+ raise TypeError("Must provide mask as a numpy integer type, or specify dtype.")
+
+ # write, then read repeatedly until the value written is observed
+ self.write(addr, value, srcaddr=write_srcaddr, posted=posted,
+ check_alignment=check_alignment, error=error)
+ rdval = self.read(addr, value.dtype, srcaddr=srcaddr,
+ check_alignment=check_alignment, error=error)
+ while ((rdval & mask) != (value & mask)):
+ rdval = self.read(addr, value.dtype, srcaddr=srcaddr,
+ check_alignment=check_alignment, error=error)
+
+
+
+[docs]
+ def read(self, addr, num_or_dtype, dtype=np.uint8, srcaddr=None,
+ max_bytes=None, qos=0, prot=0, check_alignment=True, error=None):
+ """
+ Parameters
+ ----------
+ addr: int
+ The 64-bit address read from
+
+ num_or_dtype: int or numpy integer datatype
+ If a plain int, `num_or_datatype` specifies the number of bytes to be read.
+ If a numpy integer datatype (np.uint8, np.uint16, etc.), num_or_datatype
+ specifies the data type to be returned.
+
+ dtype: numpy integer datatype, optional
+ If num_or_dtype is a plain integer, the value returned by this function
+ will be a numpy array of type "dtype". On the other hand, if num_or_dtype
+ is a numpy datatype, the value returned will be a scalar of that datatype.
+
+ srcaddr: int, optional
+ The UMI source address used for the read transaction. This
+ is sometimes needed to make sure that reads get routed to the right place.
+
+ max_bytes: int, optional
+ Indicates the maximum number of bytes that can be used for any individual UMI
+ transaction. If not specified, this defaults to the value of `max_bytes`
+ provided in the UmiTxRx constructor, which in turn defaults to 32.
+
+ qos: int, optional
+ 4-bit Quality of Service field used in the UMI command
+
+ prot: int, optional
+ 2-bit Protection mode field used in the UMI command
+
+ error: bool, optional
+ If True, error out upon receiving an unexpected UMI response.
+
+ Returns
+ -------
+ numpy integer array
+ An array of `num_or_dtype` bytes read from `addr`. The array will have the type
+ specified by `dtype` or `num_or_dtype`
+ """
+
+ # set defaults
+
+ if max_bytes is None:
+ max_bytes = self.default_max_bytes
+
+ max_bytes = int(max_bytes)
+
+ if srcaddr is None:
+ srcaddr = self.def_read_srcaddr
+
+ srcaddr = int(srcaddr)
+
+ if error is None:
+ error = self.default_error
+
+ error = bool(error)
+
+ if isinstance(num_or_dtype, (type, np.dtype)):
+ num = 1
+ bytes_per_elem = np.dtype(num_or_dtype).itemsize
+ else:
+ num = num_or_dtype
+ bytes_per_elem = np.dtype(dtype).itemsize
+
+ if check_alignment:
+ size = nbytes2size(bytes_per_elem)
+ if not addr_aligned(addr=addr, align=size):
+ raise ValueError(f'addr=0x{addr:x} misaligned for size={size}')
+
+ result = self.umi.read(addr, num, bytes_per_elem, srcaddr, max_bytes,
+ qos, prot, error)
+
+ if isinstance(num_or_dtype, (type, np.dtype)):
+ return result.view(num_or_dtype)[0]
+ else:
+ return result
+
+
+
+[docs]
+ def atomic(self, addr, data, opcode, srcaddr=None, qos=0, prot=0, error=None):
+ """
+ Parameters
+ ----------
+ addr: int
+ 64-bit address atomic operation will be applied to.
+
+ data: np.uint8, np.uint16, np.uint32, np.uint64
+ must so that the size of the atomic operation can be determined.
+
+ opcode: str or switchboard.UmiAtomic value
+ Supported string values are 'add', 'and', 'or', 'xor', 'max', 'min',
+ 'minu', 'maxu', and 'swap' (case-insensitive).
+
+ srcaddr: int, optional
+ The UMI source address used for the atomic transaction. This
+ is sometimes needed to make sure the response get routed to the right place.
+
+ qos: int, optional
+ 4-bit Quality of Service field used in the UMI command
+
+ prot: int, optional
+ 2-bit Protection mode field used in the UMI command
+
+ error: bool, optional
+ If True, error out upon receiving an unexpected UMI response.
+
+ Raises
+ ------
+ TypeError
+ If `value` is not a numpy integer datatype
+
+ Returns
+ -------
+ np.uint8, np.uint16, np.uint32, np.uint64
+ The value returned by this function is the original value at addr,
+ immediately before the atomic operation is applied. The numpy dtype of the
+ returned value will be the same as for "data".
+ """
+
+ # set defaults
+
+ if srcaddr is None:
+ srcaddr = self.def_atomic_srcaddr
+
+ srcaddr = int(srcaddr)
+
+ if error is None:
+ error = self.default_error
+
+ error = bool(error)
+
+ # resolve the opcode to an enum if needed
+ if isinstance(opcode, str):
+ opcode = getattr(UmiAtomic, f'UMI_REQ_ATOMIC{opcode.upper()}')
+
+ # format the data for sending
+ if isinstance(data, np.integer):
+ atomic_data = np.array(data, ndmin=1).view(np.uint8)
+ result = self.umi.atomic(addr, atomic_data, opcode, srcaddr, qos, prot, error)
+ return result.view(data.dtype)[0]
+ else:
+ raise TypeError("The data provided to atomic should be of a numpy integer type"
+ " so that the transaction size can be determined")
+
+
+
+
+
+[docs]
+def size2dtype(size: int, signed: bool = False, float: bool = False):
+ if float:
+ dtypes = [None, np.float16, np.float32, np.float64, np.float128]
+ elif signed:
+ dtypes = [np.int8, np.int16, np.int32, np.int64]
+ else:
+ dtypes = [np.uint8, np.uint16, np.uint32, np.uint64]
+
+ dtype = None
+
+ if size < len(dtypes):
+ dtype = dtypes[size]
+
+ if dtype is None:
+ raise ValueError(f'Size {size} unsupported with signed={signed} and float={float}')
+
+ return dtype
+
+
+
+
+[docs]
+def nbytes2size(nbytes: Integral):
+ if not isinstance(nbytes, Integral):
+ raise ValueError(f'Number of bytes must be an integer (got {nbytes})')
+ elif nbytes <= 0:
+ raise ValueError(f'Number of bytes must be positive (got {nbytes})')
+
+ nbytes = int(nbytes)
+
+ if bin(nbytes).count('1') != 1:
+ raise ValueError(f'Number of bytes must be a power of two (got {nbytes})')
+
+ size = nbytes.bit_length() - 1
+
+ if size < 0:
+ raise ValueError(f'size cannot be negative (got {size})')
+
+ return size
+
+
+
+
+[docs]
+def dtype2size(dtype: np.dtype):
+ if isinstance(dtype, np.dtype):
+ return nbytes2size(dtype.itemsize)
+ else:
+ raise ValueError(f'dtype must be of type np.dtype (got {type(dtype)})')
+
+
+
+
+[docs]
+def addr_aligned(addr: Integral, align: Integral) -> bool:
+ return ((addr >> align) << align) == addr
+
+
+
+
+[docs]
+def random_int_value(name, value, min, max, align=None):
+ # determine the length of the transaction
+
+ if isinstance(value, range) or (value is None):
+ if isinstance(value, range):
+ a = value.start
+ b = value.stop - 1
+ else:
+ a = min
+ b = max
+
+ value = random.randint(a, b)
+
+ if align is not None:
+ value >>= align
+ value <<= align
+ elif isinstance(value, (list, tuple, np.ndarray)):
+ value = random.choice(value)
+
+ if isinstance(value, (range, list, tuple)):
+ # if we happen to pick a range object from the list/tuple, then run this
+ # function on the range object. this allows users to specify a collection
+ # of values and ranges to efficiently represent a discontinuous space
+ # of options. it is also possible to have lists of lists of ranges, to
+ # adjust the probabilities of drawing from each range
+ return random_int_value(name=name, value=value, min=min, max=max, align=align)
+
+ # validate result
+
+ check_int_in_range(name, value, min=min, max=max)
+
+ value = int(value)
+
+ if align is not None:
+ if not addr_aligned(addr=value, align=align):
+ raise ValueError(f'misaligned {name}: 0x{value:x}')
+
+ # return result
+
+ return value
+
+
+
+
+[docs]
+def check_int_in_range(name, value, min=None, max=None):
+ if not np.issubdtype(type(value), np.integer):
+ raise ValueError(f'{name} is not an integer')
+
+ if (min is not None) and (value < min):
+ raise ValueError(f'{name} is less than {min}')
+
+ if (max is not None) and (value > max):
+ raise ValueError(f'{name} is greater than {max}')
+
+
+
+
+[docs]
+def random_umi_packet(
+ opcode=None,
+ len=None,
+ size=None,
+ dstaddr=None,
+ srcaddr=None,
+ data=None,
+ qos=0,
+ prot=0,
+ ex=0,
+ atype=0,
+ eom=1,
+ eof=1,
+ max_bytes=32
+):
+ """
+ Generates a Random UMI packet. All parameters are optional. Parameters that
+ are not explicitly specified will be assigned randomly.
+
+ For more information on the meanings of each parameter, reference
+ `the UMI specification <https://github.com/zeroasiccorp/umi/blob/main/README.md>`_
+
+ Parameters
+ ----------
+ opcode: int, optional
+ Command opcode
+
+ len: int, optional
+ Word transfers per message. (`len`-1 words will be transferred
+ per message)
+
+ size: int, optional
+ Word size ((2^size)-1 bits per word)
+
+ dstaddr: int, optional
+ 64-bit destination address used in the UMI packet
+
+ srcaddr: int, optional
+ 64-bit source address used in the UMI packet
+
+ data: numpy integer array, optional
+ Values used in the Data field for the UMI packet
+
+ qos: int, optional
+ 4-bit Quality of Service field used in the UMI command
+
+ prot: int, optional
+ 2-bit Protection mode field used in the UMI command
+
+ ex: int, optional
+ 1-bit Exclusive access indicator in the UMI command
+
+ atype: int, optional
+ 8-bit field specifying the type of atomic transaction used
+ in the UMI command for an atomic operation
+
+ eom: int, optional
+ 1-bit End of Message indicator in UMI command, used to track
+ the transfer of the last word in a message
+
+ eof: int, optional
+ 1-bit End of Frame bit in UMI command, used to indicate the
+ last message in a sequence of related UMI transactions
+
+ max_bytes: int, optional
+ The maximum number of bytes included in each UMI packet
+
+ Returns
+ -------
+ """
+
+ # input validation
+
+ check_int_in_range("max_bytes", max_bytes, min=0, max=32)
+
+ # TODO: make these parameters flexible, or more centrally-defined
+
+ MAX_SUMI_SIZE = 3
+ AW = 64
+
+ # determine the opcode
+
+ if opcode is None:
+ opcode = random.choice([
+ UmiCmd.UMI_REQ_WRITE,
+ UmiCmd.UMI_REQ_POSTED,
+ UmiCmd.UMI_REQ_READ,
+ UmiCmd.UMI_RESP_WRITE,
+ UmiCmd.UMI_RESP_READ,
+ UmiCmd.UMI_REQ_ATOMIC
+ ])
+ elif isinstance(opcode, Iterable):
+ opcode = random.choice(opcode)
+
+ # determine the size of the transaction
+
+ if (size is None) and (data is not None):
+ size = dtype2size(data.dtype)
+
+ size = random_int_value('size', size, 0, MAX_SUMI_SIZE)
+
+ # determine the length of the transaction
+
+ if (len is None) and (data is not None):
+ len = data.size - 1
+
+ len = random_int_value('len', len, 0, (max_bytes >> size) - 1)
+
+ # generate other fields
+
+ atype = random_int_value('atype', atype, 0x00, 0x08)
+ qos = random_int_value('qos', qos, 0b0000, 0b1111)
+ prot = random_int_value('prot', prot, 0b00, 0b11)
+ eom = random_int_value('eom', eom, 0b0, 0b1)
+ eof = random_int_value('eof', eof, 0b0, 0b1)
+ ex = random_int_value('ex', ex, 0b0, 0b1)
+
+ # construct the command field
+
+ cmd = umi_pack(opcode, atype, size, len, eom, eof, qos, prot, ex)
+
+ # generate destination address
+
+ dstaddr = random_int_value('dstaddr', dstaddr, 0, (1 << AW) - 1, align=size)
+ srcaddr = random_int_value('srcaddr', srcaddr, 0, (1 << AW) - 1, align=size)
+
+ # generate data if needed
+
+ if opcode in [
+ UmiCmd.UMI_REQ_WRITE,
+ UmiCmd.UMI_REQ_POSTED,
+ UmiCmd.UMI_RESP_READ,
+ UmiCmd.UMI_REQ_ATOMIC
+ ]:
+ if data is None:
+ dtype = size2dtype(size)
+ iinfo = np.iinfo(dtype)
+ if opcode != UmiCmd.UMI_REQ_ATOMIC:
+ nelem = len + 1
+ else:
+ nelem = 1
+ data = np.random.randint(iinfo.min, iinfo.max - 1,
+ size=nelem, dtype=dtype)
+
+ # return the packet
+
+ return PyUmiPacket(cmd=cmd, dstaddr=dstaddr, srcaddr=srcaddr, data=data)
+
+
+# General utilities for working with switchboard
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+import atexit
+import signal
+import subprocess
+import shlex
+
+
+
+[docs]
+def plusargs_to_args(plusargs):
+ args = []
+ if plusargs is not None:
+ if not isinstance(plusargs, list):
+ raise TypeError('plusargs must be a list')
+ for plusarg in plusargs:
+ if isinstance(plusarg, (list, tuple)):
+ if len(plusarg) != 2:
+ raise ValueError('Only lists/tuples of length 2 allowed')
+ args += [f'+{plusarg[0]}={plusarg[1]}']
+ else:
+ args += [f'+{plusarg}']
+ return args
+
+
+
+
+[docs]
+def binary_run(bin, args=None, stop_timeout=10, use_sigint=False,
+ quiet=False, print_command=False, cwd=None, env=None):
+
+ cmd = []
+
+ cmd += [bin]
+
+ if args is not None:
+ if not isinstance(args, list):
+ raise TypeError('args must be a list')
+ cmd += args
+
+ cmd = [str(elem) for elem in cmd]
+ if print_command:
+ print(' '.join([shlex.quote(elem) for elem in cmd]))
+
+ kwargs = {}
+ if quiet:
+ kwargs['stdout'] = subprocess.DEVNULL
+ kwargs['stderr'] = subprocess.DEVNULL
+
+ p = subprocess.Popen(cmd, cwd=cwd, env=env, **kwargs)
+
+ def stop_bin(p=p, stop_timeout=stop_timeout, use_sigint=use_sigint):
+ poll = p.poll()
+ if poll is not None:
+ # process has stopped
+ return
+
+ if use_sigint:
+ try:
+ p.send_signal(signal.SIGINT)
+ p.wait(stop_timeout)
+ return
+ except: # noqa: E722
+ # if there is an exception for any reason, including
+ # Ctrl-C during the wait() call, want to make sure
+ # that the process is actually terminated
+ pass
+
+ # if we get to this point, the process is still running
+ # and sigint didn't work (or we didn't try it)
+ p.terminate()
+
+ # note: tried combining all process terminations into a single
+ # function registered with atexit, but it didn't appear to make
+ # a difference when running with a few hundred verilator sim
+ # processes - still took a few dozen milliseconds to stop each
+ # simulator.
+
+ atexit.register(stop_bin)
+
+ return p
+
+
+
+
+[docs]
+class ProcessCollection:
+ def __init__(self):
+ self.processes = []
+
+
+
+
+
+[docs]
+ def wait(self):
+ import subprocess
+ import multiprocessing
+
+ for process in self.processes:
+ if isinstance(process, subprocess.Popen):
+ process.wait()
+ elif isinstance(process, multiprocessing.Process):
+ process.join()
+ else:
+ raise Exception(f'Unknown process type: {type(process)}')
+
+
+
+[docs]
+ def terminate(
+ self,
+ stop_timeout=10,
+ use_sigint=False
+ ):
+ if not self.processes:
+ return
+
+ for p in self.processes:
+ if isinstance(p, ProcessCollection):
+ p.terminate(stop_timeout=stop_timeout, use_sigint=use_sigint)
+ else:
+ poll = p.poll()
+ if poll is not None:
+ # process has stopped
+ return
+
+ if use_sigint:
+ try:
+ p.send_signal(signal.SIGINT)
+ p.wait(stop_timeout)
+ return
+ except: # noqa: E722
+ # if there is an exception for any reason, including
+ # Ctrl-C during the wait() call, want to make sure
+ # that the process is actually terminated
+ pass
+
+ # if we get to this point, the process is still running
+ # and sigint didn't work (or we didn't try it)
+ p.terminate()
+
+
+
+# Utilities for working with Verilator
+
+# Copyright (c) 2024 Zero ASIC Corporation
+# This code is licensed under Apache License 2.0 (see LICENSE for details)
+
+# TODO: replace with SiliconCompiler functionality
+
+from .util import plusargs_to_args, binary_run
+
+
+
+[docs]
+def verilator_run(bin, plusargs=None, args=None, use_sigint=True, **kwargs):
+ if args is None:
+ extra_args = []
+ elif isinstance(args, (list, tuple)):
+ # even if args is already a list, make a copy to
+ # ensure that changes don't propagate back to the
+ # value that the user provided for args
+ extra_args = list(args)
+ else:
+ # if the user provided a single value for args,
+ # wrap it in a list rather than erroring out
+ extra_args = [args]
+
+ # build up the argument list
+ args = []
+ args += plusargs_to_args(plusargs)
+
+ # append any extra arguments user provided by the user
+ args += extra_args
+
+ return binary_run(bin=bin, args=args, use_sigint=use_sigint, **kwargs)
+
+
' + + '' + + _("Hide Search Matches") + + "
" + ) + ); + }, + + /** + * helper function to hide the search marks again + */ + hideSearchWords: () => { + document + .querySelectorAll("#searchbox .highlight-link") + .forEach((el) => el.remove()); + document + .querySelectorAll("span.highlighted") + .forEach((el) => el.classList.remove("highlighted")); + localStorage.removeItem("sphinx_highlight_terms") + }, + + initEscapeListener: () => { + // only install a listener if it is really needed + if (!DOCUMENTATION_OPTIONS.ENABLE_SEARCH_SHORTCUTS) return; + + document.addEventListener("keydown", (event) => { + // bail for input elements + if (BLACKLISTED_KEY_CONTROL_ELEMENTS.has(document.activeElement.tagName)) return; + // bail with special keys + if (event.shiftKey || event.altKey || event.ctrlKey || event.metaKey) return; + if (DOCUMENTATION_OPTIONS.ENABLE_SEARCH_SHORTCUTS && (event.key === "Escape")) { + SphinxHighlight.hideSearchWords(); + event.preventDefault(); + } + }); + }, +}; + +_ready(() => { + /* Do not call highlightSearchWords() when we are on the search page. + * It will highlight words from the *previous* search query. + */ + if (typeof Search === "undefined") SphinxHighlight.highlightSearchWords(); + SphinxHighlight.initEscapeListener(); +}); diff --git a/genindex.html b/genindex.html new file mode 100644 index 00000000..fe9ddb18 --- /dev/null +++ b/genindex.html @@ -0,0 +1,655 @@ + + + + + +
|
+
+ | + |
+ | + |
+ | + |
+ | + |
+ | + |
+ | + |
+ | + |
|
+
+ | + |
+ | + |
|
+ + |
+ | + |
+ | + |
+ | + |
This page is meant to serve as a reference manual for the various python classes and functions contained in the switchboard package. To get started, reference some of the examples included in the repository.
+PySbPacket
+PySbRx
+PySbRxPcie
+PySbTx
+PySbTxPcie
+PyUmi
+PyUmiPacket
+UmiAtomic
+UmiCmd
UmiCmd.UMI_INVALID
UmiCmd.UMI_REQ_ATOMIC
UmiCmd.UMI_REQ_ERROR
UmiCmd.UMI_REQ_FUTURE0
UmiCmd.UMI_REQ_LINK
UmiCmd.UMI_REQ_POSTED
UmiCmd.UMI_REQ_RDMA
UmiCmd.UMI_REQ_READ
UmiCmd.UMI_REQ_USER0
UmiCmd.UMI_REQ_WRITE
UmiCmd.UMI_RESP_FUTURE0
UmiCmd.UMI_RESP_FUTURE1
UmiCmd.UMI_RESP_LINK
UmiCmd.UMI_RESP_READ
UmiCmd.UMI_RESP_USER0
UmiCmd.UMI_RESP_USER1
UmiCmd.UMI_RESP_WRITE
UmiCmd.name
UmiCmd.value
delete_queue()
delete_queues()
umi_atype()
umi_eof()
umi_eom()
umi_ex()
umi_len()
umi_opcode()
umi_opcode_to_str()
umi_pack()
umi_prot()
umi_qos()
umi_size()
+ _ | ||
+ |
+ _switchboard | + |
+ s | ||
+ |
+ switchboard | + |
+ |
+ switchboard.bitvector | + |
+ |
+ switchboard.icarus | + |
+ |
+ switchboard.loopback | + |
+ |
+ switchboard.sbdut | + |
+ |
+ switchboard.sbtcp | + |
+ |
+ switchboard.uart_xactor | + |
+ |
+ switchboard.umi | + |
+ |
+ switchboard.util | + |
+ |
+ switchboard.verilator | + |
Classes:
+
|
++ |
Functions:
+
|
++ |
switchboard pybind11 plugin
+Classes:
+
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
+Members: |
+
|
+Members: |
+
Functions:
+
|
+Deletes an old queue. |
+
|
+Deletes a old queues specified in a list. |
+
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
+Returns a string representation of a UMI opcode |
+
|
+Returns a UMI command with the given parameters. |
+
|
++ |
|
++ |
|
++ |
Bases: pybind11_object
Bases: pybind11_object
uri (str Name of the queue for the Rx object)
fresh (bool, optional) – If True, the queue specified by the uri parameter will get cleared before executing the simulation.
blocking (bool, optional) – If true, the function will pause execution until a packetcan be read. If false, the function will return None if a packetcannot be read immediately
+Returns a UMI packet. If blocking is false, None will be returned If a packet cannot be read immediately.
+Bases: pybind11_object
Bases: pybind11_object
uri (str) – Name of the queue for the Tx object
fresh (bool, optional) – If True, the queue specified by the uri parameter will get cleared before executing the simulation.
py_packet (PySbPacket) – UMI packet to send
blocking (bool, optional) – If true, the function will pause execution until the packet has been successfully sent.
Bases: pybind11_object
Bases: pybind11_object
addr (int) – 64-bit address atomic operation will be applied to.
data (np.uint8, np.uint16, np.uint32, np.uint64) – must so that the size of the atomic operation can be determined.
opcode (str or switchboard.UmiAtomic value) – Supported string values are ‘add’, ‘and’, ‘or’, ‘xor’, ‘max’, ‘min’, ‘minu’, ‘maxu’, and ‘swap’ (case-insensitive).
srcaddr (int, optional) – The UMI source address used for the atomic transaction. This is sometimes needed to make sure the response get routed to the right place.
qos (int, optional) – 4-bit Quality of Service field in UMI Command
prot (int, optional) – 2-bit protection mode field in UMI command
error (bool, optional) – If true, error out upon receiving an unexpected UMI response.
The value returned by this function is the original value at addr, immediately before the atomic operation is applied. The numpy dtype of the returned value will be the same as for data.
+np.uint8, np.uint16, np.uint32, np.uint64
+tx_uri (str, optional) – Name of the switchboard queue that write() and send() will send UMI packets to. Defaults to None, meaning “unused”.
rx_uri (str, optional) – Name of the switchboard queue that read() and recv() will receive UMI packets from. Defaults to None, meaning “unused”.
fresh (bool, optional) – If true, the tx_uri and rx_uri will be cleared prior to running
addr (int) – The 64-bit address read from
num_or_dtype (int or numpy integer datatype) – If a plain int, num_or_datatype specifies the number of bytes to be read.If a numpy integer datatype (np.uint8, np.uint16, etc.), num_or_datatypespecifies the data type to be returned.
dtype (numpy integer datatype, optional) – If num_or_dtype is a plain integer, the value returned by this functionwill be a numpy array of type dtype. On the other hand, if num_or_dtypeis a numpy datatype, the value returned will be a scalar of that datatype.
srcaddr (int, optional) – The UMI source address used for the read transaction. Thisis sometimes needed to make sure that reads get routed to the right place.
max_bytes (int, optional) – Indicates the maximum number of bytes that can be used for any individualUMI transaction. num_or_dtype can be larger than max_bytes, in whichcase the read will automatically be split into multiple transactions. Currently,the data payload size used by switchboard is 32 bytes, which is reflected in thedefault value of max_bytes.
qos (int, optional) – 4-bit Quality of Service field in UMI Command
prot (int, optional) – 2-bit protection mode field in UMI command
error (bool, optional) – If true, error out upon receiving an unexpected UMI response.
blocking (bool, optional) – If true, the function will pause execution until a packetcan be read. If false, the function will return None if a packetcannot be read immediately
+Returns a UMI packet. If blocking is false, None will be returned If a packet cannot be read immediately.
+py_packet (PySbPacket) – UMI packet to send
blocking (bool, optional) – If true, the function will pause execution until the packet has been successfully sent.
addr (int) – 64-bit address that will be written to
data (np.uint8, np.uint16, np.uint32, np.uint64, or np.array) – Can be either a numpy integer type (e.g., np.uint32) or an numpy array of integer types (np.uint8, np.uin16, np.uint32, np.uint64, etc.). The data input may contain more than max_bytes, in which case the write will automatically be split into multiple transactions.
srcaddr (int, optional) – UMI source address used for the write transaction. This is sometimes needed to make the write response gets routed to the right place.
max_bytes (int, optional) – Indicates the maximum number of bytesthat can be used for any individual UMI transaction in bytes. Currently, the data payload size used by switchboard is 32 bytes, which is reflected in the default value of max_bytes.
posted (bool, optional) – If True, a write response will be received.
qos (int, optional) – 4-bit Quality of Service field in UMI Command
prot (int, optional) – 2-bit protection mode field in UMI command
progressbar (bool, optional) – If True, the number of packets written will be displayed via a progressbarin the terminal.
error (bool, optional) – If true, error out upon receiving an unexpected UMI response.
Bases: pybind11_object
Bases: pybind11_object
Members:
+UMI_REQ_ATOMICADD
+UMI_REQ_ATOMICAND
+UMI_REQ_ATOMICOR
+UMI_REQ_ATOMICXOR
+UMI_REQ_ATOMICMAX
+UMI_REQ_ATOMICMIN
+UMI_REQ_ATOMICMAXU
+UMI_REQ_ATOMICMINU
+UMI_REQ_ATOMICSWAP
+Bases: pybind11_object
Members:
+UMI_INVALID
+UMI_REQ_READ
+UMI_REQ_WRITE
+UMI_REQ_POSTED
+UMI_REQ_RDMA
+UMI_REQ_ATOMIC
+UMI_REQ_USER0
+UMI_REQ_FUTURE0
+UMI_REQ_ERROR
+UMI_REQ_LINK
+UMI_RESP_READ
+UMI_RESP_WRITE
+UMI_RESP_USER0
+UMI_RESP_USER1
+UMI_RESP_FUTURE0
+UMI_RESP_FUTURE1
+UMI_RESP_LINK
+Deletes an old queue.
+Deletes a old queues specified in a list.
+Returns a string representation of a UMI opcode
+Returns a UMI command with the given parameters.
+uart_xactor
uart_xactor.REGF_SR_RXEMPTY_MASK
uart_xactor.REGF_SR_RXFULL_MASK
uart_xactor.REGF_SR_TXEMPTY_MASK
uart_xactor.REGF_SR_TXFULL_MASK
uart_xactor.REG_RX
uart_xactor.REG_SR
uart_xactor.REG_TX
uart_xactor.read()
uart_xactor.read_byte()
uart_xactor.readline()
uart_xactor.write()
uart_xactor.write_byte()
Functions:
+
|
++ |
|
++ |
|
++ |
|
++ |
cwd (str)
name (str)
cincludes (List[str])
ldflags (List[str])
str
+cwd (str | Path)
name (str)
Path
+Functions:
+
|
+Performs a loopback test by sending packets into a block and checking that the packets received back are equivalent under the UMI split/merge rules. |
+
Performs a loopback test by sending packets into a block and checking that +the packets received back are equivalent under the UMI split/merge rules.
+umi (UmiTxRx)
packets (Integral | Iterable | Iterator) –
Can be a number, a list of packets, or a generator.
+If this is a number, it represents the number of packets to send, +which are generated with random_umi_packet. Any remaining arguments +are passed directly to random_umi_packet.
If this is an iterable (list, tuple, etc.), then it represents a list +of packets to use for the test. This is helpful if you want to use a very +specific sequence of transactions.
This can also be an iterator, which might be convenient if you want to +send a very large number of packets without having to store them +all in memory at once, e.g. (random_umi_packet() for _ in range(1000000))
ValueError – If the number of packets is not positive or if the packets argument is empty
Exception – If a received packet does not match the corresponding transmitted packet
Note that the sbdut module inherits functions from siliconcompiler.Chips, such as input() used to add rtl files to the simulation and add() used to add include directories to the simulation. These functions are important for specifying the RTL files used in a simulation, but they are not covered in the documentation here. For a complete list of the the inherited functions, checkout the siliconcompiler documentation.
+Class inheriting from the SiliconCompiler Chip class that can be used for building a +Switchboard-based testbench.
+This class is meant to be interacted with like a regular Chip object, but it has some parameters +automatically configured to abstract away setup of files that are required by all Switchboard +testbenches.
+Classes:
+
|
++ |
Functions:
+
|
++ |
|
++ |
Bases: Chip
design (string) – Name of the top level chip design module.
tool (string, optional) – Which tool to use to compile simulator. Options are “verilator” or +“icarus”.
default_main (bool, optional) – If True, the default testbench.cc will be used and does not need to +be provided via the add() function
trace (bool, optional) – If true, a waveform dump file will be produced using the file type +specified by trace_type.
trace_type (str, optional) – File type for the waveform dump file. Defaults to vcd.
module (str, optional) – module containing the siliconcompiler driver for this object
fpga (bool, optional) – If True, compile using switchboard’s library of modules for FPGA emulation, +rather than the modules for RTL simulation.
xyce (bool, optional) – If True, compile for xyce co-simulation.
frequency (float, optional) – If provided, the default frequency of the clock generated in the testbench, +in seconds.
period (float, optional) – If provided, the default period of the clock generated in the testbench, +in seconds.
max_rate (float, optional) – If provided, the maximum real-world rate that the simulation is allowed to run +at, in Hz. Can be useful to encourage time-sharing between many processes and +for performance modeling when latencies are large and/or variable.
start_delay (float, optional) – If provided, the real-world time to delay before the first clock tick in the +simulation. Can be useful to make sure that programs start at approximately +the same time and to prevent simulations from stepping on each other’s toes +when starting up.
warnings (List[str], optional) – If provided, a list of tool-specific warnings to enable. If not provided, a default +set of warnings will be included. Warnings can be disabled by setting this argument +to an empty list.
cmdline (bool, optional) – If True, accept configuration settings from the command line, such as “–trace”, +“–tool TOOL”, and “–fast”.
fast (bool, optional) – If True, the simulation binary will not be rebuilt if an existing one is found. +The setting here can be overridden when build() is called by setting its argument +with the same name.
extra_args (dict, optional) – If provided and cmdline=True, a dictionary of additional command line arguments +to be made available. The keys of the dictionary are the arguments (“-n”, “–test”, +etc.) and the values are themselves dictionaries that contain keyword arguments +accepted by argparse (“action”: “store_true”, “default”: 42, etc.)
timeunit (str)
timeprecision (str)
autowrap (bool)
cwd (str, optional) – Working directory for the simulation build
fast (bool, optional) – If True, the simulation binary will not be rebuilt if an existing one +is found. Defaults to the value provided to the SbDut constructor, +which in turn defaults to False.
Specifies a SPICE subcircuit to be used in a mixed-signal simulation. This involves +providing the path to the SPICE file containing the subcircuit definition and describing +how real-valued outputs in the SPICE subcircuit should be converted to binary values in +the Verilog simulation (and vice versa for subcircuit inputs).
+Each of these conversions is specified as an entry in the “pins” argument, which is a +list of dictionaries, each representing a single pin of the SPICE subcircuit. Each +dictionary may have the following keys: +* “name”: name of the pin. Bus notation may be used, e.g. “myBus[7:0]”. In that case, +it is expected that the SPICE subcircuit has pins corresponding to each bit in the bus, +e.g. “myBus[0]”, “myBus[1]”, etc. +* “type”: direction of the pin. May be “input”, “output”, or “constant”. If “constant”, +then this pin will not show up the Verilog module definition to be instantiated in user +code. Instead, the SPICE subcircuit pin with that name will be tied off to a fixed +voltage specified in the “value” field (below). +* “vil”: low voltage threshold, below which a real-number voltage from the SPICE +simulation is considered to be a logical “0”. +* “vih”: high voltage threshold, above which a real-number voltage from the SPICE +simulation is considered to be a logical “1”. +* “vol”: real-number voltage to pass to a SPICE subcircuit input when the digital value +driven is “0”. +* “voh”: real-number voltage to pass to a SPICE subcircuit input when the digital value +driven is “1”. +* “tr”: time taken in the SPICE simulation to transition from a logic “0” value to a +logic “1” value. +* “tf”: time taken in the SPICE simulation to transition from a logic “1” value to a +logic “0” value. +* “initial”: initial value of a SPICE subcircuit pin. Currently only implemented for +subcircuit outputs. This is sometimes helpful, because there is a slight delay between +t=0 and the time when the SPICE simulation reports values for its outputs. Specifying +“initial” for subcircuit outputs prevents the corresponding digital signals from being +driven to “X” at t=0.
+filename (str) – The path of the SPICE file containing the subcircuit definition.
pins (List[Dict[str, Any]]) – List of dictionaries, each describing a pin of the subcircuit.
name (str) – Name of the SPICE subcircuit that will be instantiated in the mixed-signal simulation. +If not provided, Switchboard guesses that the name is filename stem. For example, +if filename=”myCircuit.cir”, then Switchboard will guess that the subcircuit name +is “myCircuit”
check_name (bool) – If True (default), Switchboard parses the provided file to make sure that there +is a subcircuit definition matching the given name.
dir (str) – Running a mixed-signal simulation involves creating SPICE and Verilog wrappers. This +argument specifies the directory where those wrappers should be written. If not +provided, defaults to the directory where filename is located.
plusargs (str or list or tuple, optional) – additional arguments to pass to simulator that must be preceded +with a +. These are listed after args.
args (str or list or tuple, optional) – additional arguments to pass to simulator listed before plusargs and +extra_args
extra_args (str or list or tuple, optional) – additional arguments to pass to simulator listed after args and +plusargs
cwd (str, optional) – working directory where simulation binary is saved
trace (bool, optional) – If true, a waveform dump file will be produced
period (float, optional) – If provided, the period of the clock generated in the testbench, +in seconds.
frequency (float)
max_rate (float)
start_delay (float)
run (str)
intf_objs (bool)
Popen
+Functions:
+
|
++ |
|
++ |
+ | + |
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
+Connect to a server, retrying until a connection is made. |
+
|
+Accepts client connections in a loop until Ctrl-C is pressed. |
+
|
++ |
|
++ |
|
++ |
|
++ |
Connect to a server, retrying until a connection is made.
+Accepts client connections in a loop until Ctrl-C is pressed.
+UMI/UART Transactor.
+Classes:
+
|
++ |
Bases: object
Classes:
+
|
++ |
Functions:
+
|
++ |
|
++ |
|
++ |
|
++ |
|
++ |
|
+Generates a Random UMI packet. |
+
|
++ |
Bases: object
tx_uri (str, optional) – Name of the switchboard queue that +write() and send() will send UMI packets to. Defaults to +None, meaning “unused”.
rx_uri (str, optional) – Name of the switchboard queue that +read() and recv() will receive UMI packets from. Defaults +to None, meaning “unused”.
srcaddr (int, optional) – Default srcaddr to use for reads, +ack’d writes, and atomics. Defaults to 0. Can also be +provided as a dictionary with separate defaults for each +type of transaction: srcaddr={‘read’: 0x1234, ‘write’: +0x2345, ‘atomic’: 0x3456}. When the defaults are provided +with a dictionary, all keys are optional. Transactions +that are not specified in the dictionary will default +to a srcaddr of 0.
posted (bool, optional) – If True, default to using posted +(i.e., non-ack’d) writes. This can be overridden on a +transaction-by-transaction basis. Defaults to False.
max_bytes (int, optional) – Default maximum number of bytes +to use in each UMI transaction. Can be overridden on a +transaction-by-transaction basis. Defaults to 32 bytes.
fresh (bool, optional) – If True, the queue specified by the uri parameter will get +cleared before executing the simulation.
error (bool, optional) – If True, error out upon receiving an unexpected UMI response.
max_rate (float)
addr (int) – 64-bit address atomic operation will be applied to.
data (np.uint8, np.uint16, np.uint32, np.uint64) – must so that the size of the atomic operation can be determined.
opcode (str or switchboard.UmiAtomic value) – Supported string values are ‘add’, ‘and’, ‘or’, ‘xor’, ‘max’, ‘min’, +‘minu’, ‘maxu’, and ‘swap’ (case-insensitive).
srcaddr (int, optional) – The UMI source address used for the atomic transaction. This +is sometimes needed to make sure the response get routed to the right place.
qos (int, optional) – 4-bit Quality of Service field used in the UMI command
prot (int, optional) – 2-bit Protection mode field used in the UMI command
error (bool, optional) – If True, error out upon receiving an unexpected UMI response.
TypeError – If value is not a numpy integer datatype
+The value returned by this function is the original value at addr, +immediately before the atomic operation is applied. The numpy dtype of the +returned value will be the same as for “data”.
+np.uint8, np.uint16, np.uint32, np.uint64
+Returns an object for communicating with umi_gpio modules.
+iwidth (int) – Width of GPIO input (bits). Defaults to 32.
owidth (int) – Width of GPIO output (bits). Defaults to 32.
init (int) – Default value of GPIO output. Defaults to 0.
dstaddr (int) – Base address of the GPIO device. Defaults to 0.
srcaddr (int) – Source address to which responses should be routed. Defaults to 0.
posted (bool) – Whether writes should be sent as posted. Defaults to False.
max_bytes (int) – Maximum number of bytes in a single transaction to umi_gpio.
UmiGpio object with .i (input) and .o (output) attributes
+UmiGpio
+tx_uri (str, optional) – Name of the switchboard queue that +write() and send() will send UMI packets to. Defaults to +None, meaning “unused”.
rx_uri (str, optional) – Name of the switchboard queue that +read() and recv() will receive UMI packets from. Defaults +to None, meaning “unused”.
fresh (bool, optional) – If True, the queue specified by the uri parameter will get +cleared before executing the simulation.
addr (int) – The 64-bit address read from
num_or_dtype (int or numpy integer datatype) – If a plain int, num_or_datatype specifies the number of bytes to be read. +If a numpy integer datatype (np.uint8, np.uint16, etc.), num_or_datatype +specifies the data type to be returned.
dtype (numpy integer datatype, optional) – If num_or_dtype is a plain integer, the value returned by this function +will be a numpy array of type “dtype”. On the other hand, if num_or_dtype +is a numpy datatype, the value returned will be a scalar of that datatype.
srcaddr (int, optional) – The UMI source address used for the read transaction. This +is sometimes needed to make sure that reads get routed to the right place.
max_bytes (int, optional) – Indicates the maximum number of bytes that can be used for any individual UMI +transaction. If not specified, this defaults to the value of max_bytes +provided in the UmiTxRx constructor, which in turn defaults to 32.
qos (int, optional) – 4-bit Quality of Service field used in the UMI command
prot (int, optional) – 2-bit Protection mode field used in the UMI command
error (bool, optional) – If True, error out upon receiving an unexpected UMI response.
An array of num_or_dtype bytes read from addr. The array will have the type +specified by dtype or num_or_dtype
+numpy integer array
+Wait for and return a UMI packet if blocking=True, otherwise return a +UMI packet if one can be read immediately, and None otherwise.
+blocking (bool, optional) – If True, the function will wait until a UMI packet can be read. +If False, a None type will be returned if no UMI packet can be read +immediately.
+If blocking is True, a PyUmiPacket is always returned. If blocking is +False, a PyUmiPacket object will be returned if one can be read immediately. +Otherwise, a None type will be returned.
+Sends (or tries to send if burst=False) a UMI transaction (PyUmiPacket) +Returns True if the packet was sent successfully, else False.
+p (PyUmiPacket) – The UMI packet that will be sent
blocking (bool, optional) – If True, the program will pause execution until a response to the write request +is received.
Returns true if the p was sent successfully
+bool
+Writes the provided data to the given 64-bit address.
+addr (int) – 64-bit address that will be written to
data (np.uint8, np.uint16, np.uint32, np.uint64, or np.array) – Can be either a numpy integer type (e.g., np.uint32) or an numpy +array of integer types (np.uint8, np.uin16, np.uint32, np.uint64, etc.). +The data input may contain more than “max_bytes”, in which case +the write will automatically be split into multiple transactions.
srcaddr (int, optional) – UMI source address used for the write transaction. This is sometimes needed to make +the write response gets routed to the right place.
max_bytes (int, optional) – Indicates the maximum number of bytes that can be used for any individual UMI +transaction. If not specified, this defaults to the value of max_bytes +provided in the UmiTxRx constructor, which in turn defaults to 32.
posted (bool, optional) – If True, a write response will be received.
qos (int, optional) – 4-bit Quality of Service field in UMI Command
prot (int, optional) – 2-bit protection mode field in UMI command
progressbar (bool, optional) – If True, the number of packets written will be displayed via a progressbar +in the terminal.
check_alignment (bool, optional) – If true, an exception will be raised if the addr parameter cannot be aligned based +on the size of the data parameter
error (bool, optional) – If True, error out upon receiving an unexpected UMI response.
Writes the provided value to the given 64-bit address, and blocks +until that value is read back from the provided address.
+addr (int) – The destination address to write to and read from
value (int, np.uint8, np.uint16, np.uint32, or np.uint64) – The data written to addr
mask (int, optional) – argument (optional) allows the user to mask off some bits +in the comparison of the data written vs. data read back. For example, +if a user only cares that bit “5” is written to “1”, and does not care +about the value of other bits read back, they could use mask=1<<5.
srcaddr (int, optional) – The UMI source address used for the read transaction. This is +sometimes needed to make sure that reads get routed to the right place.
dtype (np.uint8, np.uint16, np.uint32, or np.uint64, optional) – If value is specified as plain integer, then dtype must be specified, +indicating a particular numpy integer type. This is so that the size of +the UMI transaction can be set appropriately.
posted (bool, optional) – By default, the write is performed as a posted write, however it is +is possible to use an ack’d write by setting posted=False.
write_srcaddr (int, optional) – If `posted`=True, write_srcaddr specifies the srcaddr used for that +transaction. If write_srcaddr is None, the default srcaddr for writes +will be used.
check_alignment (bool, optional) – If true, an exception will be raised if the addr parameter cannot be aligned based +on the size of the data parameter
error (bool, optional) – If True, error out upon receiving an unexpected UMI response.
TypeError – If value is not an integer type, if mask is not an integer type
+addr (Integral)
align (Integral)
bool
+Generates a Random UMI packet. All parameters are optional. Parameters that +are not explicitly specified will be assigned randomly.
+For more information on the meanings of each parameter, reference +the UMI specification
+opcode (int, optional) – Command opcode
len (int, optional) – Word transfers per message. (len-1 words will be transferred +per message)
size (int, optional) – Word size ((2^size)-1 bits per word)
dstaddr (int, optional) – 64-bit destination address used in the UMI packet
srcaddr (int, optional) – 64-bit source address used in the UMI packet
data (numpy integer array, optional) – Values used in the Data field for the UMI packet
qos (int, optional) – 4-bit Quality of Service field used in the UMI command
prot (int, optional) – 2-bit Protection mode field used in the UMI command
ex (int, optional) – 1-bit Exclusive access indicator in the UMI command
atype (int, optional) – 8-bit field specifying the type of atomic transaction used +in the UMI command for an atomic operation
eom (int, optional) – 1-bit End of Message indicator in UMI command, used to track +the transfer of the last word in a message
eof (int, optional) – 1-bit End of Frame bit in UMI command, used to indicate the +last message in a sequence of related UMI transactions
max_bytes (int, optional) – The maximum number of bytes included in each UMI packet
Classes:
++ | + |
Functions:
+
|
++ |
|
++ |
Functions:
+
|
++ |