Skip to content

Commit

Permalink
added config
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Jan 6, 2025
1 parent 833ad4a commit 2adb7e6
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 31 deletions.
6 changes: 5 additions & 1 deletion dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ they form a Raft group and provide synchronous replication.
Flag("percentage",
"Cache percentages summing up to 100 for various caches (FORMAT: PostingListCache,"+
"PstoreBlockCache,PstoreIndexCache)").
Flag("keep-updates",
"Should carry updates in cache or not (bool)").
String())

flag.String("raft", worker.RaftDefaults, z.NewSuperFlagHelp(worker.RaftDefaults).
Expand Down Expand Up @@ -633,6 +635,7 @@ func run() {
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")

cachePercentage := cache.GetString("percentage")
keepUpdates := cache.GetBool("keep-updates")
cachePercent, err := x.GetCachePercentages(cachePercentage, 3)
x.Check(err)
postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
Expand All @@ -655,6 +658,7 @@ func run() {
WALDir: Alpha.Conf.GetString("wal"),
CacheMb: totalCache,
CachePercentage: cachePercentage,
KeepUpdates: keepUpdates,

MutationsMode: worker.AllowMutations,
AuthToken: security.GetString("token"),
Expand Down Expand Up @@ -782,7 +786,7 @@ func run() {
// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
schema.Init(worker.State.Pstore)
posting.Init(worker.State.Pstore, postingListCacheSize)
posting.Init(worker.State.Pstore, postingListCacheSize, keepUpdates)
defer posting.Cleanup()
worker.Init(worker.State.Pstore)

Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func run() {
db, err := badger.OpenManaged(bopts)
x.Check(err)
// Not using posting list cache
posting.Init(db, 0)
posting.Init(db, 0, false)
defer db.Close()

printSummary(db)
Expand Down
7 changes: 7 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func newMutableLayer() *MutableLayer {
}
}

// setTs records readTs as the timestamp this mutable layer is being read at.
// It is safe to call on a nil receiver (no-op), so callers don't need to
// check whether a mutable layer exists first.
func (mm *MutableLayer) setTs(readTs uint64) {
	if mm != nil {
		mm.readTs = readTs
	}
}

// This function clones an existing mutable layer for the new transactions. This function makes sure we copy the right
// things from the existing mutable layer for the new list. It basically copies committedEntries using reference and
ignores currentEntries and readTs. Similarly, all the cache items related to currentEntries are ignored and
Expand Down
2 changes: 1 addition & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ func TestMain(m *testing.M) {
ps, err = badger.OpenManaged(badger.DefaultOptions(dir))
x.Panic(err)
// Not using posting list cache
Init(ps, 0)
Init(ps, 0, false)
schema.Init(ps)

m.Run()
Expand Down
4 changes: 2 additions & 2 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
)

// Init initializes the posting lists package, the in memory and dirty list hash.
func Init(ps *badger.DB, cacheSize int64) {
func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) {
pstore = ps
closer = z.NewCloser(1)
go x.MonitorMemoryMetrics(closer)
Expand All @@ -52,7 +52,7 @@ func Init(ps *badger.DB, cacheSize int64) {
return
}

memoryLayer = initMemoryLayer(cacheSize)
memoryLayer = initMemoryLayer(cacheSize, keepUpdates)
}

func UpdateMaxCost(maxCost int64) {
Expand Down
55 changes: 36 additions & 19 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,17 @@ func RemoveCacheFor(key []byte) {
type MemoryLayer struct {
cache *ristretto.Cache[[]byte, *CachePL]

keepUpdates bool

numCacheRead int
numCacheReadFails int
numDisksRead int
numCacheSave int
}

func initMemoryLayer(cacheSize int64) *MemoryLayer {
sm := &MemoryLayer{}
func initMemoryLayer(cacheSize int64, keepUpdates bool) *MemoryLayer {
ml := &MemoryLayer{}
ml.keepUpdates = keepUpdates
cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
Expand All @@ -382,12 +385,17 @@ func initMemoryLayer(cacheSize int64) *MemoryLayer {
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
}
}()
sm.cache = cache
return sm
	if cacheSize > 0 {
		ml.cache = cache
	}
return ml
}

func (sm *MemoryLayer) get(key []byte) (*CachePL, bool) {
val, ok := sm.cache.Get(key)
func (ml *MemoryLayer) get(key []byte) (*CachePL, bool) {
if ml.cache == nil {
return nil, false
}
val, ok := ml.cache.Get(key)
if !ok {
return val, ok
}
Expand All @@ -397,16 +405,25 @@ func (sm *MemoryLayer) get(key []byte) (*CachePL, bool) {
return val, true
}

func (sm *MemoryLayer) set(key []byte, i *CachePL) {
sm.cache.Set(key, i, 1)
// set stores i in the cache under key with a unit cost. When caching is
// disabled (ml.cache is nil) the call is a no-op.
func (ml *MemoryLayer) set(key []byte, i *CachePL) {
	if c := ml.cache; c != nil {
		c.Set(key, i, 1)
	}
}

func (sm *MemoryLayer) del(key []byte) {
sm.cache.Del(key)
// del evicts key from the cache. When caching is disabled (ml.cache is nil)
// the call is a no-op.
func (ml *MemoryLayer) del(key []byte) {
	if c := ml.cache; c != nil {
		c.Del(key)
	}
}

func (sm *MemoryLayer) clear() {
sm.cache.Clear()
// clear drops every entry from the cache. When caching is disabled
// (ml.cache is nil) the call is a no-op.
func (ml *MemoryLayer) clear() {
	if c := ml.cache; c != nil {
		c.Clear()
	}
}

func NewCachePL() *CachePL {
Expand Down Expand Up @@ -434,9 +451,8 @@ func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, comm
return
}

updateItemAfterCommit := false

if !updateItemAfterCommit {
if !ml.keepUpdates {
// TODO We should mark the key as deleted instead of directly deleting from the cache.
ml.del([]byte(key))
return
}
Expand All @@ -449,7 +465,7 @@ func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, comm
val.lastUpdate = commitTs
val.count -= 1

if val.list != nil && updateItemAfterCommit {
if val.list != nil && ml.keepUpdates {
p := new(pb.PostingList)
x.Check(proto.Unmarshal(delta, p))

Expand Down Expand Up @@ -551,9 +567,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
return l, nil
case BitDeltaPosting:
err := item.Value(func(val []byte) error {
if l.mutationMap == nil {
l.mutationMap = newMutableLayer()
}
pl := &pb.PostingList{}
if err := proto.Unmarshal(val, pl); err != nil {
return err
Expand Down Expand Up @@ -640,6 +653,7 @@ func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64
return l, nil
}

// Saves the data in the cache. The caller must ensure that the list provided is the latest possible.
func (ml *MemoryLayer) saveInCache(key []byte, l *List) {
l.RLock()
defer l.RUnlock()
Expand All @@ -657,6 +671,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
l := ml.readFromCache(key, readTs)
if l != nil {
ml.numCacheRead += 1
l.mutationMap.setTs(readTs)
return l, nil
} else {
ml.numCacheReadFails += 1
Expand All @@ -667,6 +682,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
}
ml.saveInCache(key, l)
if l.minTs == 0 || readTs >= l.minTs {
l.mutationMap.setTs(readTs)
return l, nil
}

Expand All @@ -675,6 +691,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
return nil, err
}

l.mutationMap.setTs(readTs)
return l, nil
}

Expand Down
2 changes: 1 addition & 1 deletion posting/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func BenchmarkTestCache(b *testing.B) {

ps, err = badger.OpenManaged(badger.DefaultOptions(dir))
x.Panic(err)
Init(ps, 10000000)
Init(ps, 10000000, true)
schema.Init(ps)

attr := x.GalaxyAttr("cache")
Expand Down
5 changes: 5 additions & 0 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ type Options struct {
CachePercentage string
// CacheMb is the total memory allocated between all the caches.
CacheMb int64
// KeepUpdates lets the user choose whether the cache should keep items that were
// just mutated. Keeping these items is good for a mixed workload where the same
// element is updated multiple times. For a mutation-heavy workload, however, it is
// better not to keep them, as they bloat the cache and make it slow.
KeepUpdates bool

Audit *x.LoggerConf

Expand Down
2 changes: 1 addition & 1 deletion worker/mutation_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestReverseEdge(t *testing.T) {
x.Check(err)
pstore = ps
// Not using posting list cache
posting.Init(ps, 0)
posting.Init(ps, 0, false)
Init(ps)
err = schema.ParseBytes([]byte("revc: [uid] @reverse @count ."), 1)
require.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions worker/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestSingleUid(t *testing.T) {
ps, err := badger.OpenManaged(opt)
x.Check(err)
pstore = ps
posting.Init(ps, 0)
posting.Init(ps, 0, false)
Init(ps)
err = schema.ParseBytes([]byte("singleUidTest: string @index(exact) @unique ."), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestLangExact(t *testing.T) {
x.Check(err)
pstore = ps
// Not using posting list cache
posting.Init(ps, 0)
posting.Init(ps, 0, false)
Init(ps)
err = schema.ParseBytes([]byte("testLang: string @index(term) @lang ."), 1)
require.NoError(t, err)
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestLangExact(t *testing.T) {

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func RandStringBytes(n int) string {
func randStringBytes(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
Expand All @@ -236,7 +236,7 @@ func BenchmarkAddMutationWithIndex(b *testing.B) {
x.Check(err)
pstore = ps
// Not using posting list cache
posting.Init(ps, 0)
posting.Init(ps, 0, false)
Init(ps)
err = schema.ParseBytes([]byte("benchmarkadd: string @index(term) ."), 1)
fmt.Println(err)
Expand All @@ -250,7 +250,7 @@ func BenchmarkAddMutationWithIndex(b *testing.B) {
n := uint64(1000)
values := make([]string, 0)
for range n {
values = append(values, RandStringBytes(5))
values = append(values, randStringBytes(5))
}

for i := 0; i < b.N; i++ {
Expand Down

0 comments on commit 2adb7e6

Please sign in to comment.