diff --git a/tests/system/knowledge_base/test_knowledge_base.py b/tests/system/knowledge_base/test_knowledge_base.py
index 3ede466a..0f995ef5 100644
--- a/tests/system/knowledge_base/test_knowledge_base.py
+++ b/tests/system/knowledge_base/test_knowledge_base.py
@@ -59,7 +59,7 @@ def chunker():
 @pytest.fixture(scope="module")
 def encoder():
     return StubRecordEncoder(
-        StubDenseEncoder(dimension=3))
+        StubDenseEncoder())
 
 
 @pytest.fixture(scope="module", autouse=True)
diff --git a/tests/unit/stubs/stub_dense_encoder.py b/tests/unit/stubs/stub_dense_encoder.py
index d0e02ff2..9d55bb7f 100644
--- a/tests/unit/stubs/stub_dense_encoder.py
+++ b/tests/unit/stubs/stub_dense_encoder.py
@@ -1,15 +1,47 @@
-import hashlib
+import mmh3
 import numpy as np
+from collections import defaultdict
 from typing import Union, List
 
 from pinecone_text.dense.base_dense_ecoder import BaseDenseEncoder
 
 
 class StubDenseEncoder(BaseDenseEncoder):
-
-    def __init__(self, dimension: int = 3):
+    """
+    Bag-of-words encoder that hashes words into a fixed-size vocabulary
+    and projects the resulting sparse count vectors to dense vectors with
+    a random projection, following the Johnson-Lindenstrauss lemma.
+    """
+
+    def __init__(self,
+                 dimension: int = 8,
+                 vocab_size: int = 2 ** 12):
+        self.input_dim = vocab_size
         self.dimension = dimension
 
+    def _text_to_word_counts(self, text: str) -> defaultdict:
+        words = text.split()
+        word_counts = defaultdict(int)
+        for word in words:
+            hashed_word = mmh3.hash(word) % self.input_dim
+            word_counts[hashed_word] += 1
+        return word_counts
+
+    def _encode_text(self, text: str) -> List[float]:
+        word_counts = self._text_to_word_counts(text)
+
+        # This will hold the result of word_counts * random_matrix
+        projected_embedding = np.zeros(self.dimension, dtype=np.float32)
+
+        for hashed_word, count in word_counts.items():
+            # Seed the RNG with the hashed word index for consistency
+            rng = np.random.default_rng(hashed_word)
+            random_vector = rng.standard_normal(self.dimension)
+            projected_embedding += count * random_vector
+
+        projected_embedding = projected_embedding.astype(np.float32)
+        return list(projected_embedding / np.linalg.norm(projected_embedding))
+
     def encode_documents(self,
                          texts: Union[str, List[str]]
                          ) -> Union[List[float], List[List[float]]]:
@@ -20,23 +52,10 @@ def encode_queries(self,
                        ) -> Union[List[float], List[List[float]]]:
         return self._encode(texts)
 
-    def consistent_embedding(self, text: str) -> List[float]:
-        # consistent embedding function that project each text to a unique angle
-        embedding = []
-        for i in range(self.dimension):
-            sha256_hash = hashlib.sha256(f"{text} {i}".encode()).hexdigest()
-            int_value = int(sha256_hash, 16)
-            embedding.append(int_value / float(1 << 256))
-
-        l2_norm = np.linalg.norm(embedding)
-        normalized_embedding = [float(value / l2_norm) for value in embedding]
-
-        return normalized_embedding
-
     def _encode(self,
                 texts: Union[str, List[str]]
                 ) -> Union[List[float], List[List[float]]]:
         if isinstance(texts, str):
-            return self.consistent_embedding(texts)
+            return self._encode_text(texts)
         else:
-            return [self.consistent_embedding(text) for text in texts]
+            return [self._encode_text(text) for text in texts]
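The key trick in the new stub is that the random projection matrix is never materialized: each hashed word index seeds its own RNG, so the corresponding row of the implicit matrix is regenerated identically on every call. Below is a minimal standalone sketch of the same idea; the names `toy_encode`, `VOCAB`, and `DIM` are illustrative and not part of this repo.

```python
# Standalone sketch: per-word seeded RNGs stand in for rows of a
# Johnson-Lindenstrauss random projection matrix that is never stored.
import mmh3
import numpy as np

VOCAB = 2 ** 12   # hashed vocabulary size (sparse input dimension)
DIM = 8           # dense output dimension

def toy_encode(text: str) -> np.ndarray:
    out = np.zeros(DIM, dtype=np.float32)
    for word in text.split():
        idx = mmh3.hash(word) % VOCAB                     # bucket the word
        row = np.random.default_rng(idx).standard_normal(DIM)
        out += row                                        # one count per occurrence
    return out / np.linalg.norm(out)                      # unit-normalize

if __name__ == "__main__":
    a = toy_encode("the quick brown fox")
    b = toy_encode("the quick brown fox")
    c = toy_encode("an entirely unrelated sentence")
    assert np.allclose(a, b)           # deterministic across calls
    print("sim(same):", float(a @ b))  # 1.0
    print("sim(diff):", float(a @ c))  # near 0 in expectation
```

The apparent motivation for the change: the old SHA-256 scheme mapped each text to an arbitrary angle, so two texts sharing most of their words could land anywhere relative to each other. With the bag-of-words projection, embeddings reflect token overlap, which makes similarity-based assertions in the knowledge base tests behave more like a real encoder.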