diff --git a/posting/index.go b/posting/index.go index e21d4b6c975..7748b3f3e69 100644 --- a/posting/index.go +++ b/posting/index.go @@ -142,7 +142,7 @@ type countParams struct { reverse bool } -func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, +func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist Data, hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) { countBefore, countAfter := 0, 0 found := false @@ -199,7 +199,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd key := x.ReverseKey(t.Attr, t.ValueId) hasCountIndex := schema.State().HasCount(ctx, t.Attr) - var getFn func(key []byte) (*List, error) + var getFn func(key []byte) (Data, error) if hasCountIndex { // We need to retrieve the full posting list from disk, to allow us to get the length of the // posting list for the counts. @@ -267,6 +267,59 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd return nil } +func (v *Value) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { + isReversed := schema.State().IsReversed(ctx, edge.Attr) + isIndexed := schema.State().IsIndexed(ctx, edge.Attr) + hasCount := schema.State().HasCount(ctx, edge.Attr) + delEdge := &pb.DirectedEdge{ + Attr: edge.Attr, + Op: edge.Op, + Entity: edge.Entity, + } + // To calculate length of posting list. Used for deletion of count index. + var plen int + err := v.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { + plen++ + switch { + case isReversed: + // Delete reverse edge for each posting. + delEdge.ValueId = p.Uid + return txn.addReverseAndCountMutation(ctx, delEdge) + case isIndexed: + // Delete index edge of each posting. + val := types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + } + return txn.addIndexMutations(ctx, &indexMutationInfo{ + tokenizers: schema.State().Tokenizer(ctx, edge.Attr), + edge: edge, + val: val, + op: pb.DirectedEdge_DEL, + }) + default: + return nil + } + }) + if err != nil { + return err + } + if hasCount { + // Delete uid from count index. Deletion of reverses is taken care by addReverseMutation + // above. 
+ if err := txn.updateCount(ctx, countParams{ + attr: edge.Attr, + countBefore: plen, + countAfter: 0, + entity: edge.Entity, + }); err != nil { + return err + } + } + + return v.addMutation(ctx, txn, edge) +} + func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { isReversed := schema.State().IsReversed(ctx, edge.Attr) isIndexed := schema.State().IsIndexed(ctx, edge.Attr) @@ -372,7 +425,7 @@ func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int return countBefore } -func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bool, +func (txn *Txn) addMutationHelper(ctx context.Context, l Data, doUpdateIndex bool, hasCountIndex bool, t *pb.DirectedEdge) (types.Val, bool, countParams, error) { t1 := time.Now() @@ -454,6 +507,67 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo return val, found, emptyCountParams, nil } +func (v *Value) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { + if edge.Attr == "" { + return errors.Errorf("Predicate cannot be empty for edge with subject: [%v], object: [%v]"+ + " and value: [%v]", edge.Entity, edge.ValueId, edge.Value) + } + + if edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star { + return v.handleDeleteAll(ctx, edge, txn) + } + + doUpdateIndex := pstore != nil && schema.State().IsIndexed(ctx, edge.Attr) + hasCountIndex := schema.State().HasCount(ctx, edge.Attr) + + // Add reverse mutation irrespective of hasMutated, server crash can happen after + // mutation is synced and before reverse edge is synced + if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) { + if err := txn.addReverseAndCountMutation(ctx, edge); err != nil { + return err + } + } + + val, found, cp, err := txn.addMutationHelper(ctx, v, doUpdateIndex, hasCountIndex, edge) + if err != nil { + return err + } + ostats.Record(ctx, x.NumEdges.M(1)) + if hasCountIndex && cp.countAfter != cp.countBefore { + if err := txn.updateCount(ctx, cp); err != nil { + return err + } + } + if doUpdateIndex { + // Exact matches. + if found && val.Value != nil { + if err := txn.addIndexMutations(ctx, &indexMutationInfo{ + tokenizers: schema.State().Tokenizer(ctx, edge.Attr), + edge: edge, + val: val, + op: pb.DirectedEdge_DEL, + }); err != nil { + return err + } + } + if edge.Op == pb.DirectedEdge_SET { + val = types.Val{ + Tid: types.TypeID(edge.ValueType), + Value: edge.Value, + } + if err := txn.addIndexMutations(ctx, &indexMutationInfo{ + tokenizers: schema.State().Tokenizer(ctx, edge.Attr), + edge: edge, + val: val, + op: pb.DirectedEdge_SET, + }); err != nil { + return err + } + } + } + return nil +} + // AddMutationWithIndex is addMutation with support for indexing. It also // supports reverse edges. func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { @@ -551,7 +665,7 @@ type rebuilder struct { // The posting list passed here is the on disk version. It is not coming // from the LRU cache. 
- fn func(uid uint64, pl *List, txn *Txn) error + fn func(uid uint64, pl Data, txn *Txn) error } func (r *rebuilder) Run(ctx context.Context) error { @@ -949,7 +1063,7 @@ func rebuildTokIndex(ctx context.Context, rb *IndexRebuild) error { pk := x.ParsedKey{Attr: rb.Attr} builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs} - builder.fn = func(uid uint64, pl *List, txn *Txn) error { + builder.fn = func(uid uint64, pl Data, txn *Txn) error { edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid} return pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { // Add index entries based on p. @@ -1038,7 +1152,7 @@ func rebuildCountIndex(ctx context.Context, rb *IndexRebuild) error { glog.Infof("Rebuilding count index for %s", rb.Attr) var reverse bool - fn := func(uid uint64, pl *List, txn *Txn) error { + fn := func(uid uint64, pl Data, txn *Txn) error { t := &pb.DirectedEdge{ ValueId: uid, Attr: rb.Attr, @@ -1132,7 +1246,7 @@ func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { glog.Infof("Rebuilding reverse index for %s", rb.Attr) pk := x.ParsedKey{Attr: rb.Attr} builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs} - builder.fn = func(uid uint64, pl *List, txn *Txn) error { + builder.fn = func(uid uint64, pl Data, txn *Txn) error { edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid} return pl.Iterate(txn.StartTs, 0, func(pp *pb.Posting) error { puid := pp.Uid @@ -1185,7 +1299,7 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error { pk := x.ParsedKey{Attr: rb.Attr} builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs} - builder.fn = func(uid uint64, pl *List, txn *Txn) error { + builder.fn = func(uid uint64, pl Data, txn *Txn) error { var mpost *pb.Posting err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { // We only want to modify the untagged value. There could be other values with a @@ -1210,7 +1324,7 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error { // Ensure that list is in the cache run by txn. Otherwise, nothing would // get updated. 
- pl = txn.cache.SetIfAbsent(string(pl.key), pl) + pl = txn.cache.SetIfAbsent(string(pl.Key()), pl) if err := pl.addMutation(ctx, txn, t); err != nil { return err } diff --git a/posting/list.go b/posting/list.go index cba042d7126..5048b0e777e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/hex" + "fmt" "log" "math" "sort" @@ -68,6 +69,35 @@ const ( BitEmptyPosting byte = 0x10 ) +type Data interface { + Lock() + Unlock() + Key() []byte + AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error + ApproxLen() int + findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posting, err error) + addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error + getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) + addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error + Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error + IsEmpty(readTs, afterUid uint64) (bool, error) + Length(readTs, afterUid uint64) int + Uids(opt ListOptions) (*pb.List, error) + Postings(opt ListOptions, postFn func(*pb.Posting) error) error + AllUntaggedValues(readTs uint64) ([]types.Val, error) + AllValues(readTs uint64) ([]types.Val, error) + GetLangTags(readTs uint64) ([]string, error) + Value(readTs uint64) (rval types.Val, rerr error) + ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) + PostingFor(readTs uint64, langs []string) (p *pb.Posting, rerr error) + ValueForTag(readTs uint64, tag string) (rval types.Val, rerr error) + getMutation(startTs uint64) []byte + setMutation(startTs uint64, data []byte) + Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) + DeepSize() uint64 + maxVersion() uint64 +} + // List stores the in-memory representation of a posting list. type List struct { x.SafeMutex @@ -78,6 +108,12 @@ type List struct { maxTs uint64 // max commit timestamp seen for this list. } +type Value struct { + x.SafeMutex + key []byte + posting *pb.PostingList +} + // NewList returns a new list with an immutable layer set to plist and the // timestamp of the immutable layer set to minTs. func NewList(key []byte, plist *pb.PostingList, minTs uint64) *List { @@ -89,6 +125,20 @@ func NewList(key []byte, plist *pb.PostingList, minTs uint64) *List { } } +func (v *Value) Key() []byte { + return v.key +} + +func (l *List) Key() []byte { + return l.key +} + +func (v *Value) maxVersion() uint64 { + v.RLock() + defer v.RUnlock() + return v.posting.CommitTs +} + func (l *List) maxVersion() uint64 { l.RLock() defer l.RUnlock() @@ -325,6 +375,21 @@ func hasDeleteAll(mpost *pb.Posting) bool { return mpost.Op == Del && bytes.Equal(mpost.Value, []byte(x.Star)) && len(mpost.LangTag) == 0 } +func (v *Value) updateMutationLayer(mpost *pb.Posting) error { + v.AssertLock() + + if hasDeleteAll(mpost) { + plist := &pb.PostingList{} + v.posting = plist + return nil + } + + newPlist := &pb.PostingList{} + newPlist.Postings = append(newPlist.Postings, mpost) + v.posting = newPlist + return nil +} + // Ensure that you either abort the uncommitted postings or commit them before calling me. 
 func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
     l.AssertLock()
@@ -435,6 +500,12 @@ func fingerprintEdge(t *pb.DirectedEdge) uint64 {
     return id
 }
 
+func (v *Value) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
+    v.Lock()
+    defer v.Unlock()
+    return v.addMutationInternal(ctx, txn, t)
+}
+
 func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
     l.Lock()
     defer l.Unlock()
@@ -495,6 +566,48 @@ func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
     return conflictKey
 }
 
+func (v *Value) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
+    v.AssertLock()
+
+    if txn.ShouldAbort() {
+        return x.ErrConflict
+    }
+
+    mpost := NewPosting(t)
+    mpost.StartTs = txn.StartTs
+    if mpost.PostingType != pb.Posting_REF {
+        t.ValueId = fingerprintEdge(t)
+        mpost.Uid = t.ValueId
+    }
+
+    // A Value holds a single scalar posting; the predicate must be non-list, non-lang data.
+    pk, err := x.Parse(v.key)
+    if err != nil {
+        return errors.Wrapf(err, "cannot parse key when adding mutation to list with key %s",
+            hex.EncodeToString(v.key))
+    }
+
+    pred, ok := schema.State().Get(ctx, t.Attr)
+    isSingle := ok && !pred.GetList() && pk.IsData() && !pred.GetLang()
+    if !isSingle {
+        return errors.New(fmt.Sprintf("Cannot update mutation layer of type %s using Value", t.Attr))
+
+    }
+
+    if err := v.updateMutationLayer(mpost); err != nil {
+        return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
+            hex.EncodeToString(v.key), mpost)
+    }
+
+    x.PrintMutationEdge(t, pk, txn.StartTs)
+
+    // We ensure that commit marks are applied to posting lists in the right
+    // order. We can do so by proposing them in the same order as received by the Oracle delta
+    // stream from Zero, instead of in goroutines.
+    txn.addConflictKey(GetConflictKey(pk, v.key, t))
+    return nil
+}
+
 func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
     l.AssertLock()
 
@@ -533,6 +646,27 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
     return nil
 }
+// getMutation returns a marshaled version of posting list mutation stored internally.
+func (v *Value) getMutation(startTs uint64) []byte {
+    v.RLock()
+    defer v.RUnlock()
+    if v.posting != nil {
+        data, err := v.posting.Marshal()
+        x.Check(err)
+        return data
+    }
+    return nil
+}
+
+func (v *Value) setMutation(startTs uint64, data []byte) {
+    pl := new(pb.PostingList)
+    x.Check(pl.Unmarshal(data))
+
+    v.Lock()
+    v.posting = pl
+    v.Unlock()
+}
+
 // getMutation returns a marshaled version of posting list mutation stored internally.
 func (l *List) getMutation(startTs uint64) []byte {
     l.RLock()
     defer l.RUnlock()
@@ -569,6 +703,39 @@ func (l *List) setMutation(startTs uint64, data []byte) {
 //    return nil // to continue iteration.
 //    return errStopIteration // to break iteration.
// }) + +func (v *Value) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { + if v.posting != nil { + for _, posting := range v.posting.Postings { + err := f(posting) + if err == ErrStopIteration { + return nil + } + if err != nil { + return err + } + } + } + return nil +} + +func (v *Value) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { + v.RLock() + defer v.RUnlock() + if v.posting != nil { + for _, posting := range v.posting.Postings { + err := f(posting) + if err == ErrStopIteration { + return nil + } + if err != nil { + return err + } + } + } + return nil +} + func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { l.RLock() defer l.RUnlock() @@ -736,6 +903,26 @@ loop: return err } +func (v *Value) IsEmpty(readTs, afterUid uint64) (bool, error) { + v.RLock() + defer v.RUnlock() + if v.posting == nil || len(v.posting.Postings) == 0 { + return true, nil + } + + return false, nil +} + +func (v *Value) Length(readTs, afterUid uint64) int { + v.RLock() + defer v.RUnlock() + if v.posting == nil || len(v.posting.Postings) == 0 { + return 0 + } + + return len(v.posting.Postings) +} + // IsEmpty returns true if there are no uids at the given timestamp after the given UID. func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { l.RLock() @@ -751,6 +938,26 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { return count == 0, nil } +func (v *Value) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) { + v.AssertRLock() + var count int + var found bool + var post *pb.Posting + err := v.iterate(readTs, afterUid, func(p *pb.Posting) error { + if p.Uid == uid { + post = p + found = true + } + count++ + return nil + }) + if err != nil { + return -1, false, nil + } + + return count, found, post +} + func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) { l.AssertRLock() var count int @@ -861,6 +1068,21 @@ func (l *List) Length(readTs, afterUid uint64) int { // This proves that writing rollups at ts + 1 would not cause any issues with dgraph.drop.op. // The only issue would come if a rollup happens at ts + k. If a backup happens in between // ts and ts + k, it could lead to some data being dropped during restore. +func (v *Value) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) { + v.RLock() + defer v.RUnlock() + kv := MarshalPostingList(v.posting, alloc) + var kvs []*bpb.KV + kv.Version = v.posting.CommitTs + if readTs != math.MaxUint64 { + kv.Version += 1 + } + + kv.Key = alloc.Copy(v.key) + kvs = append(kvs, kv) + return kvs, nil +} + func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) { l.RLock() defer l.RUnlock() @@ -1156,6 +1378,16 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { return out, nil } +func (v *Value) ApproxLen() int { + v.RLock() + defer v.RUnlock() + if v.posting == nil || len(v.posting.Postings) == 0 { + return 0 + } + + return len(v.posting.Postings) +} + // ApproxLen returns an approximate count of the UIDs in the posting list. func (l *List) ApproxLen() int { l.RLock() @@ -1163,6 +1395,43 @@ func (l *List) ApproxLen() int { return len(l.mutationMap) + codec.ApproxLen(l.plist.Pack) } +func (v *Value) Uids(opt ListOptions) (*pb.List, error) { + if opt.First == 0 { + opt.First = math.MaxInt32 + } + // Pre-assign length to make it faster. + // Use approximate length for initial capacity. 
+ res := make([]uint64, 0, v.Length(opt.ReadTs, opt.AfterUid)) + out := &pb.List{} + + err := v.Iterate(opt.ReadTs, opt.AfterUid, func(p *pb.Posting) error { + if p.PostingType == pb.Posting_REF { + res = append(res, p.Uid) + if opt.First < 0 { + // We need the last N. + // TODO: This could be optimized by only considering some of the last UidBlocks. + if len(res) > -opt.First { + res = res[1:] + } + } else if len(res) > opt.First { + return ErrStopIteration + } + } + return nil + }) + if err != nil { + return out, errors.Wrapf(err, "cannot retrieve UIDs from list with key %s", + hex.EncodeToString(v.key)) + } + + // Do The intersection here as it's optimized. + out.Uids = res + if opt.Intersect != nil { + algo.IntersectWith(out, opt.Intersect, out) + } + return out, nil +} + // Uids returns the UIDs given some query params. // We have to apply the filtering before applying (offset, count). // WARNING: Calling this function just to get UIDs is expensive @@ -1214,6 +1483,96 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) { return out, nil } +func (v *Value) Postings(opt ListOptions, postFn func(*pb.Posting) error) error { + v.RLock() + defer v.RUnlock() + + err := v.iterate(opt.ReadTs, opt.AfterUid, func(p *pb.Posting) error { + if p.PostingType != pb.Posting_REF { + return nil + } + return postFn(p) + }) + return errors.Wrapf(err, "cannot retrieve postings from list with key %s", + hex.EncodeToString(v.key)) +} + +// AllUntaggedValues returns all the values in the posting list with no language tag. +func (v *Value) AllUntaggedValues(readTs uint64) ([]types.Val, error) { + v.RLock() + defer v.RUnlock() + + var vals []types.Val + err := v.iterate(readTs, 0, func(p *pb.Posting) error { + if len(p.LangTag) == 0 { + vals = append(vals, types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + }) + } + return nil + }) + return vals, errors.Wrapf(err, "cannot retrieve untagged values from list with key %s", + hex.EncodeToString(v.key)) +} + +// AllValues returns all the values in the posting list. +func (v *Value) AllValues(readTs uint64) ([]types.Val, error) { + v.RLock() + defer v.RUnlock() + + var vals []types.Val + err := v.iterate(readTs, 0, func(p *pb.Posting) error { + vals = append(vals, types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + }) + return nil + }) + return vals, errors.Wrapf(err, "cannot retrieve all values from list with key %s", + hex.EncodeToString(v.key)) +} + +// GetLangTags finds the language tags of each posting in the list. +func (v *Value) GetLangTags(readTs uint64) ([]string, error) { + return []string{}, errors.New("Can't give lang for value") +} + +// Value returns the default value from the posting list. The default value is +// defined as the value without a language tag. +func (v *Value) Value(readTs uint64) (rval types.Val, rerr error) { + v.RLock() + defer v.RUnlock() + val, found, err := v.findValue(readTs, math.MaxUint64) + if err != nil { + return val, errors.Wrapf(err, + "cannot retrieve default value from list with key %s", hex.EncodeToString(v.key)) + } + if !found { + return val, ErrNoValue + } + return val, nil +} + +// ValueFor returns a value from posting list, according to preferred language list. +// If list is empty, value without language is returned; if such value is not +// available, value with smallest UID is returned. +// If list consists of one or more languages, first available value is returned. +// If no language from the list matches the values, processing is the same as for empty list. 
+func (v *Value) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) { + return rval, errors.New("Can't give lang for value") +} + +// PostingFor returns the posting according to the preferred language list. +func (v *Value) PostingFor(readTs uint64, langs []string) (p *pb.Posting, rerr error) { + return nil, errors.New("Can't give lang for value") +} + +// ValueForTag returns the value in the posting list with the given language tag. +func (v *Value) ValueForTag(readTs uint64, tag string) (rval types.Val, rerr error) { + return rval, errors.New("Can't give tag for value") +} + // Postings calls postFn with the postings that are common with // UIDs in the opt ListOptions. func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error { @@ -1431,6 +1790,30 @@ func (l *List) postingForTag(readTs uint64, tag string) (p *pb.Posting, rerr err return p, nil } +func (v *Value) findValue(readTs, uid uint64) (rval types.Val, found bool, err error) { + v.AssertRLock() + found, p, err := v.findPosting(readTs, uid) + if !found { + return rval, found, err + } + + return valueToTypesVal(p), true, nil +} + +func (v *Value) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posting, err error) { + // Iterate starts iterating after the given argument, so we pass UID - 1 + err = v.iterate(readTs, uid-1, func(p *pb.Posting) error { + if p.Uid == uid { + pos = p + found = true + } + return ErrStopIteration + }) + + return found, pos, errors.Wrapf(err, + "cannot retrieve posting for UID %d from list with key %s", uid, hex.EncodeToString(v.key)) +} + func (l *List) findValue(readTs, uid uint64) (rval types.Val, found bool, err error) { l.AssertRLock() found, p, err := l.findPosting(readTs, uid) diff --git a/posting/lists.go b/posting/lists.go index af5fefbaa37..4833bd05111 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -27,6 +27,7 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/dgo/v230/protos/api" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/ristretto" "github.com/dgraph-io/ristretto/z" @@ -59,7 +60,7 @@ func Init(ps *badger.DB, cacheSize int64) { BufferItems: 64, Metrics: true, Cost: func(val interface{}) int64 { - l, ok := val.(*List) + l, ok := val.(Data) if !ok { return int64(0) } @@ -89,7 +90,7 @@ func Cleanup() { // GetNoStore returns the list stored in the key or creates a new one if it doesn't exist. // It does not store the list in any cache. -func GetNoStore(key []byte, readTs uint64) (rlist *List, err error) { +func GetNoStore(key []byte, readTs uint64) (rlist Data, err error) { return getNew(key, pstore, readTs) } @@ -110,7 +111,7 @@ type LocalCache struct { maxVersions map[string]uint64 // plists are posting lists in memory. They can be discarded to reclaim space. - plists map[string]*List + plists map[string]Data } // NewLocalCache returns a new LocalCache instance. 
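Since `LocalCache.plists` now holds the `Data` interface rather than `*List`, a transaction can cache either concrete implementation through the same path once `SetIfAbsent` and `Txn.Store` are widened to `Data` in the hunks below. A minimal sketch of that usage, written as a hypothetical white-box test inside the `posting` package; the test name, predicate names and UIDs are invented, and it assumes the existing `NewTxn` and `x.DataKey` helpers behave as in the current codebase:

```go
package posting

import (
	"testing"

	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/x"
)

// Hypothetical white-box test: both *List and *Value satisfy Data, so a Txn can
// cache either one through the same Store/SetIfAbsent path.
func TestStoreEitherImplementation(t *testing.T) {
	txn := NewTxn(10) // start timestamp is arbitrary here.

	lst := NewList(x.DataKey("friend", 1), new(pb.PostingList), 9)
	val := &Value{key: x.DataKey("name", 1), posting: new(pb.PostingList)}

	// SetIfAbsent keys the cache on Data.Key(), so the stored entry round-trips.
	if got := txn.Store(lst); got != Data(lst) {
		t.Fatalf("expected the *List to be cached as-is")
	}
	if got := txn.Store(val); got != Data(val) {
		t.Fatalf("expected the *Value to be cached as-is")
	}
}
```

Within the same transaction, `getInternal` then hands this cached entry back for subsequent reads of the key.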
@@ -118,7 +119,7 @@ func NewLocalCache(startTs uint64) *LocalCache { return &LocalCache{ startTs: startTs, deltas: make(map[string][]byte), - plists: make(map[string]*List), + plists: make(map[string]Data), maxVersions: make(map[string]uint64), } } @@ -129,7 +130,7 @@ func NoCache(startTs uint64) *LocalCache { return &LocalCache{startTs: startTs} } -func (lc *LocalCache) getNoStore(key string) *List { +func (lc *LocalCache) getNoStore(key string) Data { lc.RLock() defer lc.RUnlock() if l, ok := lc.plists[key]; ok { @@ -142,7 +143,7 @@ func (lc *LocalCache) getNoStore(key string) *List { // key already exists, the cache will not be modified and the existing list // will be returned instead. This behavior is meant to prevent the goroutines // using the cache from ending up with an orphaned version of a list. -func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List { +func (lc *LocalCache) SetIfAbsent(key string, updated Data) Data { lc.Lock() defer lc.Unlock() if pl, ok := lc.plists[key]; ok { @@ -152,12 +153,25 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List { return updated } -func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { - getNewPlistNil := func() (*List, error) { +func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (Data, error) { + pk, err := x.Parse(key) + if err != nil { + return nil, err + } + schemaType := schema.State().IsScalar(pk.Attr) + getFromDisk := func() (Data, error) { + if schemaType { + return getScalar(key, pstore, lc.startTs) + } else { + return getNew(key, pstore, lc.startTs) + } + } + + getNewPlistNil := func() (Data, error) { lc.RLock() defer lc.RUnlock() if lc.plists == nil { - return getNew(key, pstore, lc.startTs) + return getFromDisk() } return nil, nil } @@ -171,20 +185,29 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return pl, nil } - var pl *List - if readFromDisk { - var err error - pl, err = getNew(key, pstore, lc.startTs) - if err != nil { - return nil, err - } - } else { - pl = &List{ - key: key, - plist: new(pb.PostingList), + getData := func() (Data, error) { + if readFromDisk { + return getFromDisk() + } else { + if schemaType { + return &Value{ + key: key, + posting: new(pb.PostingList), + }, nil + } else { + return &List{ + key: key, + plist: new(pb.PostingList), + }, nil + } } } + pl, err := getData() + if err != nil { + return pl, err + } + // If we just brought this posting list into memory and we already have a delta for it, let's // apply it before returning the list. lc.RLock() @@ -260,18 +283,18 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { } // Get retrieves the cached version of the list associated with the given key. -func (lc *LocalCache) Get(key []byte) (*List, error) { +func (lc *LocalCache) Get(key []byte) (Data, error) { return lc.getInternal(key, true) } // GetFromDelta gets the cached version of the list without reading from disk // and only applies the existing deltas. This is used in situations where the // posting list will only be modified and not read (e.g adding index mutations). -func (lc *LocalCache) GetFromDelta(key []byte) (*List, error) { +func (lc *LocalCache) GetFromDelta(key []byte) (Data, error) { return lc.getInternal(key, false) } -// UpdateDeltasAndDiscardLists updates the delta cache before removing the stored posting lists. +// UpdateDeltasAndDiscardDatas updates the delta cache before removing the stored posting lists. 
func (lc *LocalCache) UpdateDeltasAndDiscardLists() { lc.Lock() defer lc.Unlock() @@ -289,7 +312,7 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { // for the same transaction, who might be holding references to posting lists. // TODO: Find another way to reuse postings via postingPool. } - lc.plists = make(map[string]*List) + lc.plists = make(map[string]Data) } func (lc *LocalCache) fillPreds(ctx *api.TxnContext, gid uint32) { diff --git a/posting/mvcc.go b/posting/mvcc.go index d228ad10f0d..f871220ef0d 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -457,7 +457,30 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil } -func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { +func getScalar(key []byte, pstore *badger.DB, readTs uint64) (Data, error) { + pl := &pb.PostingList{} + txn := pstore.NewTransactionAt(readTs, false) + item, err := txn.Get(key) + if err != nil { + return nil, err + } + + err = item.Value(func(val []byte) error { + if err := pl.Unmarshal(val); err != nil { + return err + } + return nil + }) + + v := &Value{ + key: key, + posting: pl, + } + + return v, err +} + +func getNew(key []byte, pstore *badger.DB, readTs uint64) (Data, error) { cachedVal, ok := lCache.Get(key) if ok { l, ok := cachedVal.(*List) diff --git a/posting/oracle.go b/posting/oracle.go index 435a7853602..5923085c4cd 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -75,12 +75,12 @@ func NewTxn(startTs uint64) *Txn { } // Get retrieves the posting list for the given list from the local cache. -func (txn *Txn) Get(key []byte) (*List, error) { +func (txn *Txn) Get(key []byte) (Data, error) { return txn.cache.Get(key) } // GetFromDelta retrieves the posting list from delta cache, not from Badger. -func (txn *Txn) GetFromDelta(key []byte) (*List, error) { +func (txn *Txn) GetFromDelta(key []byte) (Data, error) { return txn.cache.GetFromDelta(key) } @@ -90,8 +90,8 @@ func (txn *Txn) Update() { } // Store is used by tests. -func (txn *Txn) Store(pl *List) *List { - return txn.cache.SetIfAbsent(string(pl.key), pl) +func (txn *Txn) Store(pl Data) Data { + return txn.cache.SetIfAbsent(string(pl.Key()), pl) } type oracle struct { diff --git a/posting/size.go b/posting/size.go index fd7cf7ca3d6..3f3641048ee 100644 --- a/posting/size.go +++ b/posting/size.go @@ -27,6 +27,30 @@ import ( const sizeOfBucket = 144 +func (v *Value) DeepSize() uint64 { + if v == nil { + return 0 + } + + v.RLock() + defer v.RUnlock() + + var size uint64 = 4*8 + // safe mutex consists of 4 words. + 1*8 + // plist pointer consists of 1 word. + 1*8 + // mutation map pointer consists of 1 word. + 2*8 + // minTs and maxTs take 1 word each. + 3*8 + // array take 3 words. so key array is 3 words. + 1*8 // So far 11 words, in order to round the slab we're adding one more word. + // so far basic struct layout has been calculated. + + // Add each entry size of key array. + size += uint64(cap(v.key)) + + // add the posting list size. + size += calculatePostingListSize(v.posting) + return size +} + // DeepSize computes the memory taken by a Posting List func (l *List) DeepSize() uint64 { if l == nil { diff --git a/schema/schema.go b/schema/schema.go index e04d67649aa..fd267e2a5c3 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -420,6 +420,16 @@ func (s *state) HasCount(ctx context.Context, pred string) bool { return false } +// IsList returns whether the predicate is of list type. 
+func (s *state) IsScalar(pred string) bool { + s.RLock() + defer s.RUnlock() + if schema, ok := s.predicate[pred]; ok { + return !schema.List && !schema.Lang + } + return false +} + // IsList returns whether the predicate is of list type. func (s *state) IsList(pred string) bool { s.RLock() diff --git a/worker/mutation.go b/worker/mutation.go index 25cdfb9da46..ee7f5d74fcb 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -92,7 +92,7 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e // if we're doing indexing or count index or enforcing single UID, etc. In other cases, we can // just create a posting list facade in memory and use it to store the delta in Badger. Later, // the rollup operation would consolidate all these deltas into a posting list. - var getFn func(key []byte) (*posting.List, error) + var getFn func(key []byte) (posting.Data, error) switch { case len(su.GetTokenizer()) > 0 || su.GetCount(): // Any index or count index. diff --git a/worker/task.go b/worker/task.go index 6ea7a5a4b2b..8c5fb681926 100644 --- a/worker/task.go +++ b/worker/task.go @@ -568,7 +568,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er return nil } -func facetsFilterValuePostingList(args funcArgs, pl *posting.List, facetsTree *facetsTree, +func facetsFilterValuePostingList(args funcArgs, pl posting.Data, facetsTree *facetsTree, listType bool, fn func(p *pb.Posting)) error { q := args.q @@ -624,7 +624,7 @@ func facetsFilterValuePostingList(args funcArgs, pl *posting.List, facetsTree *f }) } -func countForValuePostings(args funcArgs, pl *posting.List, facetsTree *facetsTree, +func countForValuePostings(args funcArgs, pl posting.Data, facetsTree *facetsTree, listType bool) (int, error) { var filteredCount int err := facetsFilterValuePostingList(args, pl, facetsTree, listType, func(p *pb.Posting) { @@ -637,7 +637,7 @@ func countForValuePostings(args funcArgs, pl *posting.List, facetsTree *facetsTr return filteredCount, nil } -func retrieveValuesAndFacets(args funcArgs, pl *posting.List, facetsTree *facetsTree, +func retrieveValuesAndFacets(args funcArgs, pl posting.Data, facetsTree *facetsTree, listType bool) ([]types.Val, *pb.FacetsList, error) { q := args.q var vals []types.Val @@ -659,7 +659,7 @@ func retrieveValuesAndFacets(args funcArgs, pl *posting.List, facetsTree *facets return vals, &pb.FacetsList{FacetsList: fcs}, nil } -func facetsFilterUidPostingList(pl *posting.List, facetsTree *facetsTree, opts posting.ListOptions, +func facetsFilterUidPostingList(pl posting.Data, facetsTree *facetsTree, opts posting.ListOptions, fn func(*pb.Posting)) error { return pl.Postings(opts, func(p *pb.Posting) error { @@ -675,7 +675,7 @@ func facetsFilterUidPostingList(pl *posting.List, facetsTree *facetsTree, opts p }) } -func countForUidPostings(args funcArgs, pl *posting.List, facetsTree *facetsTree, +func countForUidPostings(args funcArgs, pl posting.Data, facetsTree *facetsTree, opts posting.ListOptions) (int, error) { var filteredCount int @@ -689,7 +689,7 @@ func countForUidPostings(args funcArgs, pl *posting.List, facetsTree *facetsTree return filteredCount, nil } -func retrieveUidsAndFacets(args funcArgs, pl *posting.List, facetsTree *facetsTree, +func retrieveUidsAndFacets(args funcArgs, pl posting.Data, facetsTree *facetsTree, opts posting.ListOptions) (*pb.List, []*pb.Facets, error) { q := args.q
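The `Value` type added in list.go is meant to back single scalar predicates with exactly one posting while exposing the same read surface as `*List` through `Data`; `getInternal` in lists.go picks it whenever `schema.State().IsScalar` reports a non-list, non-lang predicate. A hedged sketch of the read path, again as a hypothetical white-box test in the `posting` package; the literals are invented, and using `math.MaxUint64` as the untagged-value UID mirrors what `fingerprintEdge` assigns to a default value:

```go
package posting

import (
	"math"
	"testing"

	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/x"
)

// Hypothetical test: a *Value wraps a single scalar posting and answers the
// read-side Data methods without consulting a mutation map or split parts.
func TestValueReadPathSketch(t *testing.T) {
	v := &Value{
		key: x.DataKey("name", 7),
		posting: &pb.PostingList{
			Postings: []*pb.Posting{{
				Uid:         math.MaxUint64, // default (untagged) value posting.
				Value:       []byte("alice"),
				ValType:     pb.Posting_STRING,
				PostingType: pb.Posting_VALUE,
				Op:          Set,
			}},
		},
	}

	var d Data = v // *Value satisfies the Data interface.

	if d.ApproxLen() != 1 || d.Length(0, 0) != 1 {
		t.Fatalf("expected exactly one posting")
	}
	val, err := d.Value(0) // readTs is ignored by *Value.
	if err != nil {
		t.Fatalf("Value() failed: %v", err)
	}
	if string(val.Value.([]byte)) != "alice" {
		t.Fatalf("unexpected value: %v", val.Value)
	}

	// Scalar postings are VALUE postings, so Uids() comes back empty.
	uids, err := d.Uids(ListOptions{ReadTs: 0})
	if err != nil || len(uids.Uids) != 0 {
		t.Fatalf("expected no UIDs for a scalar value, got %v (err: %v)", uids, err)
	}
}
```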
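Unlike the `*List` variant, `Value.updateMutationLayer` keeps at most one posting: a later SET replaces the stored value outright, and a delete-all posting (`Op` = `Del` with the star value) clears it, which is what the new `handleDeleteAll` relies on. A short sketch of that invariant, continuing the hypothetical test file above (same imports); the test name and literals are invented:

```go
// Hypothetical test: Value keeps a single posting, so repeated SETs replace the
// stored value and a delete-all (Op Del, value "*") clears it.
func TestValueSingleMutationSketch(t *testing.T) {
	v := &Value{key: x.DataKey("name", 9), posting: new(pb.PostingList)}

	v.Lock()
	_ = v.updateMutationLayer(&pb.Posting{Uid: math.MaxUint64, Value: []byte("a"), Op: Set})
	_ = v.updateMutationLayer(&pb.Posting{Uid: math.MaxUint64, Value: []byte("b"), Op: Set})
	v.Unlock()
	if v.ApproxLen() != 1 {
		t.Fatalf("expected the second SET to replace the first posting")
	}

	v.Lock()
	_ = v.updateMutationLayer(&pb.Posting{Op: Del, Value: []byte(x.Star)})
	v.Unlock()
	if v.ApproxLen() != 0 {
		t.Fatalf("expected delete-all to leave the value empty")
	}
}
```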