Commit

Closes #2254 - PyTest benchmark upgrade for IO benchmarks (#2255)
* IO benchmark pytest upgrade

* Adding IO Benchmark enhancements

* Updates based on feedback

* Updating based on feedback

* Adding append file delete function and using io.to_parquet

---------

Co-authored-by: joshmarshall1 <josh@MSI.localdomain>
joshmarshall1 authored Mar 24, 2023
1 parent be54ed8 commit 424e162
Showing 4 changed files with 306 additions and 0 deletions.
1 change: 1 addition & 0 deletions benchmark.ini
@@ -23,6 +23,7 @@ testpaths =
benchmark_v2/scan_benchmark.py
benchmark_v2/substring_search_benchmark.py
benchmark_v2/no_op_benchmark.py
benchmark_v2/io_benchmark.py
benchmark_v2/sort_cases_benchmark.py
python_functions = bench_*
env =
10 changes: 10 additions & 0 deletions benchmark_v2/conftest.py
@@ -13,6 +13,7 @@

default_dtype = ["int64", "uint64", "float64", "bool", "str", "bigint", "mixed"]
default_encoding = ["ascii", "idna"]
default_compression = [None, "snappy", "gzip", "brotli", "zstd", "lz4"]


def pytest_configure(config):
@@ -30,6 +31,15 @@ def pytest_configure(config):
pytest.idx_size = None if config.getoption("index_size") == "" else eval(config.getoption("index_size"))
pytest.val_size = None if config.getoption("value_size") == "" else eval(config.getoption("value_size"))

# IO settings
comp_str = config.getoption("io_compression")
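# e.g. "--io_compression=snappy,gzip" -> ["snappy", "gzip"]; an empty value selects every supported compression, including None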
pytest.io_compression = default_compression if comp_str == "" else comp_str.split(",")
pytest.io_delete = config.getoption("io_only_delete")
pytest.io_files = eval(config.getoption("io_files_per_loc"))
pytest.io_path = config.getoption("io_path")
pytest.io_read = config.getoption("io_only_read")
pytest.io_write = config.getoption("io_only_write")


@pytest.fixture(scope="module", autouse=True)
def startup_teardown():
270 changes: 270 additions & 0 deletions benchmark_v2/io_benchmark.py
@@ -0,0 +1,270 @@
import os
from glob import glob

import pytest

import arkouda as ak
from arkouda.io import to_parquet

TYPES = ("int64", "float64", "uint64", "str")
FILETYPES = ("HDF5", "PARQUET")
COMPRESSIONS = (None, "snappy", "gzip", "brotli", "zstd", "lz4")
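# Benchmarks below are gated by the --io_only_* flags; dtype and (for Parquet) compression filters come from the pytest config.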


def _write_files(a, ftype, dtype, compression=None):
for i in range(pytest.io_files):
if ftype == "HDF5":
a.to_hdf(f"{pytest.io_path}_hdf_{dtype}_{i:04}")
else:
to_parquet(
[a],
f"{pytest.io_path}_par_{compression}_{dtype}_{i:04}",
compression=compression,
)


def _write_multi(a, dtype, compression=None):
data = a._prep_data()
for i in range(pytest.io_files):
to_parquet(
data,
f"{pytest.io_path}_par_multi_{compression}_{dtype}_{i:04}",
compression=compression,
)


def _append_files(a, dtype, compression):
_remove_append_test_files(compression, dtype)
for i in range(pytest.io_files):
for key in a:
val = a[key]
to_parquet(
[val],
f"{pytest.io_path}_par_multi_{compression}_{dtype}_app_{i:04}",
names=[key],
mode="append",
compression=compression,
)


def _generate_array(N, dtype):
if dtype == "int64":
return ak.randint(0, 2**32, N, seed=pytest.seed)
elif dtype == "float64":
return ak.randint(0, 1, N, dtype=ak.float64, seed=pytest.seed)
elif dtype == "uint64":
return ak.randint(0, 2**32, N, dtype=ak.uint64, seed=pytest.seed)
elif dtype == "str":
return ak.random_strings_uniform(1, 16, N, seed=pytest.seed)


def _generate_df(N, dtype, returnDict=False):
df_dict = {
"c_1": _generate_array(N, dtype),
"c_2": _generate_array(N, dtype),
"c_3": _generate_array(N, dtype),
"c_4": _generate_array(N, dtype),
"c_5": _generate_array(N, dtype),
}
return df_dict if returnDict else ak.DataFrame(df_dict)


@pytest.mark.benchmark(group="Arkouda_IO_Write_HDF5")
@pytest.mark.parametrize("dtype", TYPES)
def bench_ak_write_hdf(benchmark, dtype):
if (pytest.io_write or (not pytest.io_read and not pytest.io_delete)) and dtype in pytest.dtype:
cfg = ak.get_config()
N = pytest.prob_size * cfg["numLocales"]

a = _generate_array(N, dtype)

benchmark.pedantic(_write_files, args=(a, "HDF5", dtype), rounds=pytest.trials)

if dtype in ["int64", "float64", "uint64"]:
nbytes = a.size * a.itemsize * pytest.io_files
else:
nbytes = a.nbytes * a.entry.itemsize * pytest.io_files

benchmark.extra_info[
"description"
] = f"Measures the performance of IO write {dtype} to HDF5 file"
benchmark.extra_info["problem_size"] = pytest.prob_size
benchmark.extra_info["transfer_rate"] = "{:.4f} GiB/sec".format(
(nbytes / benchmark.stats["mean"]) / 2**30
)


@pytest.mark.benchmark(group="Arkouda_IO_Write_Parquet")
@pytest.mark.parametrize("dtype", TYPES)
@pytest.mark.parametrize("comp", COMPRESSIONS)
def bench_ak_write_parquet(benchmark, dtype, comp):
if (pytest.io_write or (not pytest.io_read and not pytest.io_delete)) \
and dtype in pytest.dtype and comp in pytest.io_compression:
cfg = ak.get_config()
N = pytest.prob_size * cfg["numLocales"]

a = _generate_array(N, dtype)

benchmark.pedantic(_write_files, args=(a, "PARQUET", dtype, comp), rounds=pytest.trials)

if dtype in ["int64", "float64", "uint64"]:
nbytes = a.size * a.itemsize * pytest.io_files
else:
nbytes = a.nbytes * a.entry.itemsize * pytest.io_files

benchmark.extra_info[
"description"
] = f"Measures the performance of IO write {dtype} to Parquet file using {comp} compression"
benchmark.extra_info["problem_size"] = pytest.prob_size
benchmark.extra_info["transfer_rate"] = "{:.4f} GiB/sec".format(
(nbytes / benchmark.stats["mean"]) / 2**30
)


@pytest.mark.benchmark(group="Arkouda_IO_Write_Parquet")
@pytest.mark.parametrize("dtype", TYPES)
@pytest.mark.parametrize("comp", COMPRESSIONS)
def bench_ak_write_parquet_multi(benchmark, dtype, comp):
if (pytest.io_write or (not pytest.io_read and not pytest.io_delete)) \
and dtype in pytest.dtype and comp in pytest.io_compression:
cfg = ak.get_config()
N = pytest.prob_size * cfg["numLocales"]

a = _generate_df(N, dtype)

benchmark.pedantic(_write_multi, args=(a, dtype, comp), rounds=pytest.trials)

nbytes = 0
for c in a.columns:
col = a[c]
if dtype in ["int64", "float64", "uint64"]:
nbytes += col.size * col.itemsize * pytest.io_files
else:
nbytes += col.nbytes * col.entry.itemsize * pytest.io_files

benchmark.extra_info[
"description"
] = f"Measures the performance of IO write {dtype} to Parquet file using {comp} compression"
benchmark.extra_info["problem_size"] = pytest.prob_size
benchmark.extra_info["transfer_rate"] = "{:.4f} GiB/sec".format(
(nbytes / benchmark.stats["mean"]) / 2**30
)


@pytest.mark.benchmark(group="Arkouda_IO_Write_Parquet")
@pytest.mark.parametrize("dtype", TYPES)
@pytest.mark.parametrize("comp", COMPRESSIONS)
def bench_ak_write_parquet_append(benchmark, dtype, comp):
if (pytest.io_write or (not pytest.io_read and not pytest.io_delete)) \
and dtype in pytest.dtype and comp in pytest.io_compression:
cfg = ak.get_config()
N = pytest.prob_size * cfg["numLocales"]

a = _generate_df(N, dtype, True)

benchmark.pedantic(_append_files, args=(a, dtype, comp), rounds=pytest.trials)

nbytes = 0
for col in a.values():
if dtype in ["int64", "float64", "uint64"]:
nbytes += col.size * col.itemsize * pytest.io_files
else:
nbytes += col.nbytes * col.entry.itemsize * pytest.io_files

benchmark.extra_info[
"description"
] = f"Measures the performance of IO write {dtype} to Parquet file using {comp} compression"
benchmark.extra_info["problem_size"] = pytest.prob_size
benchmark.extra_info["transfer_rate"] = "{:.4f} GiB/sec".format(
(nbytes / benchmark.stats["mean"]) / 2**30
)


@pytest.mark.benchmark(group="Arkouda_IO_Read_HDF5")
@pytest.mark.parametrize("dtype", TYPES)
def bench_ak_read_hdf(benchmark, dtype):
if (pytest.io_read or (not pytest.io_write and not pytest.io_delete)) and dtype in pytest.dtype:
dataset = "strings_array" if dtype == "str" else "array"
a = benchmark.pedantic(
ak.read_hdf, args=[pytest.io_path + f"_hdf_{dtype}*", dataset], rounds=pytest.trials
)

nbytes = 0
if isinstance(a, ak.pdarray):
nbytes += a.size * a.itemsize
elif isinstance(a, ak.Strings):
nbytes += a.nbytes * a.entry.itemsize

benchmark.extra_info["description"] = "Measures the performance of IO read from HDF5 files"
benchmark.extra_info["problem_size"] = pytest.prob_size
benchmark.extra_info["transfer_rate"] = "{:.4f} GiB/sec".format(
(nbytes / benchmark.stats["mean"]) / 2**30
)


@pytest.mark.benchmark(group="Arkouda_IO_Read_Parquet")
@pytest.mark.parametrize("dtype", TYPES)
@pytest.mark.parametrize("comp", COMPRESSIONS)
def bench_ak_read_parquet(benchmark, dtype, comp):
if (pytest.io_read or (not pytest.io_write and not pytest.io_delete)) \
and comp in pytest.io_compression and dtype in pytest.dtype:
a = benchmark.pedantic(
ak.read_parquet, args=[pytest.io_path + f"_par_{comp}_{dtype}_*"], rounds=pytest.trials
)

nbytes = 0
if isinstance(a, ak.pdarray):
nbytes += a.size * a.itemsize
elif isinstance(a, ak.Strings):
nbytes += a.nbytes * a.entry.itemsize

benchmark.extra_info["description"] = "Measures the performance of IO read from Parquet files"
benchmark.extra_info["problem_size"] = pytest.prob_size
benchmark.extra_info["transfer_rate"] = "{:.4f} GiB/sec".format(
(nbytes / benchmark.stats["mean"]) / 2**30
)


@pytest.mark.benchmark(group="Arkouda_IO_Read_Parquet")
@pytest.mark.parametrize("dtype", TYPES)
@pytest.mark.parametrize("comp", COMPRESSIONS)
def bench_ak_read_parquet_multi_column(benchmark, dtype, comp):
"""
Read files written by the Parquet multi-column and Parquet append benchmarks
"""
if (pytest.io_read or (not pytest.io_write and not pytest.io_delete)) \
and comp in pytest.io_compression and dtype in pytest.dtype:
a = benchmark.pedantic(
ak.read_parquet, args=[pytest.io_path + f"_par_multi_{comp}_{dtype}_*"], rounds=pytest.trials
)

nbytes = 0
for col in a.values():
if isinstance(col, ak.pdarray):
nbytes += col.size * col.itemsize
elif isinstance(col, ak.Strings):
nbytes += col.nbytes * col.entry.itemsize

benchmark.extra_info["description"] = "Measures the performance of IO read from Parquet files"
benchmark.extra_info["problem_size"] = pytest.prob_size
benchmark.extra_info["transfer_rate"] = "{:.4f} GiB/sec".format(
(nbytes / benchmark.stats["mean"]) / 2**30
)


@pytest.mark.benchmark(group="Arkouda_IO_Delete")
def bench_ak_delete(benchmark):
if pytest.io_delete or (not pytest.io_write and not pytest.io_read):
benchmark.pedantic(_remove_files, rounds=1)

cfg = ak.get_config()
benchmark.extra_info["description"] = "Measures the performance of IO delete files from system"
benchmark.extra_info["problem_size"] = pytest.io_files * cfg["numLocales"]
benchmark.extra_info["transfer_rate"] = "N/A"


def _remove_files():
for f in glob(pytest.io_path + "*"):
os.remove(f)


def _remove_append_test_files(compression, dtype):
for f in glob(f"{pytest.io_path}_par_multi_{compression}_{dtype}_app_" + "*"):
os.remove(f)
25 changes: 25 additions & 0 deletions tests/conftest.py
@@ -63,6 +63,31 @@ def pytest_addoption(parser):
"Comma separated list (NO SPACES) allowing for multiple"
"Encoding to be used. Accepted values: idna, ascii"
)
parser.addoption(
"--io_only_write", action="store_true", default=False,
help="Benchmark only option. Only write the files; files will not be removed"
)
parser.addoption(
"--io_only_read", action="store_true", default=False,
help="Benchmark only option. Only read the files; files will not be removed"
)
parser.addoption(
"--io_only_delete", action="store_true", default=False,
help="Benchmark only option. Only delete files created from writing with this benchmark"
)
parser.addoption(
"--io_files_per_loc", action="store", default="1",
help="Benchmark only option. Number of files to create per locale"
)
parser.addoption(
"--io_compression", action="store", default="",
help="Benchmark only option. Compression types to run IO benchmarks against. Comma delimited list"
"(NO SPACES) allowing for multiple. Accepted values: none, snappy, gzip, brotli, zstd, and lz4"
)
parser.addoption(
"--io_path", action="store", default=os.path.join(os.getcwd(), "ak_io_benchmark"),
help="Benchmark only option. Target path for measuring read/write rates",
)


def pytest_collection_modifyitems(config, items):

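As a rough sketch of how the new options might be exercised, the IO benchmarks could be launched through pytest's Python entry point with the benchmark.ini config updated above. The target file, output path, compression list, and file count below are illustrative choices, not values prescribed by this commit:

import pytest

# Hypothetical invocation: run only the IO write benchmarks against snappy and gzip,
# writing 2 files per locale under an illustrative output path.
pytest.main([
    "-c", "benchmark.ini",
    "benchmark_v2/io_benchmark.py",
    "--io_only_write",
    "--io_compression=snappy,gzip",
    "--io_files_per_loc=2",
    "--io_path=/tmp/ak_io_benchmark",
])

Omitting the --io_only_* flags runs write, read, and delete phases in sequence, and an empty --io_compression exercises every supported compression type.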