-
Notifications
You must be signed in to change notification settings - Fork 1
/
10_benchmark_perf.py
287 lines (250 loc) · 8.45 KB
/
10_benchmark_perf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""
Benchmark script using hydra.
Run it with python 10_benchmark.py --config-name ismrm2024 to reproduce results of abstract.
This script benchmarks different MRI reconstruction algorithms using various 3D trajectories.
It initializes data, sets up the necessary configurations, and runs benchmarks while monitoring performance.
Usage:
python 10_benchmark.py --config-name config_name
Output:
Performance metrics and results saved in CSV files.
"""
import csv
import logging
import os
import time
import warnings
from pathlib import Path
import hydra
import numpy as np
from hydra_callbacks.logger import PerfLogger
from hydra_callbacks.monitor import ResourceMonitorService
from mrinufft import get_operator
from mrinufft.io import read_trajectory
from omegaconf import DictConfig
AnyShape = tuple[int, int, int]
# Check for CUPY availability for GPU support
CUPY_AVAILABLE = True
try:
import cupy as cp
except ImportError:
CUPY_AVAILABLE = False
# Initialize logger
logger = logging.getLogger(__name__)
# Suppress specific warnings from mrinufft module
warnings.filterwarnings(
"ignore",
"Samples will be rescaled to .*",
category=UserWarning,
module="mrinufft",
)
def get_data(cfg):
"""Initialize all the data for the benchmark."""
# Initialize trajectory
if cfg.trajectory.endswith(".bin"):
trajectory, params = read_trajectory(
str(Path(__file__).parent / cfg.trajectory)
)
else:
eval(trajectory.name)(**trajectory.kwargs)
cpx_type = np.dtype(cfg.data.dtype)
if cpx_type == np.complex64:
trajectory = trajectory.astype(np.float32)
C = cfg.data.n_coils
XYZ = tuple(params["img_size"])
K = np.prod(trajectory.shape[:-1])
# Load or generate data
if data_file := getattr(cfg.data, "file", None):
data = np.load(data_file)
if data.shape != XYZ:
logger.warning("mismatched shape between data and trajectory file.")
else:
data = (1j * np.random.rand(*XYZ)).astype(cpx_type)
data += np.random.rand(*XYZ).astype(cpx_type)
# Generate k-space data
ksp_data = 1j * np.random.randn(C, K).astype(cpx_type)
ksp_data += np.random.randn(C, K).astype(cpx_type)
# Initialize sensitivity maps
smaps = None
if cfg.data.n_coils > 1:
smaps_true = get_smaps(XYZ, C)
if cfg.data.smaps:
smaps = smaps_true
else:
# Expand the data to multicoil
data = data[None, ...] * smaps_true
return (data, ksp_data, trajectory, smaps, XYZ, C)
@hydra.main(
config_path="perf",
config_name="benchmark_config",
version_base=None,
)
def main_app(cfg: DictConfig) -> None:
"""Run the benchmark."""
# TODO Add a DSL like bart::extra_args:value::extra_arg2:value2 etc
# Initialize the NUFFT operator
nufftKlass = get_operator(cfg.backend.name)
data, ksp_data, trajectory, smaps, shape, n_coils = get_data(cfg)
logger.debug(
f"{data.shape}, {ksp_data.shape}, {trajectory.shape}, {n_coils}, {shape}"
)
# Set up resource monitoring
monit = ResourceMonitorService(
interval=cfg.monitor.interval, gpu_monit=cfg.monitor.gpu
)
kwargs = {}
if "stacked" in cfg.backend.name:
kwargs["z_index"] = "auto"
nufft = nufftKlass(
trajectory,
shape,
n_coils=n_coils,
smaps=smaps,
**kwargs,
)
run_config = {
"backend": cfg.backend.name,
"eps": cfg.backend.eps,
"upsampfac": cfg.backend.upsampfac,
"n_coils": nufft.n_coils,
"shape": nufft.shape,
"n_samples": nufft.n_samples,
"dim": len(nufft.shape),
"sense": nufft.uses_sense,
}
trajectory_name = cfg.trajectory.split("/")[-1].split("_")[0]
result_file = f"{cfg.backend.name}_{cfg.backend.upsampfac}_{trajectory_name}_{cfg.backend.eps}_{cfg.data.n_coils}.csv"
# Run benchmark tasks
for task in cfg.task:
tic = time.perf_counter()
toc = tic
i = -1
while toc - tic < cfg.max_time:
i += 1
nufft = nufftKlass(
trajectory,
shape,
n_coils=n_coils,
smaps=smaps,
eps=cfg.backend.eps,
upsampfac=cfg.backend.upsampfac,
)
with (
monit,
PerfLogger(logger, name=f"{cfg.backend.name}_{task}, #{i}") as perflog,
):
if task == "forward":
nufft.op(data)
elif task == "adjoint":
nufft.adj_op(ksp_data)
elif task == "grad":
nufft.data_consistency(data, ksp_data)
else:
raise ValueError(f"Unknown task {task}")
toc = time.perf_counter()
values = monit.get_values()
monit_values = {
"task": task,
"run": i,
"run_time": perflog.get_timer(f"{cfg.backend.name}_{task}, #{i}"),
"mem_avg": np.mean(values["rss_GiB"]),
"mem_peak": np.max(values["rss_GiB"]),
"cpu_avg": np.mean(values["cpus"]),
"cpu_peak": np.max(values["cpus"]),
}
if cfg.monitor.gpu:
gpu_keys = [k for k in values.keys() if "gpu" in k]
for k in gpu_keys:
monit_values[f"{k}_avg"] = np.mean(values[k])
monit_values[f"{k}_peak"] = np.max(values[k])
# Save benchmark results to CSV file
with open(result_file, "a") as f:
row_dict = run_config | monit_values
writer = csv.DictWriter(f, fieldnames=row_dict.keys())
f.seek(0, os.SEEK_END)
if not f.tell():
writer.writeheader()
writer.writerow(row_dict)
del nufft
if CUPY_AVAILABLE:
cp.get_default_memory_pool().free_all_blocks()
def get_smaps(
shape: AnyShape,
n_coils: int,
antenna: str = "birdcage",
dtype: np.dtype = np.complex64,
cachedir="/tmp/smaps",
) -> np.ndarray:
"""Get sensitivity maps for a specific antenna.
Parameters
----------
shape
Volume shape
n_coils
number of coil in the antenna
antenna
name of the antenna to emulate. Only "birdcage" is currently supported.
dtype
return datatype for the sensitivity maps.
cachedir : str
Directory to cache the sensitivity maps.
Returns
-------
np.ndarray
Complex sensitivity profiles.
"""
if antenna == "birdcage":
try:
os.makedirs(cachedir, exist_ok=True)
smaps = np.load(f"{cachedir}/smaps_{n_coils}_{shape}.npy")
except FileNotFoundError:
smaps = _birdcage_maps((n_coils, *shape), nzz=n_coils, dtype=dtype)
np.save(f"{cachedir}/smaps_{n_coils}_{shape}.npy", smaps)
return smaps
else:
raise NotImplementedError
def _birdcage_maps(
shape: AnyShape, r: float = 1.5, nzz: int = 8, dtype: np.dtype = np.complex64
) -> np.ndarray:
"""Simulate birdcage coil sensitivies.
Parameters
----------
shape : tuple
Sensitivity maps shape (nc, x, y, z).
r : float
Relative radius of birdcage.
nzz : int
Number of coils per ring.
dtype : np.dtype
Data type of the output.
Returns
-------
np.ndarray
Complex sensitivity profiles.
References
----------
https://sigpy.readthedocs.io/en/latest/_modules/sigpy/mri/sim.html
"""
if len(shape) == 4:
nc, nz, ny, nx = shape
elif len(shape) == 3:
nc, ny, nx = shape
nz = 1
else:
raise ValueError("shape must be [nc, nx, ny, nz] or [nc, nx, ny]")
c, z, y, x = np.mgrid[:nc, :nz, :ny, :nx]
coilx = r * np.cos(c * (2 * np.pi / nzz), dtype=np.float32)
coily = r * np.sin(c * (2 * np.pi / nzz), dtype=np.float32)
coilz = np.floor(np.float32(c / nzz)) - 0.5 * (np.ceil(nc / nzz) - 1)
coil_phs = np.float32(-(c + np.floor(c / nzz)) * (2 * np.pi / nzz))
x_co = (x - nx / 2.0) / (nx / 2.0) - coilx
y_co = (y - ny / 2.0) / (ny / 2.0) - coily
z_co = (z - nz / 2.0) / (nz / 2.0) - coilz
rr = (x_co**2 + y_co**2 + z_co**2) ** 0.5
phi = np.arctan2(x_co, -y_co) + coil_phs
out = (1 / rr) * np.exp(1j * phi)
rss = sum(abs(out) ** 2, 0) ** 0.5
out /= rss
out = np.squeeze(out)
return out.astype(dtype)
if __name__ == "__main__":
main_app()