Skip to content

Commit

Permalink
replacing the flat index with the IVF family index. (#180)
Browse files Browse the repository at this point in the history
* using IVF instead of flat for smaller indexes

* unit test fix

* cleanup; bug fix: track error from add_with_ids
  • Loading branch information
Thejas-bhat authored Nov 20, 2023
1 parent f5e149a commit d292ef9
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 75 deletions.
1 change: 0 additions & 1 deletion faiss_vector_posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ func (vpl *VecPostingsIterator) BytesWritten() uint64 {
}

func (sb *SegmentBase) SimilarVectors(field string, qVector []float32, k int64, except *roaring.Bitmap) (segment.VecPostingsList, error) {

// 1. returned postings list (of type PostingsList) has two types of information - docNum and its score.
// 2. both the values can be represented using roaring bitmaps.
// 3. the Iterator (of type PostingsIterator) returned would operate in terms of VecPostings.
Expand Down
3 changes: 2 additions & 1 deletion faiss_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,11 @@ func serializeVecs(dataset [][]float32) []float32 {
func letsCreateVectorIndexForTesting(dataset [][]float32, dims int, similarity string) (*faiss.IndexImpl, error) {
vecs := serializeVecs(dataset)

idx, err := faiss.IndexFactory(dims, "Flat,IDMap2", faiss.MetricL2)
idx, err := faiss.IndexFactory(dims, "IVF1,Flat", faiss.MetricL2)
if err != nil {
return nil, err
}
idx.SetDirectMap(2)

idx.Train(vecs)

Expand Down
136 changes: 63 additions & 73 deletions section_faiss_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,85 +271,57 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme

defer freeReconstructedIndexes(vecIndexes)

// todo: perhaps making this threshold a config value would be better?
if len(vecToDocID) > 10000 {
// merging of more complex index types (for eg ivf family) with reconstruction
// method.
var indexData []float32
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return fmt.Errorf("merging of vector sections aborted")
}
// todo: parallelize reconstruction
recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), indexes[i].vecIds)
if err != nil {
return err
}
indexData = append(indexData, recons...)
}

// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
finalVecIDs := maps.Keys(vecToDocID)

// todo: perhaps the index type can be chosen from a config and tuned accordingly.
index, err := faiss.IndexFactory(dims, "IVF100,SQ8", metric)
if err != nil {
return err
}

defer index.Close()

// the direct map maintained in the IVF index is essential for the
// reconstruction of vectors based on vector IDs in the future merges.
// the AddWithIDs API also needs a direct map to be set before using.
err = index.SetDirectMap(2)
if err != nil {
return err
// merging of more complex index types (for eg ivf family) with reconstruction
// method.
var indexData []float32
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return fmt.Errorf("merging of vector sections aborted")
}

// train the vector index, essentially performs k-means clustering to partition
// the data space of indexData such that during the search time, we probe
// only a subset of vectors -> non-exhaustive search. could be a time
// consuming step when the indexData is large.
err = index.Train(indexData)
// todo: parallelize reconstruction
recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), indexes[i].vecIds)
if err != nil {
return err
}
indexData = append(indexData, recons...)
}

index.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}
// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
finalVecIDs := maps.Keys(vecToDocID)

mergedIndexBytes, err := faiss.WriteIndexIntoBuffer(index)
if err != nil {
return err
}
indexType := getIndexType(len(vecToDocID))
index, err := faiss.IndexFactory(dims, indexType, metric)
if err != nil {
return err
}
defer index.Close()

fieldStart, err := v.flushVectorSection(vecToDocID, mergedIndexBytes, w)
if err != nil {
return err
}
v.fieldAddrs[uint16(fieldID)] = fieldStart
// the direct map maintained in the IVF index is essential for the
// reconstruction of vectors based on vector IDs in the future merges.
// the AddWithIDs API also needs a direct map to be set before using.
err = index.SetDirectMap(2)
if err != nil {
return err
}

return nil
// train the vector index, essentially performs k-means clustering to partition
// the data space of indexData such that during the search time, we probe
// only a subset of vectors -> non-exhaustive search. could be a time
// consuming step when the indexData is large.
err = index.Train(indexData)
if err != nil {
return err
}

// todo: ivf -> flat index when there were huge number of vector deletes for this field
for i := 1; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return fmt.Errorf("merging of vector sections aborted")
}
err := vecIndexes[0].MergeFrom(vecIndexes[i], 0)
if err != nil {
return err
}
err = index.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}

mergedIndexBytes, err := faiss.WriteIndexIntoBuffer(vecIndexes[0])
mergedIndexBytes, err := faiss.WriteIndexIntoBuffer(index)
if err != nil {
return err
}
Expand All @@ -359,6 +331,7 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme
return err
}
v.fieldAddrs[uint16(fieldID)] = fieldStart

return nil
}

Expand All @@ -379,6 +352,17 @@ func (v *vectorIndexOpaque) grabBuf(size int) []byte {
return buf[0:size]
}

// getIndexType returns the index description string handed to
// faiss.IndexFactory for an index that will hold nVecs vectors.
//
// The choice is tiered on the vector count:
//   - nVecs >= 10000 -> "IVF100,SQ8"
//   - nVecs >= 10    -> "IVF10,Flat"
//   - otherwise      -> "IVF1,Flat"
func getIndexType(nVecs int) string {
	// Check the largest tier first so each guard only needs a lower bound.
	if nVecs >= 10000 {
		return "IVF100,SQ8"
	}
	if nVecs >= 10 {
		return "IVF10,Flat"
	}
	return "IVF1,Flat"
}

func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint64, err error) {
// for every fieldID, contents to store over here are:
// 1. the serialized representation of the dense vector index.
Expand All @@ -398,22 +382,28 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint
metric = faiss.MetricInnerProduct
}

// create an index, its always a flat for now, because each batch size
// won't have too many vectors (in order for >100K). todo: will need to revisit
// this logic - creating based on configured batch size in scorch.
index, err := faiss.IndexFactory(int(content.dim), "IDMap2,Flat", metric)
// create an index - currently it's an IVF with 1 cluster and no compression
index, err := faiss.IndexFactory(int(content.dim), "IVF1,Flat", metric)
if err != nil {
return 0, err
}

defer index.Close()

// the direct map maintained in the IVF index is essential for the
// reconstruction of vectors based on vector IDs in the future merges.
// the AddWithIDs API also needs a direct map to be set before using.
err = index.SetDirectMap(2)
if err != nil {
return 0, err
}

err = index.Train(vecs)
if err != nil {
return 0, err
}

index.AddWithIDs(vecs, ids)
err = index.AddWithIDs(vecs, ids)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -485,7 +475,7 @@ func (vo *vectorIndexOpaque) writeVectorIndexes(w *CountHashWriter) (offset uint
// an optimization to avoid using the bitmaps if there is only 1 doc
// with the vecID.
if numDocs == 1 {
n = binary.PutUvarint(tempBuf, numDocs)
n = binary.PutUvarint(tempBuf, uint64(docIDs.Minimum()))
_, err = w.Write(tempBuf[:n])
if err != nil {
return 0, err
Expand Down

0 comments on commit d292ef9

Please sign in to comment.