Skip to content

Commit

Permalink
Option for running each benchmark in its own server instance (Bears-R…
Browse files Browse the repository at this point in the history
…-Us#3730)

* set up option for running each benchmark in its own server instance

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* fix command formatting

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* separate option for launching server from within slurm allocation

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* apply black formatting

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* parse new run_benchmarks arguments as bool

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* parse 'within_slrum_alloc' arg as bool

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* fix default arguments

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

---------

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>
  • Loading branch information
jeremiah-corrado committed Sep 6, 2024
1 parent 8f4bd17 commit 906bdb2
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 19 deletions.
69 changes: 56 additions & 13 deletions benchmarks/run_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@
import os
import subprocess
import sys
from server_util.test.server_test_util import get_arkouda_numlocales, start_arkouda_server,\
run_client, stop_arkouda_server
from server_util.test.server_test_util import (
get_arkouda_numlocales,
start_arkouda_server,
run_client,
stop_arkouda_server,
)

benchmark_dir = os.path.dirname(__file__)
util_dir = os.path.join(benchmark_dir, "..", "server_util", "test")
sys.path.insert(0, os.path.abspath(util_dir))
Expand Down Expand Up @@ -90,12 +95,13 @@ def add_to_dat(benchmark, output, dat_dir, graph_infra):
benchmark_out = "{}.exec.out.tmp".format(benchmark)
with open(benchmark_out, "w") as f:
f.write(output)
subprocess.check_output([computePerfStats, benchmark, dat_dir, perfkeys, benchmark_out])
subprocess.check_output(
[computePerfStats, benchmark, dat_dir, perfkeys, benchmark_out]
)
os.remove(benchmark_out)


def generate_graphs(args):

"""
Generate graphs using the existing .dat files and graph infrastructure.
"""
Expand Down Expand Up @@ -140,20 +146,31 @@ def create_parser():
default=get_arkouda_numlocales(),
help="Number of locales to use for the server",
)
parser.add_argument("-sp", "--server-port", default="5555", help="Port number to use for the server")
parser.add_argument("--server-args", action="append", help="Additional server arguments")
parser.add_argument("--numtrials", default=1, type=int, help="Number of trials to run")
parser.add_argument(
"benchmarks", nargs="*", help="Basename of benchmarks to run with extension stripped"
"-sp", "--server-port", default="5555", help="Port number to use for the server"
)
parser.add_argument(
"--server-args", action="append", help="Additional server arguments"
)
parser.add_argument(
"--numtrials", default=1, type=int, help="Number of trials to run"
)
parser.add_argument(
"benchmarks",
nargs="*",
help="Basename of benchmarks to run with extension stripped",
)
parser.add_argument(
"--save-data",
default=False,
action="store_true",
help="Save performance data to output files, requires $CHPL_HOME"
help="Save performance data to output files, requires $CHPL_HOME",
)
parser.add_argument(
"--gen-graphs", default=False, action="store_true", help="Generate graphs, requires $CHPL_HOME"
"--gen-graphs",
default=False,
action="store_true",
help="Generate graphs, requires $CHPL_HOME",
)
parser.add_argument(
"--dat-dir",
Expand All @@ -167,10 +184,22 @@ def create_parser():
help="Directory containing graph infrastructure",
)
parser.add_argument("--platform-name", default="", help="Test platform name")
parser.add_argument("--description", default="", help="Description of this configuration")
parser.add_argument(
"--description", default="", help="Description of this configuration"
)
parser.add_argument("--annotations", default="", help="File containing annotations")
parser.add_argument("--configs", help="comma seperate list of configurations")
parser.add_argument("--start-date", help="graph start date")
parser.add_argument(
"--isolated",
default=False,
help="run each benchmark in its own server instance",
)
parser.add_argument(
"--within-slrum-alloc",
default=False,
help="whether this script was launched from within a slurm allocation (for use with --isolated only)",
)
return parser


Expand All @@ -179,22 +208,36 @@ def main():
args, client_args = parser.parse_known_args()
args.graph_dir = args.graph_dir or os.path.join(args.dat_dir, "html")
config_dat_dir = os.path.join(args.dat_dir, args.description)
run_isolated = bool(args.isolated)

if args.save_data or args.gen_graphs:
os.makedirs(config_dat_dir, exist_ok=True)

start_arkouda_server(args.num_locales, port=args.server_port, server_args=args.server_args)
if not run_isolated:
start_arkouda_server(
args.num_locales, port=args.server_port, server_args=args.server_args
)

args.benchmarks = args.benchmarks or BENCHMARKS
for benchmark in args.benchmarks:
if run_isolated:
start_arkouda_server(
args.num_locales,
port=args.server_port,
server_args=args.server_args,
within_slurm_alloc=bool(args.within_slrum_alloc),
)
for trial in range(args.numtrials):
benchmark_py = os.path.join(benchmark_dir, "{}.py".format(benchmark))
out = run_client(benchmark_py, client_args)
if args.save_data or args.gen_graphs:
add_to_dat(benchmark, out, config_dat_dir, args.graph_infra)
print(out)
if run_isolated:
stop_arkouda_server()

stop_arkouda_server()
if not run_isolated:
stop_arkouda_server()

if args.save_data or args.gen_graphs:
comp_file = os.getenv("ARKOUDA_PRINT_PASSES_FILE", "")
Expand Down
65 changes: 59 additions & 6 deletions server_util/test/server_test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TestRunningMode(Enum):
"""
Enum indicating the running mode of the test harness
"""

CLIENT = "CLIENT"
CLASS_SERVER = "CLASS_SERVER"
GLOBAL_SERVER = "GLOBAL_SERVER"
Expand Down Expand Up @@ -117,11 +118,11 @@ def read_server_and_port_from_file(server_connection_info):
while True:
try:
with open(server_connection_info, "r") as f:
(hostname,port,connect_url) = f.readline().split(" ")
(hostname, port, connect_url) = f.readline().split(" ")
port = int(port)
if hostname == socket.gethostname():
hostname = "localhost"
return (hostname,port,connect_url)
return (hostname, port, connect_url)
except (ValueError, FileNotFoundError) as e:
time.sleep(1)
continue
Expand Down Expand Up @@ -181,7 +182,51 @@ def kill_server(server_process):
server_process.kill()


def start_arkouda_server(numlocales, trace=False, port=5555, host=None, server_args=None):
def get_server_launch_cmd(numlocales):
"""
Get an srun command to launch ./arkouda_server_real directly
"""
import re

# get the srun command for 'arkouda_server_real'
p = subprocess.Popen(
["./arkouda_server", f"-nl{numlocales}", "--dry-run"], stdout=subprocess.PIPE
)
srun_cmd, err = p.communicate()
srun_cmd = srun_cmd.decode()

if err is not None:
raise RuntimeError("failed to capture arkouda srun command: ", err)

# remove and capture the '--constraint=' argument if present
constraint_setting = None
m = re.search(r"--constraint=[\w,]*\s", srun_cmd)
if m is not None:
constraint_setting = srun_cmd[m.start() : m.end()]
srun_cmd = srun_cmd[: m.start()] + srun_cmd[m.end() + 1 :]

# extract evironment variable settings specified in the command
# and include them in the executing environment
env = os.environ.copy()
max_env_idx = 0
for match in re.finditer(r"([A-Z_]+)=(\S+)", srun_cmd):
max_env_idx = max(max_env_idx, match.end())
env.update({match.group(1): match.group(2)})

# remove the environment variables from the command string
srun_cmd = srun_cmd[max_env_idx:]

return (srun_cmd, env, constraint_setting)


def start_arkouda_server(
numlocales,
trace=False,
port=5555,
host=None,
server_args=None,
within_slurm_alloc=False,
):
"""
Start the Arkouda server and wait for it to start running. Connection info
is written to `get_arkouda_server_info_file()`.
Expand All @@ -191,15 +236,23 @@ def start_arkouda_server(numlocales, trace=False, port=5555, host=None, server_a
:param int port: the desired arkouda_server port, defaults to 5555
:param str host: the desired arkouda_server host, defaults to None
:param list server_args: additional arguments to pass to the server
:param within_slurm_alloc: whether the current script is running within a slurm allocation.
in which case, special care needs to be taken when launching the server.
:return: tuple containing server host, port, and process
:rtype: ServerInfo(host, port, process)
"""
connection_file = get_arkouda_server_info_file()
with contextlib.suppress(FileNotFoundError):
os.remove(connection_file)

cmd = [
get_arkouda_server(),
if within_slurm_alloc:
raw_server_cmd, env, _ = get_server_launch_cmd(numlocales)
raw_server_cmd = raw_server_cmd.strip().strip().split(" ")
else:
raw_server_cmd = [get_arkouda_server(),]
env = None

cmd = raw_server_cmd + [
"--trace={}".format("true" if trace else "false"),
"--serverConnectionInfo={}".format(connection_file),
"-nl {}".format(numlocales),
Expand All @@ -209,7 +262,7 @@ def start_arkouda_server(numlocales, trace=False, port=5555, host=None, server_a
cmd += server_args

logging.info('Starting "{}"'.format(cmd))
process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL)
process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, env=env)
atexit.register(kill_server, process)

if not host:
Expand Down

0 comments on commit 906bdb2

Please sign in to comment.