Add ingest gRPC methods to create and delete vectorsets (#2202)
* Add new ingest gRPC methods to create and delete vectorsets

* Ingest gRPC uses ORM KnowledgeBox methods

* Add methods in cluster manager

* Partial implementation on KnowledgeBox

* Mark vectorset to delete

* Implement create/delete vectorset on indexes
jotare authored May 30, 2024
1 parent 4ee2324 commit 5122373
Showing 12 changed files with 567 additions and 134 deletions.
33 changes: 33 additions & 0 deletions nucliadb/nucliadb/common/cluster/manager.py
@@ -38,6 +38,7 @@
 )
 from nucliadb.common.maindb.driver import Transaction
 from nucliadb_protos import (
+    knowledgebox_pb2,
     nodereader_pb2,
     noderesources_pb2,
     nodewriter_pb2,
@@ -411,6 +412,38 @@ async def maybe_create_new_shard(
         await self.create_shard_by_kbid(txn, kbid)
         await txn.commit()

+    async def create_vectorset(
+        self, kbid: str, config: knowledgebox_pb2.VectorSetConfig
+    ):
+        """Create a new vectorset in all KB shards."""
+
+        async def _create_vectorset(node: AbstractIndexNode, shard_id: str):
+            vectorset_id = config.vectorset_id
+            index_config = config.vectorset_index_config
+            result = await node.add_vectorset(shard_id, vectorset_id, index_config)
+            if result.status != result.Status.OK:
+                raise NodeError(
+                    f"Unable to create vectorset {vectorset_id} in kb {kbid} shard {shard_id}"
+                )
+
+        await self.apply_for_all_shards(
+            kbid, _create_vectorset, timeout=10, use_read_replica_nodes=False
+        )
+
+    async def delete_vectorset(self, kbid: str, vectorset_id: str):
+        """Delete a vectorset from all KB shards"""
+
+        async def _delete_vectorset(node: AbstractIndexNode, shard_id: str):
+            result = await node.remove_vectorset(shard_id, vectorset_id)
+            if result.status != result.Status.OK:
+                raise NodeError(
+                    f"Unable to delete vectorset {vectorset_id} in kb {kbid} shard {shard_id}"
+                )
+
+        await self.apply_for_all_shards(
+            kbid, _delete_vectorset, timeout=10, use_read_replica_nodes=False
+        )
+

 class StandaloneKBShardManager(KBShardManager):
     max_ops_before_checks = 200
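
Both new manager methods follow the same fan-out pattern: build a per-shard coroutine and hand it to apply_for_all_shards, which runs it against the writable copy of every shard in the KB and raises NodeError when any node reports a non-OK status. A minimal sketch of how a caller might drive them (the vectorset id and index parameters are illustrative, not part of the commit):

    from nucliadb.common.cluster.utils import get_shard_manager
    from nucliadb_protos import knowledgebox_pb2, nodewriter_pb2, utils_pb2


    async def rollout_vectorset(kbid: str) -> None:
        # Hypothetical config; only fields visible in this diff are set.
        config = knowledgebox_pb2.VectorSetConfig(
            vectorset_id="my-vectorset",
            vectorset_index_config=nodewriter_pb2.VectorIndexConfig(
                similarity=utils_pb2.VectorSimilarity.COSINE,
                normalize_vectors=True,
                vector_dimension=768,
            ),
        )
        shard_manager = get_shard_manager()
        # Fans out to every KB shard; any non-OK node status raises NodeError.
        await shard_manager.create_vectorset(kbid, config)
        # Deletion follows the same fan-out pattern.
        await shard_manager.delete_vectorset(kbid, "my-vectorset")
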
4 changes: 4 additions & 0 deletions nucliadb/nucliadb/ingest/orm/exceptions.py
@@ -48,3 +48,7 @@ class ResourceNotIndexable(Exception):

 class EntityManagementException(Exception):
     pass
+
+
+class VectorSetConflict(Exception):
+    pass
25 changes: 23 additions & 2 deletions nucliadb/nucliadb/ingest/orm/knowledgebox.py
@@ -38,15 +38,15 @@
 from nucliadb.common.cluster.utils import get_shard_manager
 from nucliadb.common.maindb.driver import Driver, Transaction
 from nucliadb.ingest import SERVICE_NAME, logger
-from nucliadb.ingest.orm.exceptions import KnowledgeBoxConflict
+from nucliadb.ingest.orm.exceptions import KnowledgeBoxConflict, VectorSetConflict
 from nucliadb.ingest.orm.resource import (
     KB_RESOURCE_SLUG,
     KB_RESOURCE_SLUG_BASE,
     Resource,
 )
 from nucliadb.ingest.orm.utils import choose_matryoshka_dimension, compute_paragraph_key
 from nucliadb.migrator.utils import get_latest_version
-from nucliadb_protos import writer_pb2
+from nucliadb_protos import knowledgebox_pb2, writer_pb2
 from nucliadb_utils.storages.storage import Storage
 from nucliadb_utils.utilities import get_audit, get_storage
@@ -61,6 +61,9 @@
 KB_TO_DELETE = f"{KB_TO_DELETE_BASE}{{kbid}}"
 KB_TO_DELETE_STORAGE = f"{KB_TO_DELETE_STORAGE_BASE}{{kbid}}"

+KB_VECTORSET_TO_DELETE_BASE = "/kbvectorsettodelete"
+KB_VECTORSET_TO_DELETE = f"{KB_VECTORSET_TO_DELETE_BASE}/{{kbid}}"
+

 class KnowledgeBox:
     def __init__(self, txn: Transaction, storage: Storage, kbid: str):
@@ -418,6 +421,24 @@ async def iterate_resources(self) -> AsyncGenerator[Resource, None]:
             disable_vectors=False,
         )

+    async def create_vectorset(self, config: knowledgebox_pb2.VectorSetConfig):
+        if await datamanagers.vectorsets.exists(
+            self.txn, kbid=self.kbid, vectorset_id=config.vectorset_id
+        ):
+            raise VectorSetConflict(f"Vectorset {config.vectorset_id} already exists")
+        await datamanagers.vectorsets.set(self.txn, kbid=self.kbid, config=config)
+        shard_manager = get_shard_manager()
+        await shard_manager.create_vectorset(self.kbid, config)
+
+    async def delete_vectorset(self, vectorset_id: str):
+        await datamanagers.vectorsets.delete(
+            self.txn, kbid=self.kbid, vectorset_id=vectorset_id
+        )
+        # mark vectorset for async deletion
+        await self.txn.set(KB_VECTORSET_TO_DELETE.format(kbid=self.kbid), b"")
+        shard_manager = get_shard_manager()
+        await shard_manager.delete_vectorset(self.kbid, vectorset_id)
+

 def chunker(seq: Sequence, size: int):
     return (seq[pos : pos + size] for pos in range(0, len(seq), size))
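
At the ORM level, create_vectorset refuses duplicate ids with VectorSetConflict and persists the config through datamanagers before fanning out to the shards, while delete_vectorset drops the stored config and leaves a /kbvectorsettodelete/{kbid} marker so index data can be purged asynchronously. Both run inside the caller's transaction; a sketch of the expected calling pattern, assuming a maindb driver and storage are already set up and mirroring the gRPC servicer further down:

    from nucliadb.ingest.orm.exceptions import VectorSetConflict
    from nucliadb.ingest.orm.knowledgebox import KnowledgeBox


    async def add_vectorset(driver, storage, kbid, config) -> None:
        async with driver.transaction() as txn:
            kb = KnowledgeBox(txn, storage, kbid)
            try:
                # Writes the config and fans out to all shards.
                await kb.create_vectorset(config)
            except VectorSetConflict:
                return  # vectorset id already taken, nothing to do
            await txn.commit()
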
62 changes: 60 additions & 2 deletions nucliadb/nucliadb/ingest/service/writer.py
@@ -33,13 +33,16 @@
     NewKnowledgeBoxResponse,
     SemanticModelMetadata,
     UpdateKnowledgeBoxResponse,
+    VectorSetConfig,
 )
 from nucliadb_protos.resources_pb2 import CloudFile
 from nucliadb_protos.writer_pb2 import (
     BinaryData,
     BrokerMessage,
     DelEntitiesRequest,
     DelLabelsRequest,
+    DelVectorSetRequest,
+    DelVectorSetResponse,
     ExtractedVectorsWrapper,
     FileRequest,
     FileUploaded,
@@ -59,6 +62,8 @@
     ListMembersResponse,
     NewEntitiesGroupRequest,
     NewEntitiesGroupResponse,
+    NewVectorSetRequest,
+    NewVectorSetResponse,
     OpStatusWriter,
     SetEntitiesRequest,
     SetLabelsRequest,
@@ -80,12 +85,12 @@
 from nucliadb.common.maindb.utils import setup_driver
 from nucliadb.ingest import SERVICE_NAME, logger
 from nucliadb.ingest.orm.entities import EntitiesManager
-from nucliadb.ingest.orm.exceptions import KnowledgeBoxConflict
+from nucliadb.ingest.orm.exceptions import KnowledgeBoxConflict, VectorSetConflict
 from nucliadb.ingest.orm.knowledgebox import KnowledgeBox as KnowledgeBoxORM
 from nucliadb.ingest.orm.processor import Processor, sequence_manager
 from nucliadb.ingest.orm.resource import Resource as ResourceORM
 from nucliadb.ingest.settings import settings
-from nucliadb_protos import utils_pb2, writer_pb2, writer_pb2_grpc
+from nucliadb_protos import nodewriter_pb2, utils_pb2, writer_pb2, writer_pb2_grpc
 from nucliadb_telemetry import errors
 from nucliadb_utils import const
 from nucliadb_utils.settings import is_onprem_nucliadb, running_settings
@@ -687,6 +692,59 @@ async def generate_buffer(
             result = FileUploaded()
         return result

+    async def NewVectorSet(
+        self, request: NewVectorSetRequest, context=None
+    ) -> NewVectorSetResponse:
+        config = VectorSetConfig(
+            vectorset_id=request.vectorset_id,
+            vectorset_index_config=nodewriter_pb2.VectorIndexConfig(
+                similarity=request.similarity,
+                normalize_vectors=request.normalize_vectors,
+                vector_type=request.vector_type,
+                vector_dimension=request.vector_dimension,
+            ),
+            matryoshka_dimensions=request.matryoshka_dimensions,
+        )
+        response = NewVectorSetResponse()
+        try:
+            async with self.driver.transaction() as txn:
+                kbobj = KnowledgeBoxORM(txn, self.storage, request.kbid)
+                await kbobj.create_vectorset(config)
+                await txn.commit()
+        except VectorSetConflict as exc:
+            response.status = NewVectorSetResponse.Status.ERROR
+            response.details = str(exc)
+        except Exception as exc:
+            errors.capture_exception(exc)
+            logger.error(
+                "Error in ingest gRPC while creating a vectorset", exc_info=True
+            )
+            response.status = NewVectorSetResponse.Status.ERROR
+            response.details = str(exc)
+        else:
+            response.status = NewVectorSetResponse.Status.OK
+        return response
+
+    async def DelVectorSet(
+        self, request: DelVectorSetRequest, context=None
+    ) -> DelVectorSetResponse:
+        response = DelVectorSetResponse()
+        try:
+            async with self.driver.transaction() as txn:
+                kbobj = KnowledgeBoxORM(txn, self.storage, request.kbid)
+                await kbobj.delete_vectorset(request.vectorset_id)
+                await txn.commit()
+        except Exception as exc:
+            errors.capture_exception(exc)
+            logger.error(
+                "Error in ingest gRPC while deleting a vectorset", exc_info=True
+            )
+            response.status = DelVectorSetResponse.Status.ERROR
+            response.details = str(exc)
+        else:
+            response.status = DelVectorSetResponse.Status.OK
+        return response
+

 LEARNING_SIMILARITY_FUNCTION_TO_PROTO = {
     "cosine": utils_pb2.VectorSimilarity.COSINE,
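
With the servicer wired up, clients can manage vectorsets over the ingest gRPC API. A hypothetical grpc.aio call (the channel address, kbid and field values are placeholders; the request fields match how NewVectorSet reads them above):

    import grpc

    from nucliadb_protos import utils_pb2, writer_pb2, writer_pb2_grpc


    async def create_vectorset(kbid: str) -> None:
        # Placeholder address for the ingest gRPC endpoint.
        async with grpc.aio.insecure_channel("localhost:8030") as channel:
            stub = writer_pb2_grpc.WriterStub(channel)
            response = await stub.NewVectorSet(
                writer_pb2.NewVectorSetRequest(
                    kbid=kbid,
                    vectorset_id="my-vectorset",
                    similarity=utils_pb2.VectorSimilarity.COSINE,
                    vector_dimension=768,
                )
            )
            if response.status != writer_pb2.NewVectorSetResponse.Status.OK:
                raise RuntimeError(f"NewVectorSet failed: {response.details}")
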
4 changes: 3 additions & 1 deletion nucliadb_protos/python/nucliadb_protos/train_pb2.pyi
@@ -127,6 +127,7 @@ from nucliadb_protos.writer_pb2 import (
     DelEntitiesRequest as DelEntitiesRequest,
     DelLabelsRequest as DelLabelsRequest,
     DelVectorSetRequest as DelVectorSetRequest,
+    DelVectorSetResponse as DelVectorSetResponse,
     Error as Error,
     FileRequest as FileRequest,
     FileUploaded as FileUploaded,
@@ -150,13 +151,14 @@ from nucliadb_protos.writer_pb2 import (
     MergeEntitiesRequest as MergeEntitiesRequest,
     NewEntitiesGroupRequest as NewEntitiesGroupRequest,
     NewEntitiesGroupResponse as NewEntitiesGroupResponse,
+    NewVectorSetRequest as NewVectorSetRequest,
+    NewVectorSetResponse as NewVectorSetResponse,
     Notification as Notification,
     NotificationSource as NotificationSource,
     OpStatusWriter as OpStatusWriter,
     PROCESSOR as PROCESSOR,
     SetEntitiesRequest as SetEntitiesRequest,
     SetLabelsRequest as SetLabelsRequest,
-    SetVectorSetRequest as SetVectorSetRequest,
     SetVectorsRequest as SetVectorsRequest,
     SetVectorsResponse as SetVectorsResponse,
     ShardObject as ShardObject,
4 changes: 3 additions & 1 deletion nucliadb_protos/python/nucliadb_protos/train_pb2_grpc.pyi
@@ -125,6 +125,7 @@ from nucliadb_protos.writer_pb2 import (
     DelEntitiesRequest as DelEntitiesRequest,
     DelLabelsRequest as DelLabelsRequest,
     DelVectorSetRequest as DelVectorSetRequest,
+    DelVectorSetResponse as DelVectorSetResponse,
     Error as Error,
     FileRequest as FileRequest,
     FileUploaded as FileUploaded,
@@ -148,13 +149,14 @@ from nucliadb_protos.writer_pb2 import (
     MergeEntitiesRequest as MergeEntitiesRequest,
     NewEntitiesGroupRequest as NewEntitiesGroupRequest,
     NewEntitiesGroupResponse as NewEntitiesGroupResponse,
+    NewVectorSetRequest as NewVectorSetRequest,
+    NewVectorSetResponse as NewVectorSetResponse,
     Notification as Notification,
     NotificationSource as NotificationSource,
     OpStatusWriter as OpStatusWriter,
     PROCESSOR as PROCESSOR,
     SetEntitiesRequest as SetEntitiesRequest,
     SetLabelsRequest as SetLabelsRequest,
-    SetVectorSetRequest as SetVectorSetRequest,
     SetVectorsRequest as SetVectorsRequest,
     SetVectorsResponse as SetVectorsResponse,
     ShardObject as ShardObject,
118 changes: 63 additions & 55 deletions nucliadb_protos/python/nucliadb_protos/writer_pb2.py

Large diffs are not rendered by default.


3 comments on commit 5122373

@github-actions (Contributor)

Benchmark

Benchmark suite: nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error
Current (5122373): 2897.408768633879 iter/sec (stddev: 0.0000022914301070040453)
Previous (08db1e8): 3041.132211072051 iter/sec (stddev: 0.0000011628635974660105)
Ratio: 1.05

This comment was automatically generated by a workflow using github-action-benchmark.

@github-actions (Contributor)

Benchmark

Benchmark suite: nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error
Current (5122373): 2992.616037518435 iter/sec (stddev: 0.00000672192621351497)
Previous (08db1e8): 3041.132211072051 iter/sec (stddev: 0.0000011628635974660105)
Ratio: 1.02

This comment was automatically generated by a workflow using github-action-benchmark.

@github-actions (Contributor)

Benchmark

Benchmark suite: nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error
Current (5122373): 3018.3387127927126 iter/sec (stddev: 0.000003682293299007059)
Previous (08db1e8): 3041.132211072051 iter/sec (stddev: 0.0000011628635974660105)
Ratio: 1.01

This comment was automatically generated by a workflow using github-action-benchmark.
