
Commit

Merge pull request #2 from bioinformatics-ua/collection-refactor
Add an AbstractSparseCollection and leave SparseCollection as default implementation
T-Almeida authored Oct 24, 2023
2 parents 573fc63 + 4e47275 commit 9d7ca68
Showing 5 changed files with 108 additions and 74 deletions.
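
In outline, the refactor splits the old COO-based SparseCollection into an abstract base class plus two concrete storage layouts, and keeps the old name as an alias. A minimal sketch of the hierarchy the diff below leaves in spare/collection.py, with class bodies elided:

class AbstractSparseCollection: ...                       # shared build/save/load logic; storage hooks raise NotImplementedError
class SparseCollectionCOO(AbstractSparseCollection): ...  # COO layout: a (2, nnz) indices tensor plus a values tensor
class SparseCollectionCSR(SparseCollectionCOO): ...       # CSR layout: crow_indices, col_indices, values
SparseCollection = SparseCollectionCSR                    # current default implementation
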
2 changes: 1 addition & 1 deletion converting_from_anserini.py
@@ -13,7 +13,7 @@
def main(dataset_folder):


-    sparseCSR_collection = SparseCollectionCSR.from_bm25_pyserini_iterator(os.path.join(dataset_folder, "anserini_index"),
+    sparseCSR_collection = SparseCollectionCSR.from_bm25_pyserini_index(os.path.join(dataset_folder, "anserini_index"),
k1=1.2,
b=0.75,
dtype=spare.float32,
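
The only change in this script is the call-site rename: the classmethod defined in spare/collection.py is from_bm25_pyserini_index, which takes the path of a prebuilt Anserini index rather than an iterator. A sketch of the corrected call, where the dataset folder and the import lines are assumptions:

import os
import spare
from spare import SparseCollectionCSR  # assumed import path

dataset_folder = "collections/msmarco"  # hypothetical location
collection = SparseCollectionCSR.from_bm25_pyserini_index(
    os.path.join(dataset_folder, "anserini_index"),
    k1=1.2,
    b=0.75,
    dtype=spare.float32,
)
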
160 changes: 90 additions & 70 deletions spare/collection.py
@@ -1,14 +1,17 @@
from spare import TYPE
-from spare.utils import get_coo_sparce_GB, get_csr_sparce_GB
+from spare.utils import get_coo_sparce_GB, get_csr_sparce_GB, load_backend, maybe_init
from tqdm import tqdm
from spare.metadata import MetaDataDFandDL
import os
import jsonpickle
import shutil
from spare.weighting_model import WeightingSchemaType, CountingWeightingSchema, BM25WeightingSchema

-class SparseCollection:
-    ### implements a COO sparse matrix

+class AbstractSparseCollection:
+    """
+    A SparseCollection without a concrete implementation
+    """
def __init__(self,
collection_maxsize,
text_to_vec=None,
@@ -19,7 +22,6 @@ def __init__(self,
backend="torch") -> None:
super().__init__()
-

self.collection_maxsize = collection_maxsize

if isinstance(dtype, int):
@@ -35,14 +37,10 @@
else:
self.vec_dim = vec_dim

-        if backend=="torch":
-            from spare.backend_torch import TorchBackend
-            self.backend = TorchBackend()
-        else:
-            RuntimeError("Only torch backend is currently supported")
+        self.backend = load_backend(backend)

-        self.metadata = metadata()
-        self.weighting_schema = weighting_schema()
+        self.metadata = maybe_init(metadata)
+        self.weighting_schema = maybe_init(weighting_schema)

@classmethod
def from_text_iterator(cls,
@@ -98,18 +96,14 @@ def from_bm25_pyserini_index(cls,
sparse_collection = cls(index_reader.stats()["documents"],
vec_dim=index_reader.stats()["unique_terms"],
dtype=dtype,
-                                weighting_schema=BM25WeightingSchema,
+                                weighting_schema=BM25WeightingSchema(k1=k1, b=b, idf_weighting=None),
backend=backend,
**kwargs)

# probably can skip estimation
sparse_collection._build_sparse_collection(bm25_pyserini_iterator(index_reader, k1=k1, b=b),
max_files_for_estimation=max_files_for_estimation)

-        sparse_collection.weighting_schema.k1 = k1
-        sparse_collection.weighting_schema.b = b
-        sparse_collection.weighting_schema.idf_weighting = None
-

return sparse_collection

def transform(self, transform_operator):
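
The hunk above also changes how the weighting schema is configured: instead of passing the bare BM25WeightingSchema class and patching k1, b and idf_weighting onto the finished collection, an already-configured instance is passed in, which maybe_init() (added in spare/utils.py below) forwards untouched. A small sketch of the two styles, assuming only the keyword arguments visible in the diff:

from spare.weighting_model import BM25WeightingSchema

# old style: pass the class, then mutate the instance after the build
# schema = BM25WeightingSchema(); schema.k1 = 1.2; schema.b = 0.75; schema.idf_weighting = None

# new style: hand over an instance that is already configured
schema = BM25WeightingSchema(k1=1.2, b=0.75, idf_weighting=None)
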
@@ -119,10 +113,10 @@ def transform(self, transform_operator):
            raise ValueError("Your current collection weighting schema or metadata is not compatible with the transformation asked.")

def get_sparce_matrix(self):
-        return self.backend.create_coo_matrix(*self.sparse_vecs, self.shape)
+        raise NotImplementedError("method get_sparce_matrix is not implemented; this is expected behaviour for AbstractSparseCollection.")

def _correct_transform_operator(self, transform_operator):
-        return transform_operator.for_coo()
+        raise NotImplementedError("method _correct_transform_operator is not implemented; this is expected behaviour for AbstractSparseCollection.")

def _build_sparse_collection(self, iterator, max_files_for_estimation):

@@ -194,57 +188,17 @@ def _build_sparse_tensor(self, iterator, collection_maxsize, elements_expected,
return self._slice_sparse_vecs(*sparse_vecs, element_index)

def _slice_sparse_vecs(self, indices, values, element_index):
-        # if I slice the indices I will create a non-continguous vector... which is not good
-        #indices = self.backend.slice_tensor(indices, (slice(None, None, None), slice(None, element_index, None)))
-        #values = self.backend.slice_tensor(values, slice(None, element_index, None))
-
-        # for the COO we will pad the documents to +1
-        doc_id = self.backend.get_value_from_tensor(indices, (0,element_index-1))+1
-        len_pad = values.shape[0]-element_index
-        self.backend.assign_data_to_tensor(indices,(0, slice(element_index, None, None)),[doc_id]*len_pad,TYPE.int64)
-
-        # update shape to include the padded doc
-        self.shape = (self.shape[0]+1, self.shape[1])
-        self.metadata.update(doc_id, [], [0]) # pad the metadata
-
-        return indices, values
+        raise NotImplementedError("method _slice_sparse_vecs is not implemented; this is expected behaviour for AbstractSparseCollection.")

def _init_sparse_vecs(self, elements_expected):
-        indices = self.backend.create_zero_tensor((2,elements_expected), TYPE.int64)
-        values = self.backend.create_zero_tensor((elements_expected,), self.dtype)
-
-        return indices, values
-
-    def _sparcify_bow_and_meta_update(self, bow, index_docs):
-        py_indices_row = []
-        py_indices_col = []
-        py_values = []
-        for token_index in sorted(bow.keys()):
-            py_indices_row.append(index_docs)
-            py_indices_col.append(token_index)
-            py_values.append(bow[token_index])
-
-        self.metadata.update(index_docs, py_indices_col, py_values)
-
-        return py_indices_row, py_indices_col, py_values
+        raise NotImplementedError("method _init_sparse_vecs is not implemented; this is expected behaviour for AbstractSparseCollection.")

def _update_sparse_vecs(self, indices, values, bow, element_index, index_docs):

-        py_indices_row, py_indices_col, py_values = self._sparcify_bow_and_meta_update(bow, index_docs)
-
-        self.backend.assign_data_to_tensor(indices,
-                                           (slice(None, None, None), slice(element_index,element_index+len(py_indices_col), None)),
-                                           [py_indices_row, py_indices_col],
-                                           TYPE.int64)
-        self.backend.assign_data_to_tensor(values,
-                                           slice(element_index,element_index+len(py_indices_col), None),
-                                           py_values,
-                                           self.dtype)
-
-        return len(py_indices_col)
+        raise NotImplementedError("method _update_sparse_vecs is not implemented; this is expected behaviour for AbstractSparseCollection.")


def get_sparce_matrix_space(self):
-        return get_coo_sparce_GB(self.shape, self.density, self.dtype)
+        raise NotImplementedError("method get_sparce_matrix_space is not implemented; this is expected behaviour for AbstractSparseCollection.")

def _get_matrix_estimations(self,
sampled_bow_list):
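
AbstractSparseCollection now keeps the shared pipeline (estimation, building, transforming, saving, loading) and pushes every layout-specific step behind hooks that raise NotImplementedError. A hypothetical subclass, only to show which hooks a new storage layout would have to fill in (bodies elided):

from spare.collection import AbstractSparseCollection

class MySparseCollection(AbstractSparseCollection):  # hypothetical layout
    def _init_sparse_vecs(self, elements_expected):
        ...  # allocate backing tensors for the expected number of non-zero entries

    def _update_sparse_vecs(self, indices, values, bow, element_index, index_docs):
        ...  # write one document's bag-of-words into the tensors, return entries written

    def _slice_sparse_vecs(self, indices, values, element_index):
        ...  # trim or pad the tensors once the real entry count is known

    def get_sparce_matrix(self):
        ...  # assemble the backend matrix from self.sparse_vecs

    def get_sparce_matrix_space(self):
        ...  # estimate the matrix footprint in GB

    def _correct_transform_operator(self, transform_operator):
        ...  # pick the layout-specific variant of a transform operator
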
@@ -299,8 +253,70 @@ def load_from_file(cls, folder_name, backend="torch"):
sparse_collection.sparse_vecs = sparse_collection.backend.load_tensors_from_file(os.path.join(folder_name, "tensors.safetensors"))

return sparse_collection

+class SparseCollectionCOO(AbstractSparseCollection):
+
+    def get_sparce_matrix(self):
+        return self.backend.create_coo_matrix(*self.sparse_vecs, self.shape)
+
+    def _correct_transform_operator(self, transform_operator):
+        return transform_operator.for_coo()
+
+    def get_sparce_matrix_space(self):
+        return get_coo_sparce_GB(self.shape, self.density, self.dtype)
+
+    def _init_sparse_vecs(self, elements_expected):
+
+        indices = self.backend.create_zero_tensor((2,elements_expected), TYPE.int64)
+        values = self.backend.create_zero_tensor((elements_expected,), self.dtype)
+
+        return indices, values
+
+    def _sparcify_bow_and_meta_update(self, bow, index_docs):
+        py_indices_row = []
+        py_indices_col = []
+        py_values = []
+        for token_index in sorted(bow.keys()):
+            py_indices_row.append(index_docs)
+            py_indices_col.append(token_index)
+            py_values.append(bow[token_index])
+
+        self.metadata.update(index_docs, py_indices_col, py_values)
+
+        return py_indices_row, py_indices_col, py_values
+
+    def _update_sparse_vecs(self, indices, values, bow, element_index, index_docs):
+
+        py_indices_row, py_indices_col, py_values = self._sparcify_bow_and_meta_update(bow, index_docs)
+
+        self.backend.assign_data_to_tensor(indices,
+                                           (slice(None, None, None), slice(element_index,element_index+len(py_indices_col), None)),
+                                           [py_indices_row, py_indices_col],
+                                           TYPE.int64)
+        self.backend.assign_data_to_tensor(values,
+                                           slice(element_index,element_index+len(py_indices_col), None),
+                                           py_values,
+                                           self.dtype)
+
+        return len(py_indices_col)
+
+    def _slice_sparse_vecs(self, indices, values, element_index):
+        # if I slice the indices I will create a non-contiguous vector... which is not good
+        #indices = self.backend.slice_tensor(indices, (slice(None, None, None), slice(None, element_index, None)))
+        #values = self.backend.slice_tensor(values, slice(None, element_index, None))
+
+        # for the COO we will pad the documents to +1
+        doc_id = self.backend.get_value_from_tensor(indices, (0,element_index-1))+1
+        len_pad = values.shape[0]-element_index
+        self.backend.assign_data_to_tensor(indices,(0, slice(element_index, None, None)),[doc_id]*len_pad,TYPE.int64)
+
+        # update shape to include the padded doc
+        self.shape = (self.shape[0]+1, self.shape[1])
+        self.metadata.update(doc_id, [], [0]) # pad the metadata
+
+        return indices, values
+
-class SparseCollectionCSR(SparseCollection):
+class SparseCollectionCSR(SparseCollectionCOO):
    # sparse collection implemented with CSR
def __init__(self,
*args,
@@ -310,13 +326,13 @@ def __init__(self,
if isinstance(indices_dtype, int):
indices_dtype = TYPE(indices_dtype)
self.indices_dtype = indices_dtype

-    def _correct_transform_operator(self, transform_operator):
-        return transform_operator.for_csr()
-
    def get_sparce_matrix_space(self):
        return get_csr_sparce_GB(self.shape, self.density, self.dtype, self.indices_dtype)

+    def _correct_transform_operator(self, transform_operator):
+        return transform_operator.for_csr()

def get_sparce_matrix(self):
return self.backend.create_csr_matrix(*self.sparse_vecs, self.shape, self.dtype)

@@ -360,4 +376,8 @@ def _slice_sparse_vecs(self, crow_indices, col_indices, values, element_index):
return crow_indices, col_indices, values

def _get_class_attributes(self):
-        return super()._get_class_attributes() | {"indices_dtype":self.indices_dtype.value}
+        return super()._get_class_attributes() | {"indices_dtype":self.indices_dtype.value}
+
+
+# current default implementation
+SparseCollection = SparseCollectionCSR
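
Because of the module-level alias, existing imports keep resolving and now build the CSR implementation. A one-line check, assuming the package is importable:

from spare.collection import SparseCollection, SparseCollectionCSR

assert SparseCollection is SparseCollectionCSR  # the old name is the CSR class
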
3 changes: 1 addition & 2 deletions spare/retriever.py
@@ -49,11 +49,10 @@ def query_transform(query):
return_scores=return_scores)

# maybe convert index2docID if docID!=index
-
s_time = time.time()
converted_indices = []
for i in range(len(out.ids)):
-            q_indices = self.collection.metadata.index2docID[out.ids[i]]#[self.collection.metadata.index2docID[idx] for idx in out.ids[i]]
+            q_indices = self.collection.metadata.index2docID[out.ids[i]]
converted_indices.append(q_indices)
print("Time to convert docs ids",time.time()-s_time )
out.ids = np.array(converted_indices)
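
The cleaned-up loop still converts internal row indices to external document IDs one query at a time; the expression index2docID[out.ids[i]] suggests index2docID supports array indexing, in which case the whole batch collapses into a single fancy-indexing call. A self-contained sketch with made-up IDs:

import numpy as np

index2docID = np.array(["D11", "D42", "D7", "D99"])  # hypothetical row -> docID map
ids = np.array([[2, 0],                              # top-2 rows for query 0
                [3, 1]])                             # top-2 rows for query 1

converted = index2docID[ids]  # one call instead of a per-query loop
print(converted)              # [['D7' 'D11'] ['D99' 'D42']]
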
15 changes: 15 additions & 0 deletions spare/utils.py
@@ -2,6 +2,21 @@
import math
import numpy as np

+
+
+def maybe_init(class_or_instance):
+    if isinstance(class_or_instance, type):
+        return class_or_instance()
+    else:
+        return class_or_instance
+
+def load_backend(backend):
+    if backend=="torch":
+        from spare.backend_torch import TorchBackend
+        return TorchBackend()
+    else:
+        raise RuntimeError("Only torch backend is currently supported")

def get_best_np_dtype_for(min_value, max_value):
uint32_bounds = np.iinfo("uint32")
uint64_bounds = np.iinfo("uint64")
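
maybe_init() is what lets the collection constructor accept either a class (which it instantiates with defaults, the old behaviour) or a pre-configured instance (the new BM25 path). A self-contained check with a stand-in class:

from spare.utils import maybe_init

class Dummy:  # stand-in for a metadata or weighting-schema class
    pass

assert isinstance(maybe_init(Dummy), Dummy)  # a class gets instantiated
instance = Dummy()
assert maybe_init(instance) is instance      # an instance passes through untouched
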
2 changes: 1 addition & 1 deletion spare/weighting_model.py
@@ -61,7 +61,7 @@ def _get_vars_to_save(self):
return super()._get_vars_to_save() | {
"k1": self.k1,
"b": self.b,
"idf_weighting": idf_weighting
"idf_weighting": self.idf_weighting
}

def _load_vars(self, data):
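
The one-line fix above matters because the bare name idf_weighting is neither a local nor a global inside _get_vars_to_save, so saving a BM25 schema would raise NameError until the attribute is qualified with self. A minimal reproduction with stand-in classes:

class Broken:
    def __init__(self):
        self.idf_weighting = None

    def get_vars(self):
        return {"idf_weighting": idf_weighting}  # NameError: name 'idf_weighting' is not defined


class Fixed:
    def __init__(self):
        self.idf_weighting = None

    def get_vars(self):
        return {"idf_weighting": self.idf_weighting}  # reads the instance attribute


print(Fixed().get_vars())  # {'idf_weighting': None}
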
