Skip to content

Commit

Permalink
array_tools: reusing memory
Browse files Browse the repository at this point in the history
  • Loading branch information
miili committed Mar 6, 2024
1 parent 384050f commit b2ce40a
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 23 deletions.
72 changes: 65 additions & 7 deletions src/qseek/ext/array_tools.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
static PyObject *fill_zero_bytes(PyObject *module, PyObject *args,
PyObject *kwds) {
PyObject *array;

static char *kwlist[] = {"array", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwds, "O", kwlist, &array))
Expand All @@ -23,6 +24,63 @@ static PyObject *fill_zero_bytes(PyObject *module, PyObject *args,
Py_RETURN_NONE;
}

static PyObject *fill_zero_bytes_mask(PyObject *module, PyObject *args,
PyObject *kwds) {
PyObject *array, *mask;

PyArrayObject *mask_arr, *array_arr;
npy_bool *mask_data;
npy_intp *array_shape, n_nodes, n_samples;
size_t n_bytes;

static char *kwlist[] = {"array", "mask", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwds, "OO", kwlist, &array, &mask))
return NULL;

if (!PyArray_Check(array)) {
PyErr_SetString(PyExc_ValueError, "object is not a NumPy array");
return NULL;
}
array_arr = (PyArrayObject *)array;

if (PyArray_NDIM(array_arr) != 2) {
PyErr_SetString(PyExc_ValueError, "array is not a 2D NumPy array");
return NULL;
}

array_shape = PyArray_SHAPE((PyArrayObject *)array);
n_nodes = array_shape[0];
n_samples = array_shape[1];

if (!PyArray_Check(mask)) {
PyErr_SetString(PyExc_ValueError, "mask is not a NumPy array");
return NULL;
}
mask_arr = (PyArrayObject *)mask;

if (PyArray_NDIM(mask_arr) != 1) {
PyErr_SetString(PyExc_ValueError, "mask is not a 1D NumPy array");
return NULL;
}
if (PyArray_SIZE(mask_arr) != n_nodes) {
PyErr_SetString(PyExc_ValueError, "mask size does not match array");
return NULL;
}

mask_data = PyArray_DATA(mask_arr);
n_bytes = (size_t)n_samples * PyArray_ITEMSIZE(array_arr);
Py_BEGIN_ALLOW_THREADS;
for (int i_node = 0; i_node < PyArray_SIZE(mask_arr); i_node++) {
if (mask_data[i_node]) {
memset(PyArray_GETPTR2((PyArrayObject *)array, (npy_intp)i_node, 0), 0,
n_bytes);
}
}
Py_END_ALLOW_THREADS;
Py_RETURN_NONE;
}

static PyObject *apply_cache(PyObject *module, PyObject *args, PyObject *kwds) {
PyObject *obj, *cache, *mask;
PyArrayObject *array, *mask_array, *cached_row;
Expand All @@ -34,6 +92,7 @@ static PyObject *apply_cache(PyObject *module, PyObject *args, PyObject *kwds) {
npy_int *cumsum_mask, mask_value;
npy_int idx_sum = 0;
npy_bool *mask_data;
size_t n_bytes;

static char *kwlist[] = {"array", "cache", "mask", "nthreads", NULL};

Expand Down Expand Up @@ -124,6 +183,7 @@ static PyObject *apply_cache(PyObject *module, PyObject *args, PyObject *kwds) {
}
}

n_bytes = (size_t)n_samples * sizeof(npy_float32);
Py_BEGIN_ALLOW_THREADS;
#pragma omp parallel for num_threads(n_threads) \
schedule(dynamic) private(cached_row)
Expand All @@ -135,24 +195,22 @@ static PyObject *apply_cache(PyObject *module, PyObject *args, PyObject *kwds) {
cache, (Py_ssize_t)cumsum_mask[i_node]);
memcpy(
PyArray_GETPTR2((PyArrayObject *)array, (npy_intp)i_node, (npy_intp)0),
PyArray_DATA((PyArrayObject *)cached_row),
(size_t)n_samples * sizeof(npy_float32));
PyArray_DATA((PyArrayObject *)cached_row), n_bytes);
}
Py_END_ALLOW_THREADS;

Py_RETURN_NONE;
}

static PyMethodDef methods[] = {
/* The cast of the function is necessary since PyCFunction values
* only take two PyObject* parameters, and fill_zero_bytes() takes
* three.
*/
{"fill_zero_bytes", (PyCFunction)(void (*)(void))fill_zero_bytes,
METH_VARARGS | METH_KEYWORDS, "Fill a numpy array with zero bytes."},
{"fill_zero_bytes_mask", (PyCFunction)(void (*)(void))fill_zero_bytes_mask,
METH_VARARGS | METH_KEYWORDS,
"Fill a numpy 2D array with zero bytes on a row mask."},
{"apply_cache", (PyCFunction)(void (*)(void))apply_cache,
METH_VARARGS | METH_KEYWORDS,
"Apply a cache to a 2D numpy array of type float32."},
"Apply a row cache to a 2D numpy array of type float32."},
{NULL, NULL, 0, NULL} /* sentinel */
};

Expand Down
8 changes: 8 additions & 0 deletions src/qseek/ext/array_tools.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ def fill_zero_bytes(array: np.ndarray) -> None:
array: The array to fill with zero bytes.
"""

def fill_zero_bytes_mask(array: np.ndarray, mask: np.ndarray) -> None:
"""Fill the zero bytes of the array with zeros using numba.
Args:
array: The array to fill with zero bytes ndim=2, NxM.
mask: The mask array, ndim=1 with N shape of np.bool type.
"""

def apply_cache(
data: np.ndarray,
cache: list[np.ndarray],
Expand Down
43 changes: 27 additions & 16 deletions src/qseek/models/semblance.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from rich.table import Table
from scipy import signal

from qseek.ext.array_tools import apply_cache, fill_zero_bytes
from qseek.ext.array_tools import apply_cache, fill_zero_bytes, fill_zero_bytes_mask
from qseek.stats import Stats
from qseek.utils import datetime_now, get_cpu_count, human_readable_bytes

Expand Down Expand Up @@ -95,6 +95,7 @@ def __init__(
sampling_rate: float,
padding_samples: int = 0,
exponent: float = 1.0,
cache: dict[bytes, np.ndarray] | None = None,
) -> None:
self.sampling_rate = sampling_rate
self.padding_samples = padding_samples
Expand All @@ -105,27 +106,32 @@ def __init__(
self._node_hashes = [node.hash() for node in nodes]
n_nodes = len(self._node_hashes)

if self._cached_semblance is not None and self._cached_semblance.shape == (
n_nodes,
n_samples,
n_values = n_nodes * n_samples

if (
self._cached_semblance is not None
and self._cached_semblance.size >= n_values
):
logger.debug("recycling semblance memory")
fill_zero_bytes(self._cached_semblance)
self.semblance_unpadded = self._cached_semblance
logger.debug("recycling semblance memory with paged NUMA memory")
self.semblance_unpadded = self._cached_semblance[:n_values].reshape(
(n_nodes, n_samples)
)
if cache:
# If a cache is supplied only zero the missing nodes
fill_zero_bytes_mask(
self.semblance_unpadded, ~self.get_cache_mask(cache)
)
else:
fill_zero_bytes(self.semblance_unpadded)
else:
logger.debug("re-allocating semblance memory")
logger.info(
"re-allocating semblance memory: %d", human_readable_bytes(n_values * 4)
)
self.semblance_unpadded = np.zeros((n_nodes, n_samples), dtype=np.float32)
Semblance._cached_semblance = self.semblance_unpadded

self._stats.semblance_size_bytes = self.semblance_unpadded.nbytes

logger.debug(
"allocated volume for %d nodes and %d samples (%s)",
n_nodes,
n_samples,
human_readable_bytes(self.semblance_unpadded.nbytes),
)

@property
def n_nodes(self) -> int:
"""Number of nodes."""
Expand All @@ -151,8 +157,13 @@ def start_time(self) -> datetime:
)

def get_cache(self) -> dict[bytes, np.ndarray]:
"""Return a cache dictionary containing the semblance data.
We make a copy to keep the original data paged in memory.
"""
cached_semblance = self.semblance_unpadded.copy()
return {
node_hash: self.semblance_unpadded[i, :]
node_hash: cached_semblance[i, :]
for i, node_hash in enumerate(self._node_hashes)
}

Expand Down
11 changes: 11 additions & 0 deletions test/test_array_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,14 @@ def test_fill_zero_bytes() -> None:

array_tools.fill_zero_bytes(data)
np.testing.assert_array_equal(data, zeros)


def test_fill_zero_bytes_mask():
nnodes = 1000
nsamples = 3000
data = np.ones(shape=(nnodes, nsamples), dtype=np.float32)
mask = np.random.choice([0, 1], size=nnodes, p=[0.5, 0.5]).astype(np.bool_)

array_tools.fill_zero_bytes_mask(data, mask)
for idx, m in enumerate(mask):
assert np.any(data[idx]) != m

0 comments on commit b2ce40a

Please sign in to comment.