Skip to content

Commit

Permalink
updating merge path with the ivf index creations
Browse files Browse the repository at this point in the history
  • Loading branch information
Thejas-bhat committed Oct 12, 2023
1 parent 14ded4b commit 70a5bed
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 15 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ require (
github.com/blevesearch/vellum v1.0.10
github.com/golang/snappy v0.0.1
github.com/spf13/cobra v1.4.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
)

require (
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/sys v0.13.0 // indirect
)

replace github.com/blevesearch/bleve_index_api => ../bleve_index_api
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
3 changes: 3 additions & 0 deletions section_inverted_text_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type invertedTextIndexSection struct {
}

func (i *invertedTextIndexSection) Process(opaque map[int]resetable, docNum uint64, field index.Field, fieldID uint16) {
if _, ok := field.(index.VectorField); ok {
return // skip vector fields
}
invIndexOpaque := i.getInvertedIndexOpaque(opaque)
invIndexOpaque.process(field, fieldID, docNum)
}
Expand Down
82 changes: 70 additions & 12 deletions section_vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
faiss "github.com/blevesearch/go-faiss"
"golang.org/x/exp/maps"
)

func init() {
Expand Down Expand Up @@ -46,6 +47,7 @@ func (v *vectorIndexSection) AddrForField(opaque map[int]resetable, fieldID int)
type vecIndexMeta struct {
startOffset int
indexSize uint64
vecIds []int64
}

func remapDocIDs(oldIDs *roaring.Bitmap, newIDs []uint64) *roaring.Bitmap {
Expand All @@ -68,7 +70,7 @@ LOOP:
for fieldID, _ := range fieldsInv {

var indexes []vecIndexMeta
vecToDocID := make(map[uint64]*roaring.Bitmap)
vecToDocID := make(map[int64]*roaring.Bitmap)

// todo: would parallely fetching the following stuff from segments
// be beneficial in terms of perf?
Expand Down Expand Up @@ -101,6 +103,7 @@ LOOP:

numVecs, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
pos += n
indexes[len(indexes)-1].vecIds = make([]int64, 0, numVecs)

for i := 0; i < int(numVecs); i++ {
vecID, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
Expand All @@ -119,15 +122,17 @@ LOOP:
}

bitMap = remapDocIDs(bitMap, newDocNumsIn[segI])
if vecToDocID[vecID] == nil {
if vecToDocID[int64(vecID)] == nil {
if drops[segI] != nil && !drops[segI].IsEmpty() {
vecToDocID[vecID] = roaring.AndNot(bitMap, drops[segI])
vecToDocID[int64(vecID)] = roaring.AndNot(bitMap, drops[segI])
} else {
vecToDocID[vecID] = bitMap
vecToDocID[int64(vecID)] = bitMap
}
} else {
vecToDocID[vecID].Or(bitMap)
vecToDocID[int64(vecID)].Or(bitMap)
}

indexes[len(indexes)-1].vecIds = append(indexes[len(indexes)-1].vecIds, int64(vecID))
}
}
err := vo.mergeAndWriteVectorIndexes(fieldID, segments, vecToDocID, indexes, w, closeCh)
Expand All @@ -139,7 +144,7 @@ LOOP:
return nil
}

func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[uint64]*roaring.Bitmap,
func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[int64]*roaring.Bitmap,
serializedIndex []byte, w *CountHashWriter) (int, error) {
tempBuf := v.grabBuf(binary.MaxVarintLen64)
fieldStart := w.Count()
Expand Down Expand Up @@ -193,12 +198,7 @@ func (v *vectorIndexOpaque) flushVectorSection(vecToDocID map[uint64]*roaring.Bi
// todo: naive implementation. need to keep in mind the perf implications and improve on this.
// perhaps, parallelized merging can help speed things up over here.
func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*SegmentBase,
vecToDocID map[uint64]*roaring.Bitmap, indexes []vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {
if len(vecToDocID) >= 100000 {
// merging of more complex index types (for eg ivf family) with reconstruction
// method.
return fmt.Errorf("to be implemented")
}
vecToDocID map[int64]*roaring.Bitmap, indexes []vecIndexMeta, w *CountHashWriter, closeCh chan struct{}) error {

var vecIndexes []*faiss.IndexImpl
for segI, seg := range sbs {
Expand All @@ -211,6 +211,64 @@ func (v *vectorIndexOpaque) mergeAndWriteVectorIndexes(fieldID int, sbs []*Segme
vecIndexes = append(vecIndexes, index)
}

if len(vecToDocID) > 10000 {
// merging of more complex index types (for eg ivf family) with reconstruction
// method.
var indexData []float32
for i := 0; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return fmt.Errorf("merging of vector sections aborted")
}
// todo: parallelize reconstruction
recons, err := vecIndexes[i].ReconstructBatch(int64(len(indexes[i].vecIds)), indexes[i].vecIds)
if err != nil {
return err
}
indexData = append(indexData, recons...)
}

// safe to assume that all the indexes are of the same config values, given
// that they are extracted from the field mapping info.
dims := vecIndexes[0].D()
metric := vecIndexes[0].MetricType()
finalVecIDs := maps.Keys(vecToDocID)

index, err := faiss.IndexFactory(dims, "IDMap2,IVF100,SQ8", metric)
if err != nil {
return err
}

index = index.AsIVF()
err = index.MakeDirectMap(2)
if err != nil {
return err
}
err = index.Train(indexData)
if err != nil {
return err
}

index.AddWithIDs(indexData, finalVecIDs)
if err != nil {
return err
}

mergedIndexBytes, err := faiss.WriteIndexIntoBuffer(index)
if err != nil {
return err
}

fieldStart, err := v.flushVectorSection(vecToDocID, mergedIndexBytes, w)
if err != nil {
return err
}
v.fieldAddrs[uint16(fieldID)] = fieldStart

return nil
}

// todo: ivf -> flat index when there were huge number of vector deletes for this field

for i := 1; i < len(vecIndexes); i++ {
if isClosed(closeCh) {
return fmt.Errorf("merging of vector sections aborted")
Expand Down
1 change: 0 additions & 1 deletion segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,6 @@ func (s *SegmentBase) visitStoredFields(vdc *visitDocumentCtx, num uint64,
arrayPos[i] = ap
}
}

value := uncompressed[offset : offset+l]
keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos)
}
Expand Down

0 comments on commit 70a5bed

Please sign in to comment.