Handling a case to avoid bringing faiss index to memory unnecessarily (#279)

- In an update/delete-heavy scenario where we short-circuit the merge path,
we end up reading a faiss index into memory unnecessarily, and it can
potentially stay in memory until the reconstruction of all valid vectors
is complete (which is not ideal).
- Also refactoring the `vecIndexes []*faiss.IndexImpl` construct to
`indexes []*vecIndexInfo`, so we don't have to read info from two
different slices while reconstructing vectors. A rough sketch of the
resulting shape follows below.
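For readers skimming the diff, here is a minimal, self-contained Go sketch of the shape of the change (not the actual patch: `fakeIndex`, `loadIndexFromBuffer`, and `mergeVectorIndexes` are hypothetical stand-ins for the go-faiss `IndexImpl`, `faiss.ReadIndexFromBuffer`, and the real merge routine). It shows the consolidated `vecIndexInfo` slice and the early `continue` that keeps a fully-deleted segment's index out of memory:

```go
package main

import "fmt"

// vecIndexInfo mirrors the struct introduced in this commit: the serialized
// index's location and size, the surviving vector IDs, and (once loaded)
// the in-memory index pointer itself.
type vecIndexInfo struct {
	startOffset int
	indexSize   uint64
	vecIds      []int64
	index       *fakeIndex // the real code stores a *faiss.IndexImpl here
}

// fakeIndex is a hypothetical stand-in for faiss.IndexImpl so this sketch
// runs without the cgo faiss bindings.
type fakeIndex struct{ dims int }

func (f *fakeIndex) Close() {}

// loadIndexFromBuffer is a hypothetical stand-in for
// faiss.ReadIndexFromBuffer(indexBytes, faissIOFlags).
func loadIndexFromBuffer(buf []byte) (*fakeIndex, error) {
	return &fakeIndex{dims: 128}, nil
}

// mergeVectorIndexes sketches the loop in mergeAndWriteVectorIndexes: one
// slice carries both metadata and the loaded index, and segments with no
// surviving vectors are never deserialized.
func mergeVectorIndexes(mem []byte, infos []*vecIndexInfo) error {
	for _, info := range infos {
		// The fix: if every vector in this segment's index was updated or
		// deleted, there is nothing to reconstruct, so don't bring the
		// serialized index into memory at all.
		if len(info.vecIds) == 0 {
			continue
		}
		indexBytes := mem[info.startOffset : info.startOffset+int(info.indexSize)]
		idx, err := loadIndexFromBuffer(indexBytes)
		if err != nil {
			return err
		}
		// Keep the pointer next to its metadata; reconstruction later reads
		// both from the same slice entry, then Close()s the index.
		info.index = idx
	}
	return nil
}

func main() {
	mem := make([]byte, 64)
	infos := []*vecIndexInfo{
		{startOffset: 0, indexSize: 16, vecIds: nil},            // all vectors gone: skipped
		{startOffset: 16, indexSize: 16, vecIds: []int64{1, 2}}, // loaded and merged
	}
	if err := mergeVectorIndexes(mem, infos); err != nil {
		fmt.Println(err)
	}
	fmt.Println(infos[0].index == nil, infos[1].index != nil) // true true
}
```

In the real patch the same check also feeds a `validMerge` flag, which replaces the old `len(vecIndexes) == 0` test for bailing out of the merge.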
Thejas-bhat authored Oct 30, 2024
1 parent 30a47cd commit d29eb55
Showing 1 changed file with 41 additions and 27 deletions.
section_faiss_vector_index.go (41 additions, 27 deletions)
@@ -68,12 +68,14 @@ func (v *faissVectorIndexSection) AddrForField(opaque map[int]resetable, fieldID
 	return vo.fieldAddrs[uint16(fieldID)]
 }
 
-// metadata corresponding to a serialized vector index
-type vecIndexMeta struct {
+// information specific to a vector index - (including metadata and
+// the index pointer itself)
+type vecIndexInfo struct {
 	startOffset int
 	indexSize uint64
 	vecIds []int64
 	indexOptimizedFor string
+	index *faiss.IndexImpl
 }
 
 // keep in mind with respect to update and delete operations with respect to vectors
@@ -87,7 +89,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
 	// in the segment this will help by avoiding multiple allocation
 	// calls.
 	vecSegs := make([]*SegmentBase, 0, len(segments))
-	indexes := make([]*vecIndexMeta, 0, len(segments))
+	indexes := make([]*vecIndexInfo, 0, len(segments))
 
 	for fieldID, fieldName := range fieldsInv {
 		indexes = indexes[:0] // resizing the slices
@@ -128,7 +130,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
 			pos += n
 
 			vecSegs = append(vecSegs, sb)
-			indexes = append(indexes, &vecIndexMeta{
+			indexes = append(indexes, &vecIndexInfo{
 				vecIds: make([]int64, 0, numVecs),
 				indexOptimizedFor: index.VectorIndexOptimizationsReverseLookup[int(indexOptimizationTypeInt)],
 			})
@@ -182,7 +184,7 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
 }
 
 func (v *vectorIndexOpaque) flushSectionMetadata(fieldID int, w *CountHashWriter,
-	vecToDocID map[int64]uint64, indexes []*vecIndexMeta) error {
+	vecToDocID map[int64]uint64, indexes []*vecIndexInfo) error {
 	tempBuf := v.grabBuf(binary.MaxVarintLen64)
 
 	// early exit if there are absolutely no valid vectors present in the segment
@@ -275,9 +277,14 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 {
 // todo: naive implementation. need to keep in mind the perf implications and improve on this.
 // perhaps, parallelized merging can help speed things up over here.
 func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
-	indexes []*vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {
+	vecIndexes []*vecIndexInfo, w *CountHashWriter, closeCh chan struct{}) error {
 
-	vecIndexes := make([]*faiss.IndexImpl, 0, len(sbs))
+	// safe to assume that all the indexes are of the same config values, given
+	// that they are extracted from the field mapping info.
+	var dims, metric int
+	var indexOptimizedFor string
+
+	var validMerge bool
 	var finalVecIDCap, indexDataCap, reconsCap int
 	for segI, segBase := range sbs {
 		// Considering merge operations on vector indexes are expensive, it is
@@ -287,26 +294,37 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
 			freeReconstructedIndexes(vecIndexes)
 			return seg.ErrClosed
 		}
+		if len(vecIndexes[segI].vecIds) == 0 {
+			// no valid vectors for this index, don't bring it into memory
+			continue
+		}
+
 		// read the index bytes. todo: parallelize this
-		indexBytes := segBase.mem[indexes[segI].startOffset : indexes[segI].startOffset+int(indexes[segI].indexSize)]
+		indexBytes := segBase.mem[vecIndexes[segI].startOffset : vecIndexes[segI].startOffset+int(vecIndexes[segI].indexSize)]
 		index, err := faiss.ReadIndexFromBuffer(indexBytes, faissIOFlags)
 		if err != nil {
 			freeReconstructedIndexes(vecIndexes)
 			return err
 		}
-		if len(indexes[segI].vecIds) > 0 {
-			indexReconsLen := len(indexes[segI].vecIds) * index.D()
+		if len(vecIndexes[segI].vecIds) > 0 {
+			indexReconsLen := len(vecIndexes[segI].vecIds) * index.D()
 			if indexReconsLen > reconsCap {
 				reconsCap = indexReconsLen
 			}
 			indexDataCap += indexReconsLen
-			finalVecIDCap += len(indexes[segI].vecIds)
+			finalVecIDCap += len(vecIndexes[segI].vecIds)
 		}
-		vecIndexes = append(vecIndexes, index)
+		vecIndexes[segI].index = index
+
+		validMerge = true
+		// set the dims and metric values from the constructed index.
+		dims = index.D()
+		metric = int(index.MetricType())
+		indexOptimizedFor = vecIndexes[segI].indexOptimizedFor
 	}
 
-	// no vector indexes to merge
-	if len(vecIndexes) == 0 {
+	// not a valid merge operation as there are no valid indexes to merge.
+	if !validMerge {
 		return nil
 	}
 
@@ -326,18 +344,18 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
 
 		// reconstruct the vectors only if present, it could be that
 		// some of the indexes had all of their vectors updated/deleted.
-		if len(indexes[i].vecIds) > 0 {
-			neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D()
+		if len(vecIndexes[i].vecIds) > 0 {
+			neededReconsLen := len(vecIndexes[i].vecIds) * vecIndexes[i].index.D()
 			recons = recons[:neededReconsLen]
 			// todo: parallelize reconstruction
-			recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons)
+			recons, err = vecIndexes[i].index.ReconstructBatch(vecIndexes[i].vecIds, recons)
 			if err != nil {
 				freeReconstructedIndexes(vecIndexes)
 				return err
 			}
 			indexData = append(indexData, recons...)
 			// Adding vector IDs in the same order as the vectors
-			finalVecIDs = append(finalVecIDs, indexes[i].vecIds...)
+			finalVecIDs = append(finalVecIDs, vecIndexes[i].vecIds...)
 		}
 	}
 
@@ -351,12 +369,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
 
 	nvecs := len(finalVecIDs)
 
-	// safe to assume that all the indexes are of the same config values, given
-	// that they are extracted from the field mapping info.
-	dims := vecIndexes[0].D()
-	metric := vecIndexes[0].MetricType()
-	indexOptimizedFor := indexes[0].indexOptimizedFor
-
 	// index type to be created after merge based on the number of vectors
 	// in indexData added into the index.
 	nlist := determineCentroids(nvecs)
@@ -419,9 +431,11 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
 }
 
 // todo: can be parallelized.
-func freeReconstructedIndexes(indexes []*faiss.IndexImpl) {
-	for _, index := range indexes {
-		index.Close()
+func freeReconstructedIndexes(indexes []*vecIndexInfo) {
+	for _, entry := range indexes {
+		if entry.index != nil {
+			entry.index.Close()
+		}
 	}
 }
 
