Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(core): Fixing ristretto cache to pass jepsen tests #9237

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ they form a Raft group and provide synchronous replication.
Flag("percentage",
"Cache percentages summing up to 100 for various caches (FORMAT: PostingListCache,"+
"PstoreBlockCache,PstoreIndexCache)").
Flag("keep-updates",
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
"Should carry updates in cache or not (bool)").
String())

flag.String("raft", worker.RaftDefaults, z.NewSuperFlagHelp(worker.RaftDefaults).
Expand Down Expand Up @@ -633,6 +635,7 @@ func run() {
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")

cachePercentage := cache.GetString("percentage")
keepUpdates := cache.GetBool("keep-updates")
cachePercent, err := x.GetCachePercentages(cachePercentage, 3)
x.Check(err)
postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
Expand All @@ -655,6 +658,7 @@ func run() {
WALDir: Alpha.Conf.GetString("wal"),
CacheMb: totalCache,
CachePercentage: cachePercentage,
KeepUpdates: keepUpdates,

MutationsMode: worker.AllowMutations,
AuthToken: security.GetString("token"),
Expand Down Expand Up @@ -782,7 +786,7 @@ func run() {
// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
schema.Init(worker.State.Pstore)
posting.Init(worker.State.Pstore, postingListCacheSize)
posting.Init(worker.State.Pstore, postingListCacheSize, keepUpdates)
defer posting.Cleanup()
worker.Init(worker.State.Pstore)

Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func run() {
db, err := badger.OpenManaged(bopts)
x.Check(err)
// Not using posting list cache
posting.Init(db, 0)
posting.Init(db, 0, false)
defer db.Close()

printSummary(db)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.2
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto/v2 v2.0.1
github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/dgryski/go-groupvarint v0.0.0-20230630160417-2bfb7969fb3c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.2 h1:CnxXOKL4EPguKqcGV/z4u4VoW5izUkOTIsNM
github.com/dgraph-io/gqlparser/v2 v2.2.2/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto/v2 v2.0.1 h1:7W0LfEP+USCmtrUjJsk+Jv2jbhJmb72N4yRI7GrLdMI=
github.com/dgraph-io/ristretto/v2 v2.0.1/go.mod h1:K7caLeufSdxm+ITp1n/73U+VbFVAHrexfLbz4n14hpo=
github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470 h1:rVn0l0Hvab+/kCvTMFrUFtRtob+jagzgVVSfbyqCJ5E=
github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470/go.mod h1:K7caLeufSdxm+ITp1n/73U+VbFVAHrexfLbz4n14hpo=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down
1 change: 1 addition & 0 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
return nil, err
}

memoryLayer.del(key)
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
return &bpb.KVList{Kv: kvs}, nil
}
tmpStream.Send = func(buf *z.Buffer) error {
Expand Down
3 changes: 3 additions & 0 deletions posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
}

txn.Update()
txn.UpdateCachedKeys(commitTs)
writer := NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commitTs))
require.NoError(t, writer.Flush())
Expand Down Expand Up @@ -271,6 +272,7 @@ func addEdgeToUID(t *testing.T, attr string, src uint64,

func TestCountReverseIndexWithData(t *testing.T) {
require.NoError(t, pstore.DropAll())
memoryLayer.clear()
indexNameCountVal := "testcount: [uid] @count @reverse ."

attr := x.GalaxyAttr("testcount")
Expand Down Expand Up @@ -305,6 +307,7 @@ func TestCountReverseIndexWithData(t *testing.T) {

func TestCountReverseIndexEmptyPosting(t *testing.T) {
require.NoError(t, pstore.DropAll())
memoryLayer.clear()
indexNameCountVal := "testcount: [uid] @count @reverse ."

attr := x.GalaxyAttr("testcount")
Expand Down
18 changes: 17 additions & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func newMutableLayer() *MutableLayer {
}
}

// setTs stores readTs in the layer's readTs field.
// It is safe to call on a nil *MutableLayer: in that case it returns without doing anything.
func (mm *MutableLayer) setTs(readTs uint64) {
harshil-goel marked this conversation as resolved.
Show resolved Hide resolved
if mm == nil {
return
}
mm.readTs = readTs
}

// This function clones an existing mutable layer for the new transactions. This function makes sure we copy the right
// things from the existing mutable layer for the new list. It basically copies committedEntries using reference and
// ignores currentEntries and readTs. Similarly, all the cache items related to currentEntries are ignored and
Expand Down Expand Up @@ -866,6 +873,12 @@ func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
return conflictKey
}

// SetTs sets the transaction timestamp on the list's mutation map by delegating to
// MutableLayer.setTs. It should be called before the posting list is passed on to the
// functions that would read the data. If the list has no mutation map (nil), the call
// is a no-op because setTs returns early on a nil receiver.
// NOTE(review): no lock is acquired here — confirm callers hold whatever lock guards
// mutationMap when the list may be shared.
func (l *List) SetTs(readTs uint64) {
l.mutationMap.setTs(readTs)
}

func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
l.AssertLock()

Expand Down Expand Up @@ -978,6 +991,9 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
}

l.mutationMap.committedUids[mpost.Uid] = mpost
if l.mutationMap.length == math.MaxInt64 {
l.mutationMap.length = 0
}
l.mutationMap.length += getLengthDelta(mpost.Op)
}

Expand All @@ -999,7 +1015,6 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.mutationMap = newMutableLayer()
}
l.mutationMap.setCurrentEntries(startTs, pl)

if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
Expand Down Expand Up @@ -1258,6 +1273,7 @@ func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb
var count int
var found bool
var post *pb.Posting

err := l.iterate(readTs, afterUid, func(p *pb.Posting) error {
if p.Uid == uid {
post = p
Expand Down
6 changes: 5 additions & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ func TestAddMutation_mrjn1(t *testing.T) {
func TestReadSingleValue(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32
require.Equal(t, nil, pstore.DropAll())

// We call pl.Iterate and then stop iterating in the first loop when we are reading
// single values. This test confirms that the two functions, getFirst from this file
Expand All @@ -518,6 +519,7 @@ func TestReadSingleValue(t *testing.T) {
Value: []byte(fmt.Sprintf("ho hey there%d", i)),
}
txn := Txn{StartTs: i}
ol.mutationMap.setTs(i)
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(i, i+1))
kData := ol.getMutation(i + 1)
Expand All @@ -532,6 +534,8 @@ func TestReadSingleValue(t *testing.T) {
kvs, err := ol.Rollup(nil, txn.StartTs-3)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
// Delete item from global cache before reading, as we are not updating the cache in the test
memoryLayer.del(key)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
Expand Down Expand Up @@ -1803,7 +1807,7 @@ func TestMain(m *testing.M) {
ps, err = badger.OpenManaged(badger.DefaultOptions(dir))
x.Panic(err)
// Not using posting list cache
Init(ps, 0)
Init(ps, 0, false)
schema.Init(ps)

m.Run()
Expand Down
43 changes: 6 additions & 37 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@ package posting

import (
"bytes"
"context"
"fmt"
"sync"
"time"

ostats "go.opencensus.io/stats"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/ristretto/v2"
"github.com/dgraph-io/ristretto/v2/z"
"github.com/hypermodeinc/dgraph/v24/protos/pb"
"github.com/hypermodeinc/dgraph/v24/tok/index"
Expand All @@ -42,41 +38,15 @@ const (
var (
pstore *badger.DB
closer *z.Closer
lCache *ristretto.Cache[[]byte, *List]
)

// Init initializes the posting lists package, the in memory and dirty list hash.
func Init(ps *badger.DB, cacheSize int64) {
func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) {
pstore = ps
closer = z.NewCloser(1)
go x.MonitorMemoryMetrics(closer)

// Initialize cache.
if cacheSize == 0 {
return
}

var err error
lCache, err = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
Cost: func(val *List) int64 {
return 0
},
})
x.Check(err)
go func() {
m := lCache.Metrics
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Record the posting list cache hit ratio
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
}
}()
memoryLayer = initMemoryLayer(cacheSize, keepUpdates)
}

func UpdateMaxCost(maxCost int64) {
Expand Down Expand Up @@ -145,7 +115,7 @@ func (vc *viLocalCache) GetWithLockHeld(key []byte) (rval index.Value, rerr erro
func (vc *viLocalCache) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) {
value := pl.findStaticValue(vc.delegate.startTs)

if value == nil {
if value == nil || len(value.Postings) == 0 {
return nil, ErrNoValue
}

Expand Down Expand Up @@ -314,8 +284,9 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
}
} else {
pl = &List{
key: key,
plist: new(pb.PostingList),
key: key,
plist: new(pb.PostingList),
mutationMap: newMutableLayer(),
}
}

Expand Down Expand Up @@ -421,8 +392,6 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() {
}

for key, pl := range lc.plists {
//pk, _ := x.Parse([]byte(key))
//fmt.Printf("{TXN} Closing %v\n", pk)
data := pl.getMutation(lc.startTs)
if len(data) > 0 {
lc.deltas[key] = data
Expand Down
Loading
Loading