diff --git a/posting/list_test.go b/posting/list_test.go index 9911dc60b45..eefa2662848 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -480,7 +480,6 @@ func TestReadSingleValue(t *testing.T) { require.NoError(t, err) checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) } - } } diff --git a/posting/lists.go b/posting/lists.go index 3d569766d7d..af5fefbaa37 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -195,24 +195,35 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return lc.SetIfAbsent(skey, pl), nil } +// GetSinglePosting retrieves the cached version of the first item in the list associated with the +// given key. This is used for retrieving the value of a scalar predicats. func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { + getDeltas := func() *pb.PostingList { + lc.RLock() + defer lc.RUnlock() - getPostings := func() (*pb.PostingList, error) { pl := &pb.PostingList{} - lc.RLock() if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { err := pl.Unmarshal(delta) if err != nil { - lc.RUnlock() - return pl, nil + return pl } } - lc.RUnlock() + return nil + } + + getPostings := func() (*pb.PostingList, error) { + pl := getDeltas() + if pl != nil { + return pl, nil + } + + pl = &pb.PostingList{} txn := pstore.NewTransactionAt(lc.startTs, false) item, err := txn.Get(key) if err != nil { - return pl, err + return nil, err } err = item.Value(func(val []byte) error { @@ -227,10 +238,10 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { pl, err := getPostings() if err == badger.ErrKeyNotFound { - err = nil + return nil, nil } if err != nil { - return pl, err + return nil, err } // Filter and remove STAR_ALL and OP_DELETE Postings diff --git a/worker/task.go b/worker/task.go index 94d8dffb73f..6ea7a5a4b2b 100644 --- a/worker/task.go +++ b/worker/task.go @@ -376,9 +376,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) + + // These are certain special cases where we can get away with reading only the latest value + // Lang doesn't work because we would be storing various different languages at various + // time. So when we go to read the latest value, we might get a different language. + // Similarly with DoCount and ExpandAll and Facets. List types are also not supported + // because list is stored by time, and we combine all the list items at various timestamps. hasLang := schema.State().HasLang(q.Attr) - getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang - //getMultiplePosting := true + getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil calculate := func(start, end int) error { x.AssertTrue(start%width == 0) @@ -399,7 +404,10 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !getMultiplePosting { - pl, _ := qs.cache.GetSinglePosting(key) + pl, err := qs.cache.GetSinglePosting(key) + if err != nil { + return err + } if pl == nil || len(pl.Postings) == 0 { out.UidMatrix = append(out.UidMatrix, &pb.List{}) out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) @@ -413,11 +421,6 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er Tid: types.TypeID(p.ValType), Value: p.Value, } - - // TODO Apply facet tree before - if q.FacetParam != nil { - fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) - } } } else { pl, err := qs.cache.Get(key)