Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Harshil Goel committed Oct 18, 2023
1 parent 0ce037c commit a73116f
Show file tree
Hide file tree
Showing 9 changed files with 622 additions and 45 deletions.
132 changes: 123 additions & 9 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type countParams struct {
reverse bool
}

func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist Data,
hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) {
countBefore, countAfter := 0, 0
found := false
Expand Down Expand Up @@ -199,7 +199,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
key := x.ReverseKey(t.Attr, t.ValueId)
hasCountIndex := schema.State().HasCount(ctx, t.Attr)

var getFn func(key []byte) (*List, error)
var getFn func(key []byte) (Data, error)
if hasCountIndex {
// We need to retrieve the full posting list from disk, to allow us to get the length of the
// posting list for the counts.
Expand Down Expand Up @@ -267,6 +267,59 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
return nil
}

// handleDeleteAll applies a delete-all edge to v. It walks the postings
// returned by v.Iterate(txn.StartTs, 0, ...) and, for each one, removes either
// the reverse edge (reversed predicates) or the index entries (indexed
// predicates). When the predicate has a count index, the count entry is moved
// from the number of postings seen down to zero. Finally the edge itself is
// applied through v.addMutation. The first error encountered is returned.
func (v *Value) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error {
	attr := edge.Attr
	reversed := schema.State().IsReversed(ctx, attr)
	indexed := schema.State().IsIndexed(ctx, attr)
	counted := schema.State().HasCount(ctx, attr)

	// Template for reverse-edge deletions; only ValueId changes per posting.
	revEdge := &pb.DirectedEdge{
		Attr:   attr,
		Op:     edge.Op,
		Entity: edge.Entity,
	}

	// numPostings tracks the length of the posting list; it is needed to
	// remove the entity from the count index afterwards.
	numPostings := 0
	visit := func(p *pb.Posting) error {
		numPostings++
		if reversed {
			// Delete reverse edge for each posting.
			revEdge.ValueId = p.Uid
			return txn.addReverseAndCountMutation(ctx, revEdge)
		}
		if !indexed {
			return nil
		}
		// Delete index edge of each posting.
		return txn.addIndexMutations(ctx, &indexMutationInfo{
			tokenizers: schema.State().Tokenizer(ctx, edge.Attr),
			edge:       edge,
			val:        types.Val{Tid: types.TypeID(p.ValType), Value: p.Value},
			op:         pb.DirectedEdge_DEL,
		})
	}
	if err := v.Iterate(txn.StartTs, 0, visit); err != nil {
		return err
	}

	if !counted {
		return v.addMutation(ctx, txn, edge)
	}

	// Delete uid from count index. Deletion of reverses is taken care of by
	// addReverseAndCountMutation in the iteration above.
	params := countParams{
		attr:        edge.Attr,
		countBefore: numPostings,
		countAfter:  0,
		entity:      edge.Entity,
	}
	if err := txn.updateCount(ctx, params); err != nil {
		return err
	}
	return v.addMutation(ctx, txn, edge)
}

func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error {
isReversed := schema.State().IsReversed(ctx, edge.Attr)
isIndexed := schema.State().IsIndexed(ctx, edge.Attr)
Expand Down Expand Up @@ -372,7 +425,7 @@ func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int
return countBefore
}

func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bool,
func (txn *Txn) addMutationHelper(ctx context.Context, l Data, doUpdateIndex bool,
hasCountIndex bool, t *pb.DirectedEdge) (types.Val, bool, countParams, error) {

t1 := time.Now()
Expand Down Expand Up @@ -454,6 +507,67 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
return val, found, emptyCountParams, nil
}

// AddMutationWithIndex applies edge to v and keeps the predicate's auxiliary
// data in step with it. An edge with an empty predicate is rejected. A DEL edge
// whose value is x.Star is delegated to handleDeleteAll. Otherwise the reverse
// edge (if the predicate is reversed and the edge has a ValueId), the count
// index, and the token index are updated around the call to addMutationHelper.
// The first error encountered is returned.
func (v *Value) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error {
	if edge.Attr == "" {
		return errors.Errorf("Predicate cannot be empty for edge with subject: [%v], object: [%v]"+
			" and value: [%v]", edge.Entity, edge.ValueId, edge.Value)
	}

	isDeleteAll := edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star
	if isDeleteAll {
		return v.handleDeleteAll(ctx, edge, txn)
	}

	doUpdateIndex := pstore != nil && schema.State().IsIndexed(ctx, edge.Attr)
	hasCountIndex := schema.State().HasCount(ctx, edge.Attr)

	// The reverse mutation is added regardless of whether the forward mutation
	// changes anything: a server crash can happen after the mutation is synced
	// and before the reverse edge is synced.
	if pstore != nil && edge.ValueId != 0 {
		if schema.State().IsReversed(ctx, edge.Attr) {
			if err := txn.addReverseAndCountMutation(ctx, edge); err != nil {
				return err
			}
		}
	}

	prev, existed, counts, err := txn.addMutationHelper(ctx, v, doUpdateIndex, hasCountIndex, edge)
	if err != nil {
		return err
	}
	ostats.Record(ctx, x.NumEdges.M(1))

	if hasCountIndex && counts.countBefore != counts.countAfter {
		if err := txn.updateCount(ctx, counts); err != nil {
			return err
		}
	}

	if !doUpdateIndex {
		return nil
	}

	// Exact matches: drop the index entries of the value reported as found.
	if existed && prev.Value != nil {
		if err := txn.addIndexMutations(ctx, &indexMutationInfo{
			tokenizers: schema.State().Tokenizer(ctx, edge.Attr),
			edge:       edge,
			val:        prev,
			op:         pb.DirectedEdge_DEL,
		}); err != nil {
			return err
		}
	}

	if edge.Op != pb.DirectedEdge_SET {
		return nil
	}

	// Index the value carried by the SET edge.
	return txn.addIndexMutations(ctx, &indexMutationInfo{
		tokenizers: schema.State().Tokenizer(ctx, edge.Attr),
		edge:       edge,
		val:        types.Val{Tid: types.TypeID(edge.ValueType), Value: edge.Value},
		op:         pb.DirectedEdge_SET,
	})
}

// AddMutationWithIndex is addMutation with support for indexing. It also
// supports reverse edges.
func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error {
Expand Down Expand Up @@ -551,7 +665,7 @@ type rebuilder struct {

// The posting list passed here is the on disk version. It is not coming
// from the LRU cache.
fn func(uid uint64, pl *List, txn *Txn) error
fn func(uid uint64, pl Data, txn *Txn) error
}

func (r *rebuilder) Run(ctx context.Context) error {
Expand Down Expand Up @@ -949,7 +1063,7 @@ func rebuildTokIndex(ctx context.Context, rb *IndexRebuild) error {

pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
builder.fn = func(uid uint64, pl Data, txn *Txn) error {
edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
// Add index entries based on p.
Expand Down Expand Up @@ -1038,7 +1152,7 @@ func rebuildCountIndex(ctx context.Context, rb *IndexRebuild) error {

glog.Infof("Rebuilding count index for %s", rb.Attr)
var reverse bool
fn := func(uid uint64, pl *List, txn *Txn) error {
fn := func(uid uint64, pl Data, txn *Txn) error {
t := &pb.DirectedEdge{
ValueId: uid,
Attr: rb.Attr,
Expand Down Expand Up @@ -1132,7 +1246,7 @@ func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error {
glog.Infof("Rebuilding reverse index for %s", rb.Attr)
pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
builder.fn = func(uid uint64, pl Data, txn *Txn) error {
edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(pp *pb.Posting) error {
puid := pp.Uid
Expand Down Expand Up @@ -1185,7 +1299,7 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
builder.fn = func(uid uint64, pl Data, txn *Txn) error {
var mpost *pb.Posting
err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
// We only want to modify the untagged value. There could be other values with a
Expand All @@ -1210,7 +1324,7 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

// Ensure that list is in the cache run by txn. Otherwise, nothing would
// get updated.
pl = txn.cache.SetIfAbsent(string(pl.key), pl)
pl = txn.cache.SetIfAbsent(string(pl.Key()), pl)
if err := pl.addMutation(ctx, txn, t); err != nil {
return err
}
Expand Down
Loading

0 comments on commit a73116f

Please sign in to comment.