Skip to content

Commit

Permalink
Implement Memory-mapped MLModel
Browse files Browse the repository at this point in the history
  • Loading branch information
Wei-Cheng Chang committed Jan 8, 2024
1 parent 22f01be commit 5c06b9e
Show file tree
Hide file tree
Showing 5 changed files with 568 additions and 71 deletions.
259 changes: 259 additions & 0 deletions pecos/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ def __init__(self, dirname, soname, forced_rebuild=False):
self.clib_float32 = corelib.load_dynamic_library(
dirname, soname + "_float32", forced_rebuild=forced_rebuild
)
self.link_mlmodel_methods()
self.link_xlinear_methods()
self.link_sparse_operations()
self.link_clustering()
Expand All @@ -537,6 +538,264 @@ def __init__(self, dirname, soname, forced_rebuild=False):
self.link_mmap_valstore_methods()
self.link_calibrator_methods()

def link_mlmodel_methods(self):
    """
    Declare argument and return types of the C-lib's MLModel functions
    on ``self.clib_float32`` via ``corelib.fillprototype``.
    """
    lib = self.clib_float32

    # --- model life-cycle and attribute access ---
    # compile mmap model
    corelib.fillprototype(lib.c_mlmodel_compile_mmap_model, None, [c_char_p, c_char_p])
    # load mmap model (returns an opaque pointer)
    corelib.fillprototype(lib.c_mlmodel_load_mmap_model, c_void_p, [c_char_p, c_bool])
    # destruct mmap model
    corelib.fillprototype(lib.c_mlmodel_destruct_model, None, [c_void_p])
    # get int attr (nr_labels, nr_codes, nr_features)
    corelib.fillprototype(lib.c_mlmodel_get_int_attr, c_uint32, [c_void_p, c_char_p])

    def full_predict_args(query_type):
        # Build a fresh argument list per C function; only the query matrix
        # type differs between the sparse and dense variants.
        return [
            c_void_p,
            POINTER(query_type),
            POINTER(ScipyCsrF32),
            c_char_p,
            c_uint32,
            c_int,
            ScipyCompressedSparseAllocator.CFUNCTYPE,
        ]

    def selected_predict_args(query_type):
        # Same idea as above, for the selected-outputs variants, which take
        # an extra CSR pointer and no c_uint32 argument.
        return [
            c_void_p,
            POINTER(query_type),
            POINTER(ScipyCsrF32),
            POINTER(ScipyCsrF32),
            c_char_p,
            c_int,
            ScipyCompressedSparseAllocator.CFUNCTYPE,
        ]

    # --- full prediction: sparse query, then dense query ---
    corelib.fillprototype(lib.c_mlmodel_predict_csr_f32, None, full_predict_args(ScipyCsrF32))
    corelib.fillprototype(lib.c_mlmodel_predict_drm_f32, None, full_predict_args(ScipyDrmF32))

    # --- prediction on selected outputs: sparse query, then dense query ---
    corelib.fillprototype(
        lib.c_mlmodel_predict_on_selected_outputs_csr_f32,
        None,
        selected_predict_args(ScipyCsrF32),
    )
    corelib.fillprototype(
        lib.c_mlmodel_predict_on_selected_outputs_drm_f32,
        None,
        selected_predict_args(ScipyDrmF32),
    )

def mlmodel_compile_mmap_model(self, npz_folder, mmap_folder):
    """
    Convert an MLModel saved in npz format into the memory-mapped
    format, which is faster to load.
    Args:
        npz_folder (str): The source folder path for mlmodel npz model.
        mmap_folder (str): The destination folder path for mlmodel mmap model.
    """
    # The C side takes NUL-terminated byte strings, so encode both paths.
    src_path = c_char_p(npz_folder.encode("utf-8"))
    dst_path = c_char_p(mmap_folder.encode("utf-8"))
    self.clib_float32.c_mlmodel_compile_mmap_model(src_path, dst_path)

def mlmodel_load_mmap(self, folder, lazy_load=False):
    """
    Open an MLModel stored in mmap format, read-only, for prediction.
    Args:
        folder (str): The mmap folder path for mlmodel.
        lazy_load (bool): Whether to lazy-load, i.e. load when needed (True)
            or fully load model before returning (False).
    Return:
        cmodel (ptr): The pointer to mlmodel.
    """
    folder_arg = c_char_p(folder.encode("utf-8"))
    lazy_arg = c_bool(lazy_load)
    return self.clib_float32.c_mlmodel_load_mmap_model(folder_arg, lazy_arg)

def mlmodel_destruct_model(self, c_model):
    """
    Destruct mlmodel.
    Args:
        c_model (ptr): The pointer to the C mlmodel, as returned by
            mlmodel_load_mmap.
    """
    # The exported C symbol carries the "c_" prefix (it is the one whose
    # prototype is registered in link_mlmodel_methods); the un-prefixed name
    # does not exist in the library and raised AttributeError.
    self.clib_float32.c_mlmodel_destruct_model(c_model)

def mlmodel_get_int_attr(self, c_model, attr):
    """
    Read an integer attribute from a C mlmodel.
    Args:
        c_model (ptr): The C mlmodel pointer.
        attr (str): The attribute name to get; one of
            "nr_labels", "nr_codes" or "nr_features".
    Return:
        int_attr (int): The int attribute under given name.
    """
    supported_attrs = {"nr_labels", "nr_codes", "nr_features"}
    assert attr in supported_attrs, f"attr {attr} not implemented"
    attr_name = c_char_p(attr.encode("utf-8"))
    return self.clib_float32.c_mlmodel_get_int_attr(c_model, attr_name)

def mlmodel_predict(
    self,
    c_model,
    X,
    csr_codes,
    overriden_post_processor_str,
    overriden_only_topk,
    threads,
    pred_alloc,
):
    """
    Run a full prediction with the given model on the given queries.
    Args:
        c_model (c_pointer): A C pointer to the model to use for prediction.
            This pointer is returned by the c_mlmodel_load_mmap_model in corelib.clib_float32.
        X: The query matrix (admissible formats are smat.csr_matrix,
            np.ndarray, ScipyCsrF32, or ScipyDrmF32). Note that if this is smat.csr_matrix,
            the matrix must have sorted indices. You can call sort_indices() to ensure this.
        csr_codes (smat.csr_matrix or ScipyCsrF32): The prediction for the previous layer.
            None if this is the first layer.
        overriden_post_processor_str (string): Overrides the post processor to use by name. Use
            None for model defaults.
        overriden_only_topk (uint): Overrides the number of results to return for each query. Use
            None for model defaults.
        threads (int): Sets the number of threads to use in computation. Use
            -1 to use the maximum amount of available threads.
        pred_alloc (ScipyCompressedSparseAllocator): The allocator to store the result in.
    Raises:
        ValueError: If a smat.csr_matrix query has unsorted indices, or the
            shape of csr_codes does not match the query / the model's nr_codes.
        NotImplementedError: If X is of an unsupported type.
    """
    lib = self.clib_float32

    # Wrap scipy / numpy queries into their ctypes counterparts.
    if isinstance(X, smat.csr_matrix):
        if not X.has_sorted_indices:
            raise ValueError("Query matrix does not have sorted indices!")
        X = ScipyCsrF32.init_from(X)
    elif isinstance(X, np.ndarray):
        X = ScipyDrmF32.init_from(X)

    # Pick the C entry point that matches the query layout.
    if isinstance(X, ScipyCsrF32):
        predict_fn = lib.c_mlmodel_predict_csr_f32
    elif isinstance(X, ScipyDrmF32):
        predict_fn = lib.c_mlmodel_predict_drm_f32
    else:
        raise NotImplementedError("type(X) = {} not implemented".format(type(X)))

    # Validate and wrap the previous-layer prediction, when there is one.
    codes_ref = None
    if csr_codes is not None:
        nr_codes = lib.c_mlmodel_get_int_attr(c_model, c_char_p(b"nr_codes"))
        if csr_codes.shape[0] != X.shape[0]:
            raise ValueError("Instance dimension of query and csr_codes matrix do not match")
        if csr_codes.shape[1] != nr_codes:
            raise ValueError("Label dimension of csr_codes and C matrix do not match")
        csr_codes = ScipyCsrF32.init_from(csr_codes)
        codes_ref = byref(csr_codes)

    # Falsy overrides (None, "", 0) are sent as NULL / 0 to the C side.
    post_processor = None
    if overriden_post_processor_str:
        post_processor = overriden_post_processor_str.encode("utf-8")
    only_topk = overriden_only_topk or 0

    predict_fn(
        c_model,
        byref(X),
        codes_ref,
        post_processor,
        only_topk,
        threads,
        pred_alloc.cfunc,
    )

def mlmodel_predict_on_selected_outputs(
    self,
    c_model,
    X,
    selected_outputs_csr,
    csr_codes,
    overriden_post_processor_str,
    threads,
    pred_alloc,
):
    """
    Run a prediction restricted to selected outputs with the given model and queries.
    Args:
        c_model (c_pointer): A C pointer to the model to use for prediction.
            This pointer is returned by the c_mlmodel_load_mmap_model in corelib.clib_float32.
        X: The query matrix (admissible formats are smat.csr_matrix,
            np.ndarray, ScipyCsrF32, or ScipyDrmF32). Note that if this is smat.csr_matrix,
            the matrix must have sorted indices. You can call sort_indices() to ensure this.
        selected_outputs_csr (csr_matrix): the selected outputs to predict
        csr_codes (smat.csr_matrix or ScipyCsrF32): The prediction for the previous layer.
            None if this is the first layer.
        overriden_post_processor_str (string): Overrides the post processor to use by name. Use
            None for model defaults.
        threads (int): Sets the number of threads to use in computation. Use
            -1 to use the maximum amount of available threads.
        pred_alloc (ScipyCompressedSparseAllocator): The allocator to store the result in.
    Raises:
        ValueError: If a smat.csr_matrix query has unsorted indices, if
            selected_outputs_csr is not a smat.csr_matrix, or if the shape of
            csr_codes does not match the query / the model's nr_codes.
        NotImplementedError: If X is of an unsupported type.
    """
    lib = self.clib_float32

    # Wrap scipy / numpy queries into their ctypes counterparts.
    if isinstance(X, smat.csr_matrix):
        if not X.has_sorted_indices:
            raise ValueError("Query matrix does not have sorted indices!")
        X = ScipyCsrF32.init_from(X)
    elif isinstance(X, np.ndarray):
        X = ScipyDrmF32.init_from(X)

    # Only scipy CSR matrices are accepted for the output selection.
    if not isinstance(selected_outputs_csr, smat.csr_matrix):
        raise ValueError(
            "type(selected_outputs_csr) = {} not implemented".format(type(selected_outputs_csr))
        )
    selected_outputs_csr = ScipyCsrF32.init_from(selected_outputs_csr)

    # Pick the C entry point that matches the query layout.
    if isinstance(X, ScipyCsrF32):
        predict_fn = lib.c_mlmodel_predict_on_selected_outputs_csr_f32
    elif isinstance(X, ScipyDrmF32):
        predict_fn = lib.c_mlmodel_predict_on_selected_outputs_drm_f32
    else:
        raise NotImplementedError("type(X) = {} not implemented".format(type(X)))

    # Validate and wrap the previous-layer prediction, when there is one.
    codes_ref = None
    if csr_codes is not None:
        nr_codes = lib.c_mlmodel_get_int_attr(c_model, c_char_p(b"nr_codes"))
        if csr_codes.shape[0] != X.shape[0]:
            raise ValueError("Instance dimension of query and csr_codes matrix do not match")
        if csr_codes.shape[1] != nr_codes:
            raise ValueError("Label dimension of csr_codes and C matrix do not match")
        csr_codes = ScipyCsrF32.init_from(csr_codes)
        codes_ref = byref(csr_codes)

    # A falsy override (None or "") is sent as NULL to the C side.
    post_processor = None
    if overriden_post_processor_str:
        post_processor = overriden_post_processor_str.encode("utf-8")

    predict_fn(
        c_model,
        byref(X),
        byref(selected_outputs_csr),
        codes_ref,
        post_processor,
        threads,
        pred_alloc.cfunc,
    )

def link_xlinear_methods(self):
"""
Specify C-lib's Xlinear methods argument and return type.
Expand Down
89 changes: 88 additions & 1 deletion pecos/core/libpecos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,93 @@
// C Interface of Types/Structures can be found in utils/matrix.hpp

extern "C" {
// ==== C Interface of XMC Models ====
// ==== C Interface of MLModels ====
// MLMODEL_MAT_T is the weight-matrix type used to instantiate pecos::MLModel
// in the c_mlmodel_* functions of this section.
// Only implemented for w_matrix_t = pecos::csc_t
// Alternative matrix type, currently disabled:
//typedef pecos::bin_search_chunked_matrix_t MLMODEL_MAT_T;
typedef pecos::csc_t MLMODEL_MAT_T;
void c_mlmodel_compile_mmap_model(const char* model_path, const char* mmap_model_path) {
auto model = new pecos::MLModel<MLMODEL_MAT_T>(model_path, 0);
model->save_mmap(mmap_model_path);
delete model;
}
void* c_mlmodel_load_mmap_model(const char* model_path, const bool lazy_load) {
auto mlm = new pecos::MLModel<MLMODEL_MAT_T>(model_path, 0, lazy_load);
return static_cast<void*>(mlm);
}
// Delete the MLModel behind the opaque pointer `ptr`.
void c_mlmodel_destruct_model(void* ptr) {
    delete static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr);
}
// Allowed attr: nr_labels, nr_codes, nr_features
uint32_t c_mlmodel_get_int_attr(void* ptr, const char* attr) {
pecos::MLModel<MLMODEL_MAT_T>* mlm = static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr);
return mlm->get_int_attr(attr);
}

#define C_MLMODEL_PREDICT(SUFFIX, PY_MAT, C_MAT) \
void c_mlmodel_predict ## SUFFIX( \
void* ptr, \
const PY_MAT* input_x, \
const ScipyCsrF32* csr_codes, \
const char* overridden_post_processor, \
const uint32_t overridden_only_topk, \
const int num_threads, \
py_sparse_allocator_t pred_alloc) { \
pecos::MLModel<MLMODEL_MAT_T>* mlm = static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr); \
C_MAT X(input_x); \
pecos::csr_t prev_layer_pred; \
bool no_prev_pred; \
if (csr_codes) { \
prev_layer_pred = pecos::csr_t(csr_codes).deep_copy(); \
no_prev_pred = false; \
} else { \
prev_layer_pred.fill_ones(X.rows, mlm->code_count()); \
no_prev_pred = true; \
} \
pecos::csr_t cur_layer_pred; \
mlm->predict(X, prev_layer_pred, no_prev_pred, \
overridden_only_topk, overridden_post_processor, \
cur_layer_pred, num_threads); \
cur_layer_pred.create_pycsr(pred_alloc); \
cur_layer_pred.free_underlying_memory(); \
prev_layer_pred.free_underlying_memory(); \
}
C_MLMODEL_PREDICT(_csr_f32, ScipyCsrF32, pecos::csr_t)
C_MLMODEL_PREDICT(_drm_f32, ScipyDrmF32, pecos::drm_t)

#define C_MLMODEL_PREDICT_ON_SELECTED_OUTPUTS(SUFFIX, PY_MAT, C_MAT) \
void c_mlmodel_predict_on_selected_outputs ## SUFFIX( \
void* ptr, \
const PY_MAT* input_x, \
const ScipyCsrF32* selected_outputs_csr, \
const ScipyCsrF32* csr_codes, \
const char* overridden_post_processor, \
const int num_threads, \
py_sparse_allocator_t pred_alloc) { \
pecos::MLModel<MLMODEL_MAT_T>* mlm = static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr); \
C_MAT X(input_x); \
pecos::csr_t curr_outputs_csr = pecos::csr_t(selected_outputs_csr).deep_copy(); \
pecos::csr_t prev_layer_pred; \
bool no_prev_pred; \
if (csr_codes) { \
prev_layer_pred = pecos::csr_t(csr_codes).deep_copy(); \
no_prev_pred = false; \
} else { \
prev_layer_pred.fill_ones(X.rows, mlm->code_count()); \
no_prev_pred = true; \
} \
pecos::csr_t cur_layer_pred; \
mlm->predict_on_selected_outputs(X, curr_outputs_csr, prev_layer_pred, no_prev_pred, \
overridden_post_processor, cur_layer_pred, num_threads); \
cur_layer_pred.create_pycsr(pred_alloc); \
cur_layer_pred.free_underlying_memory(); \
curr_outputs_csr.free_underlying_memory(); \
prev_layer_pred.free_underlying_memory(); \
}
C_MLMODEL_PREDICT_ON_SELECTED_OUTPUTS(_csr_f32, ScipyCsrF32, pecos::csr_t)
C_MLMODEL_PREDICT_ON_SELECTED_OUTPUTS(_drm_f32, ScipyDrmF32, pecos::drm_t)

// ==== C Interface of XLinearModels ====
void* c_xlinear_load_model_from_disk(const char* model_path) {
auto model = new pecos::HierarchicalMLModel(model_path);
return static_cast<void*>(model);
Expand All @@ -49,6 +135,7 @@ extern "C" {
// Only implemented for bin_search_chunked
auto model = new pecos::HierarchicalMLModel(model_path, pecos::layer_type_t::LAYER_TYPE_BINARY_SEARCH_CHUNKED);
model->save_mmap(mmap_model_path);
delete model;
}

void c_xlinear_destruct_model(void* ptr) {
Expand Down
Loading

0 comments on commit 5c06b9e

Please sign in to comment.