Merge pull request #896 from glitch/887-derive-offsets
Closes #887: Implements an option to derive SegString offsets/segments array
reuster986 authored Aug 20, 2021
2 parents 7d65bd7 + bbb73bd commit c9e214d
Showing 13 changed files with 316 additions and 260 deletions.
40 changes: 29 additions & 11 deletions arkouda/pdarrayIO.py
@@ -40,7 +40,7 @@ def ls_hdf(filename : str) -> str:

@typechecked
def read_hdf(dsetName : str, filenames : Union[str,List[str]],
strictTypes: bool=True) \
strictTypes: bool=True, allow_errors:bool = False, calc_string_offsets:bool = False) \
-> Union[pdarray, Strings]:
"""
Read a single dataset from multiple HDF5 files into an Arkouda
@@ -58,7 +58,15 @@ def read_hdf(dsetName : str, filenames : Union[str,List[str]],
precision and sign across different files. For example, if one
file contains a uint32 dataset and another contains an int64
dataset, the contents of both will be read into an int64 pdarray.
allow_errors: bool
Default False, if True will allow files with read errors to be skipped
instead of failing. A warning will be included in the return containing
the total number of files skipped due to failure and up to 10 filenames.
calc_string_offsets: bool
Default False, if True the server will calculate the offsets/segments array
instead of loading it from the HDF5 files. In the future this option may
default to True.
Returns
-------
Union[pdarray,Strings]
@@ -96,15 +104,17 @@ def read_hdf(dsetName : str, filenames : Union[str,List[str]],
# return Strings(*rep_msg.split('+'))
# else:
# return create_pdarray(rep_msg)
return cast(Union[pdarray, Strings],
read_all(filenames, datasets=dsetName, strictTypes=strictTypes))
return cast(Union[pdarray, Strings],
read_all(filenames, datasets=dsetName, strictTypes=strictTypes, allow_errors=allow_errors,
calc_string_offsets=calc_string_offsets))


def read_all(filenames : Union[str, List[str]],
datasets: Optional[Union[str, List[str]]] = None,
iterative: bool = False,
strictTypes: bool = True,
allow_errors: bool = False)\
allow_errors: bool = False,
calc_string_offsets = False)\
-> Union[pdarray, Strings, Mapping[str,Union[pdarray,Strings]]]:
"""
Read datasets from HDF5 files.
@@ -128,6 +138,10 @@ def read_all(filenames : Union[str, List[str]],
Default False, if True will allow files with read errors to be skipped
instead of failing. A warning will be included in the return containing
the total number of files skipped due to failure and up to 10 filenames.
calc_string_offsets: bool
Default False, if True the server will calculate the offsets/segments array
instead of loading it from the HDF5 files. In the future this option may
default to True.
Returns
-------
@@ -181,11 +195,12 @@ def read_all(filenames : Union[str, List[str]],
if len(nonexistent) > 0:
raise ValueError("Dataset(s) not found: {}".format(nonexistent))
if iterative == True: # iterative calls to server readhdf
return {dset:read_hdf(dset, filenames, strictTypes=strictTypes) for dset in datasets}
return {dset: read_hdf(dset, filenames, strictTypes=strictTypes, allow_errors=allow_errors,
calc_string_offsets=calc_string_offsets) for dset in datasets}
else: # single call to server readAllHdf
rep_msg = generic_msg(cmd="readAllHdf", args="{} {:n} {:n} {} {} | {}".
format(strictTypes, len(datasets), len(filenames), allow_errors, json.dumps(datasets),
json.dumps(filenames)))
rep_msg = generic_msg(cmd="readAllHdf", args=
f"{strictTypes} {len(datasets)} {len(filenames)} {allow_errors} {calc_string_offsets} {json.dumps(datasets)} | {json.dumps(filenames)}"
)
rep = json.loads(rep_msg) # See GenSymIO._buildReadAllHdfMsgJson for json structure
items = rep["items"] if "items" in rep else []
file_errors = rep["file_errors"] if "file_errors" in rep else []
@@ -222,7 +237,7 @@ def read_all(filenames : Union[str, List[str]],


@typechecked
def load(path_prefix : str, dataset : str='array') -> Union[pdarray,Strings]:
def load(path_prefix : str, dataset : str='array', calc_string_offsets:bool = False) -> Union[pdarray,Strings]:
"""
Load a pdarray previously saved with ``pdarray.save()``.
@@ -232,6 +247,9 @@ def load(path_prefix : str, dataset : str='array') -> Union[pdarray,Strings]:
Filename prefix used to save the original pdarray
dataset : str
Dataset name where the pdarray was saved, defaults to 'array'
calc_string_offsets : bool
If True, the server will ignore the Segmented Strings 'offsets' array and derive
it from the null-byte terminators. Defaults to False.
Returns
-------
@@ -257,7 +275,7 @@ def load(path_prefix : str, dataset : str='array') -> Union[pdarray,Strings]:
globstr = "{}_LOCALE*{}".format(prefix, extension)

try:
return read_hdf(dataset, globstr)
return read_hdf(dataset, globstr, calc_string_offsets=calc_string_offsets)
except RuntimeError as re:
if 'does not exist' in str(re):
raise ValueError('There are no files corresponding to the ' +
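For reference, a minimal usage sketch of the new flag against this file's API (file names and the connection call are illustrative, not part of this commit):

    import arkouda as ak
    ak.connect()

    # Let the server derive the Strings offsets/segments array from the
    # null-byte terminators instead of loading it from the HDF5 files.
    strings = ak.read_hdf('strings_array', 'data_LOCALE*.hdf', calc_string_offsets=True)

    # The same flag is threaded through read_all and load.
    data = ak.read_all('data_LOCALE*.hdf', calc_string_offsets=True)
    s = ak.load('data', dataset='strings_array', calc_string_offsets=True)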
42 changes: 22 additions & 20 deletions arkouda/pdarraycreation.py
@@ -1,3 +1,4 @@
import itertools
import numpy as np # type: ignore
import pandas as pd # type: ignore
from typing import cast, Iterable, Optional, Union
@@ -176,24 +177,20 @@ def array(a : Union[pdarray,np.ndarray, Iterable]) -> Union[pdarray, Strings]:
if a.ndim != 1:
raise RuntimeError("Only rank-1 pdarrays or ndarrays supported")
# Check if array of strings
if a.dtype.kind == 'U' or 'U' in a.dtype.kind:
encoded = np.array([elem.encode() for elem in a])
# Length of each string, plus null byte terminator
lengths = np.array([len(elem) for elem in encoded]) + 1
# Compute zero-up segment offsets
offsets = np.cumsum(lengths) - lengths
# Allocate and fill bytes array with string segments
nbytes = offsets[-1] + lengths[-1]
if 'U' in a.dtype.kind:
# encode each string and add a null byte terminator
encoded = [i for i in itertools.chain.from_iterable(map(lambda x: x.encode() + b"\x00", a))]
nbytes = len(encoded)
if nbytes > maxTransferBytes:
raise RuntimeError(("Creating pdarray would require transferring {} bytes," +
" which exceeds allowed transfer size. Increase " +
"ak.maxTransferBytes to force.").format(nbytes))
values = np.zeros(nbytes, dtype=np.uint8)
for s, o in zip(encoded, offsets):
for i, b in enumerate(s):
values[o+i] = b
# Recurse to create pdarrays for offsets and values, then return Strings object
return Strings(cast(pdarray, array(offsets)), cast(pdarray, array(values)))
encoded_np = np.array(encoded, dtype=np.uint8)
args = f"{encoded_np.dtype.name} {encoded_np.size} seg_string={True}"
rep_msg = generic_msg(cmd='array', args=args, payload=_array_memview(encoded_np), send_binary=True)
parts = cast(str, rep_msg).split('+', maxsplit=3)
return Strings(parts[0], parts[1])

# If not strings, then check that dtype is supported in arkouda
if a.dtype.name not in DTypes:
raise RuntimeError("Unhandled dtype {}".format(a.dtype))
@@ -206,14 +203,19 @@ def array(a : Union[pdarray,np.ndarray, Iterable]) -> Union[pdarray, Strings]:
# including the dtype and size. If the server has a different byteorder
# than our numpy array we need to swap to match since the server expects
# native endian bytes
aview = _array_memview(a)
args = f"{a.dtype.name} {size} seg_strings={False}"
rep_msg = generic_msg(cmd='array', args=args, payload=aview, send_binary=True)
return create_pdarray(rep_msg)


def _array_memview(a) -> memoryview:
if ((get_byteorder(a.dtype) == '<' and get_server_byteorder() == 'big') or
(get_byteorder(a.dtype) == '>' and get_server_byteorder() == 'little')):
aview = memoryview(a.byteswap())
(get_byteorder(a.dtype) == '>' and get_server_byteorder() == 'little')):
return memoryview(a.byteswap())
else:
aview = memoryview(a)
args = "{} {:n} ". format(a.dtype.name, size)
repMsg = generic_msg(cmd='array', args=args, payload=aview, send_binary=True)
return create_pdarray(repMsg)
return memoryview(a)


def zeros(size : int_scalars, dtype : type=np.float64) -> pdarray:
"""
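As a rough illustration of the new client-side encoding in array() (a standalone sketch of what the diff above does, not additional library API): the strings are flattened into a single null-terminated uint8 buffer, and the server derives the offsets from the terminators.

    import itertools
    import numpy as np

    a = np.array(['one', 'two', 'three'])
    # Encode each string, append a null terminator, and flatten to raw bytes.
    encoded = [b for b in itertools.chain.from_iterable(s.encode() + b"\x00" for s in a)]
    encoded_np = np.array(encoded, dtype=np.uint8)
    # encoded_np.tobytes() == b'one\x00two\x00three\x00'
    # The server can recover offsets [0, 4, 8] from the null bytes, so the
    # client no longer computes and transfers a separate offsets array.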
16 changes: 10 additions & 6 deletions arkouda/strings.py
@@ -790,7 +790,7 @@ def to_ndarray(self) -> np.ndarray:

@typechecked
def save(self, prefix_path : str, dataset : str='strings_array',
mode : str='truncate') -> str:
mode : str='truncate', save_offsets : bool = True) -> str:
"""
Save the Strings object to HDF5. The result is a collection of HDF5 files,
one file per locale of the arkouda server, where each filename starts
@@ -806,6 +806,10 @@ def save(self, prefix_path : str, dataset : str='strings_array',
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', create a new Strings dataset within existing files.
save_offsets : bool
Defaults to True, which instructs the server to save the offsets array to HDF5.
If False, the offsets array will not be saved and will instead be derived from
the string values upon load/read.
Returns
-------
@@ -841,11 +845,11 @@ def save(self, prefix_path : str, dataset : str='strings_array',
json_array = json.dumps([prefix_path])
except Exception as e:
raise ValueError(e)
return cast(str, generic_msg(cmd="tohdf", args="{} {} {} {} {} {}".\
format(self.bytes.name, dataset, m, json_array,
self.dtype, self.offsets.name)))

cmd = "tohdf"
args = f"{self.bytes.name} {dataset} {m} {json_array} {self.dtype} {self.offsets.name} {save_offsets}"
return cast(str, generic_msg(cmd, args))


def is_registered(self) -> np.bool_:
"""
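Putting the pieces together, a hedged round-trip sketch (paths are hypothetical): skip writing the offsets array on save, then derive it again at read time.

    import arkouda as ak
    ak.connect()

    s = ak.array(['a', 'bb', 'ccc'])
    # Write only the string values; omit the offsets dataset from the HDF5 files.
    s.save('/tmp/strings_demo', dataset='strings_array', save_offsets=False)
    # Read back, asking the server to rebuild offsets from the null terminators.
    s2 = ak.load('/tmp/strings_demo', dataset='strings_array', calc_string_offsets=True)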
(Diffs for the remaining 10 changed files are not shown here.)
