From d29eb555a1b0b60349bb37e5ffdb4e40626cdcb3 Mon Sep 17 00:00:00 2001 From: Thejas-bhat <35959007+Thejas-bhat@users.noreply.github.com> Date: Thu, 31 Oct 2024 01:25:58 +0530 Subject: [PATCH] Handling a case to avoid bringing faiss index to memory unnecessarily (#279) - In update/delete heavy scenario where we short circuit the merge path, we end up reading a faiss index to the memory unnecessarily and it can potentially stay in memory till the reconstruction of all valid vectors is complete (which is not ideal). - Also refactoring the `vecIndexes []*faiss.IndexImpl` construct to `indexes []*vecIndexInfo` just so we don't have to read info from two different slices while reconstructing vectors. --- section_faiss_vector_index.go | 68 +++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/section_faiss_vector_index.go b/section_faiss_vector_index.go index 07e15da3..1c9f91a0 100644 --- a/section_faiss_vector_index.go +++ b/section_faiss_vector_index.go @@ -68,12 +68,14 @@ func (v *faissVectorIndexSection) AddrForField(opaque map[int]resetable, fieldID return vo.fieldAddrs[uint16(fieldID)] } -// metadata corresponding to a serialized vector index -type vecIndexMeta struct { +// information specific to a vector index - (including metadata and +// the index pointer itself) +type vecIndexInfo struct { startOffset int indexSize uint64 vecIds []int64 indexOptimizedFor string + index *faiss.IndexImpl } // keep in mind with respect to update and delete operations with respect to vectors @@ -87,7 +89,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se // in the segment this will help by avoiding multiple allocation // calls. vecSegs := make([]*SegmentBase, 0, len(segments)) - indexes := make([]*vecIndexMeta, 0, len(segments)) + indexes := make([]*vecIndexInfo, 0, len(segments)) for fieldID, fieldName := range fieldsInv { indexes = indexes[:0] // resizing the slices @@ -128,7 +130,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se pos += n vecSegs = append(vecSegs, sb) - indexes = append(indexes, &vecIndexMeta{ + indexes = append(indexes, &vecIndexInfo{ vecIds: make([]int64, 0, numVecs), indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)], }) @@ -182,7 +184,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se } func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter, - vecToDocID map[int64]uint64, indexes []*vecIndexMeta) error { + vecToDocID map[int64]uint64, indexes []*vecIndexInfo) error { tempBuf := v.grabBuf(binary.MaxVarintLen64) // early exit if there are absolutely no valid vectors present in the segment @@ -275,9 +277,14 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 { // todo: naive implementation. need to keep in mind the perf implications and improve on this. // perhaps, parallelized merging can help speed things up over here. func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, - indexes []*vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error { + vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error { - vecIndexes := make([]*faiss.IndexImpl, 0, len(sbs)) + // safe to assume that all the indexes are of the same config values, given + // that they are extracted from the field mapping info. + var dims, metric int + var indexOptimizedFor string + + var validMerge bool var finalVecIDCap, indexDataCap, reconsCap int for segI, segBase := range sbs { // Considering merge operations on vector indexes are expensive, it is @@ -287,26 +294,37 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, freeReconstructedIndexes(vecIndexes) return seg.ErrClosed } + if len(vecIndexes[segI].vecIds) == 0 { + // no valid vectors for this index, don't bring it into memory + continue + } + // read the index bytes. todo: parallelize this - indexBytes := segBase.mem[indexes[segI].startOffset : indexes[segI].startOffset+int(indexes[segI].indexSize)] + indexBytes := segBase.mem[vecIndexes[segI].startOffset : vecIndexes[segI].startOffset+int(vecIndexes[segI].indexSize)] index, err := faiss.ReadIndexFromBuffer(indexBytes, faissIOFlags) if err != nil { freeReconstructedIndexes(vecIndexes) return err } - if len(indexes[segI].vecIds) > 0 { - indexReconsLen := len(indexes[segI].vecIds) * index.D() + if len(vecIndexes[segI].vecIds) > 0 { + indexReconsLen := len(vecIndexes[segI].vecIds) * index.D() if indexReconsLen > reconsCap { reconsCap = indexReconsLen } indexDataCap += indexReconsLen - finalVecIDCap += len(indexes[segI].vecIds) + finalVecIDCap += len(vecIndexes[segI].vecIds) } - vecIndexes = append(vecIndexes, index) + vecIndexes[segI].index = index + + validMerge = true + // set the dims and metric values from the constructed index. + dims = index.D() + metric = int(index.MetricType()) + indexOptimizedFor = vecIndexes[segI].indexOptimizedFor } - // no vector indexes to merge - if len(vecIndexes) == 0 { + // not a valid merge operation as there are no valid indexes to merge. + if !validMerge { return nil } @@ -326,18 +344,18 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, // reconstruct the vectors only if present, it could be that // some of the indexes had all of their vectors updated/deleted. - if len(indexes[i].vecIds) > 0 { - neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D() + if len(vecIndexes[i].vecIds) > 0 { + neededReconsLen := len(vecIndexes[i].vecIds) * vecIndexes[i].index.D() recons = recons[:neededReconsLen] // todo: parallelize reconstruction - recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons) + recons, err = vecIndexes[i].index.ReconstructBatch(vecIndexes[i].vecIds, recons) if err != nil { freeReconstructedIndexes(vecIndexes) return err } indexData = append(indexData, recons...) // Adding vector IDs in the same order as the vectors - finalVecIDs = append(finalVecIDs, indexes[i].vecIds...) + finalVecIDs = append(finalVecIDs, vecIndexes[i].vecIds...) } } @@ -351,12 +369,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, nvecs := len(finalVecIDs) - // safe to assume that all the indexes are of the same config values, given - // that they are extracted from the field mapping info. - dims := vecIndexes[0].D() - metric := vecIndexes[0].MetricType() - indexOptimizedFor := indexes[0].indexOptimizedFor - // index type to be created after merge based on the number of vectors // in indexData added into the index. nlist := determineCentroids(nvecs) @@ -419,9 +431,11 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase, } // todo: can be parallelized. -func freeReconstructedIndexes(indexes []*faiss.IndexImpl) { - for _, index := range indexes { - index.Close() +func freeReconstructedIndexes(indexes []*vecIndexInfo) { + for _, entry := range indexes { + if entry.index != nil { + entry.index.Close() + } } }