Skip to content

Commit

Permalink
Toy: Limited Training Size
Browse files Browse the repository at this point in the history
  • Loading branch information
Likith101 committed Oct 17, 2024
1 parent 1c5b688 commit 69c4c11
Showing 1 changed file with 131 additions and 62 deletions.
193 changes: 131 additions & 62 deletions section_faiss_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,10 @@ func calculateNprobe(nlist int, indexOptimizedFor string) int32 {
// perhaps, parallelized merging can help speed things up over here.
func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
indexes []*vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {

vecIndexes := make([]*faiss.IndexImpl, 0, len(sbs))
var finalVecIDCap, indexDataCap, reconsCap int
nvecs := 0
dims := 0
for segI, segBase := range sbs {
// Considering merge operations on vector indexes are expensive, it is
// worth including an early exit if the merge is aborted, saving us
Expand All @@ -296,6 +297,8 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
}
if len(indexes[segI].vecIds) > 0 {
indexReconsLen := len(indexes[segI].vecIds) * index.D()
dims = index.D()
nvecs += len(indexes[segI].vecIds)
if indexReconsLen > reconsCap {
reconsCap = indexReconsLen
}
Expand All @@ -309,51 +312,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
if len(vecIndexes) == 0 {
return nil
}

finalVecIDs := make([]int64, 0, finalVecIDCap)
// merging of indexes with reconstruction method.
// the indexes[i].vecIds has only the valid vecs of this vector
// index present in it, so we'd be reconstructing only those.
indexData := make([]float32, 0, indexDataCap)
// reusable buffer for reconstruction
recons := make([]float32, 0, reconsCap)
var err error
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
freeReconstructedIndexes(vecIndexes)
return seg.ErrClosed
}

// reconstruct the vectors only if present, it could be that
// some of the indexes had all of their vectors updated/deleted.
if len(indexes[i].vecIds) > 0 {
neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D()
recons = recons[:neededReconsLen]
// todo: parallelize reconstruction
recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
indexData = append(indexData, recons...)
// Adding vector IDs in the same order as the vectors
finalVecIDs = append(finalVecIDs, indexes[i].vecIds...)
}
}

if len(indexData) == 0 {
// no valid vectors for this index, so we don't even have to
// record it in the section
freeReconstructedIndexes(vecIndexes)
return nil
}
recons = nil

nvecs := len(finalVecIDs)

// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
indexOptimizedFor := indexes[0].indexOptimizedFor

Expand All @@ -366,8 +324,6 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
// to do the same is not needed because the following operations don't need
// the reconstructed ones anymore and doing so will hold up memory which can
// be detrimental while creating indexes during introduction.
freeReconstructedIndexes(vecIndexes)
vecIndexes = nil

faissIndex, err := faiss.IndexFactory(dims, indexDescription, metric)
if err != nil {
Expand All @@ -376,34 +332,147 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(sbs []*SegmentBase,
defer faissIndex.Close()

if indexClass == IndexTypeIVF {
// the direct map maintained in the IVF index is essential for the
// reconstruction of vectors based on vector IDs in the future merges.
// the AddWithIDs API also needs a direct map to be set before using.
err = faissIndex.SetDirectMap(2)
if err != nil {
return err
}

nprobe := calculateNprobe(nlist, indexOptimizedFor)
faissIndex.SetNProbe(nprobe)
}

if nvecs < 100000 {
finalVecIDs := make([]int64, 0, finalVecIDCap)
// merging of indexes with reconstruction method.
// the indexes[i].vecIds has only the valid vecs of this vector
// index present in it, so we'd be reconstructing only those.
indexData := make([]float32, 0, indexDataCap)
// reusable buffer for reconstruction
recons := make([]float32, 0, reconsCap)
var err error
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
freeReconstructedIndexes(vecIndexes)
return seg.ErrClosed
}

// reconstruct the vectors only if present, it could be that
// some of the indexes had all of their vectors updated/deleted.
if len(indexes[i].vecIds) > 0 {
neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D()
recons = recons[:neededReconsLen]
// todo: parallelize reconstruction
recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
indexData = append(indexData, recons...)
// Adding vector IDs in the same order as the vectors
finalVecIDs = append(finalVecIDs, indexes[i].vecIds...)
}
}

if len(indexData) == 0 {
// no valid vectors for this index, so we don't even have to
// record it in the section
freeReconstructedIndexes(vecIndexes)
return nil
}

recons = nil
freeReconstructedIndexes(vecIndexes)
vecIndexes = nil

// train the vector index, essentially performs k-means clustering to partition
// the data space of indexData such that during the search time, we probe
// only a subset of vectors -> non-exhaustive search. could be a time
// consuming step when the indexData is large.
err = faissIndex.Train(indexData)
if indexClass == IndexTypeIVF {
err = faissIndex.Train(indexData)
if err != nil {
return err
}
}
err = faissIndex.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}
}

err = faissIndex.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
indexData = nil
finalVecIDs = nil

} else {
recons := make([]float32, 0, reconsCap)
curVecs := 0
vecLimit := 100000
if vecLimit < nlist*40 {
vecLimit = nlist * 40
}
finalVecIDs := make([]int64, 0, vecLimit)
indexData := make([]float32, 0, vecLimit*dims)
trained := false

var err error

for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
freeReconstructedIndexes(vecIndexes)
return seg.ErrClosed
}

if len(indexes[i].vecIds) > 0 {
neededReconsLen := len(indexes[i].vecIds) * vecIndexes[i].D()
recons = recons[:neededReconsLen]
// todo: parallelize reconstruction
recons, err = vecIndexes[i].ReconstructBatch(indexes[i].vecIds, recons)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
if curVecs+len(indexes[i].vecIds) > vecLimit {
indexData = append(indexData, recons[:(vecLimit-curVecs)*vecIndexes[i].D()]...)
finalVecIDs = append(finalVecIDs, indexes[i].vecIds[:(vecLimit-curVecs)]...)
if !trained {
err = faissIndex.Train(indexData)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
trained = true
}
err = faissIndex.AddWithIDs(indexData, finalVecIDs)
if err != nil {
freeReconstructedIndexes(vecIndexes)
return err
}
indexData = indexData[:0]
finalVecIDs = finalVecIDs[:0]
indexData = append(indexData, recons[(vecLimit-curVecs)*vecIndexes[i].D():]...)
finalVecIDs = append(finalVecIDs, indexes[i].vecIds[(vecLimit-curVecs):]...)
curVecs = len(finalVecIDs)
} else {
indexData = append(indexData, recons...)
finalVecIDs = append(finalVecIDs, indexes[i].vecIds...)
curVecs = len(finalVecIDs)
}
}
}
recons = nil
freeReconstructedIndexes(vecIndexes)
vecIndexes = nil
if curVecs > 0 {
if !trained {
err = faissIndex.Train(indexData)
if err != nil {
return err
}
}
err = faissIndex.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}
}
indexData = nil
finalVecIDs = nil
}

indexData = nil
finalVecIDs = nil
var mergedIndexBytes []byte
mergedIndexBytes, err = faiss.WriteIndexIntoBuffer(faissIndex)
if err != nil {
Expand Down

0 comments on commit 69c4c11

Please sign in to comment.