Skip to content

Commit

Permalink
added sharded map
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Jan 8, 2025
1 parent aad8b17 commit 16f36d3
Show file tree
Hide file tree
Showing 20 changed files with 407 additions and 190 deletions.
6 changes: 5 additions & 1 deletion dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ they form a Raft group and provide synchronous replication.
Flag("percentage",
"Cache percentages summing up to 100 for various caches (FORMAT: PostingListCache,"+
"PstoreBlockCache,PstoreIndexCache)").
Flag("keep-updates",
"Should carry updates in cache or not (bool)").
String())

flag.String("raft", worker.RaftDefaults, z.NewSuperFlagHelp(worker.RaftDefaults).
Expand Down Expand Up @@ -633,6 +635,7 @@ func run() {
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")

cachePercentage := cache.GetString("percentage")
keepUpdates := cache.GetBool("keep-updates")
cachePercent, err := x.GetCachePercentages(cachePercentage, 3)
x.Check(err)
postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
Expand All @@ -655,6 +658,7 @@ func run() {
WALDir: Alpha.Conf.GetString("wal"),
CacheMb: totalCache,
CachePercentage: cachePercentage,
KeepUpdates: keepUpdates,

MutationsMode: worker.AllowMutations,
AuthToken: security.GetString("token"),
Expand Down Expand Up @@ -782,7 +786,7 @@ func run() {
// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
schema.Init(worker.State.Pstore)
posting.Init(worker.State.Pstore, postingListCacheSize)
posting.Init(worker.State.Pstore, postingListCacheSize, keepUpdates)
defer posting.Cleanup()
worker.Init(worker.State.Pstore)

Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func run() {
db, err := badger.OpenManaged(bopts)
x.Check(err)
// Not using posting list cache
posting.Init(db, 0)
posting.Init(db, 0, false)
defer db.Close()

printSummary(db)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.2
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto/v2 v2.0.1
github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/dgryski/go-groupvarint v0.0.0-20230630160417-2bfb7969fb3c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.2 h1:CnxXOKL4EPguKqcGV/z4u4VoW5izUkOTIsNM
github.com/dgraph-io/gqlparser/v2 v2.2.2/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto/v2 v2.0.1 h1:7W0LfEP+USCmtrUjJsk+Jv2jbhJmb72N4yRI7GrLdMI=
github.com/dgraph-io/ristretto/v2 v2.0.1/go.mod h1:K7caLeufSdxm+ITp1n/73U+VbFVAHrexfLbz4n14hpo=
github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470 h1:rVn0l0Hvab+/kCvTMFrUFtRtob+jagzgVVSfbyqCJ5E=
github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470/go.mod h1:K7caLeufSdxm+ITp1n/73U+VbFVAHrexfLbz4n14hpo=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down
1 change: 1 addition & 0 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
return nil, err
}

memoryLayer.del(key)
return &bpb.KVList{Kv: kvs}, nil
}
tmpStream.Send = func(buf *z.Buffer) error {
Expand Down
3 changes: 3 additions & 0 deletions posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
}

txn.Update()
txn.UpdateCachedKeys(commitTs)
writer := NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commitTs))
require.NoError(t, writer.Flush())
Expand Down Expand Up @@ -271,6 +272,7 @@ func addEdgeToUID(t *testing.T, attr string, src uint64,

func TestCountReverseIndexWithData(t *testing.T) {
require.NoError(t, pstore.DropAll())
memoryLayer.clear()
indexNameCountVal := "testcount: [uid] @count @reverse ."

attr := x.GalaxyAttr("testcount")
Expand Down Expand Up @@ -305,6 +307,7 @@ func TestCountReverseIndexWithData(t *testing.T) {

func TestCountReverseIndexEmptyPosting(t *testing.T) {
require.NoError(t, pstore.DropAll())
memoryLayer.clear()
indexNameCountVal := "testcount: [uid] @count @reverse ."

attr := x.GalaxyAttr("testcount")
Expand Down
18 changes: 17 additions & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func newMutableLayer() *MutableLayer {
}
}

// setTs records readTs as the read timestamp of the mutable layer. Calling it
// on a nil *MutableLayer is a no-op, so callers need not check for nil first.
func (mm *MutableLayer) setTs(readTs uint64) {
	if mm != nil {
		mm.readTs = readTs
	}
}

// This function clones an existing mutable layer for the new transactions. This function makes sure we copy the right
// things from the existing mutable layer for the new list. It basically copies committedEntries using reference and
// ignores currentEntries and readTs. Similarly, all the cache items related to currentEntries are ignored and
Expand Down Expand Up @@ -866,6 +873,12 @@ func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
return conflictKey
}

// SetTs stores readTs, the transaction timestamp, on the list's mutation map.
// Call it before handing the posting list to code that reads its data.
func (l *List) SetTs(readTs uint64) {
	layer := l.mutationMap
	layer.setTs(readTs)
}

func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
l.AssertLock()

Expand Down Expand Up @@ -978,6 +991,9 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
}

l.mutationMap.committedUids[mpost.Uid] = mpost
if l.mutationMap.length == math.MaxInt64 {
l.mutationMap.length = 0
}
l.mutationMap.length += getLengthDelta(mpost.Op)
}

Expand All @@ -999,7 +1015,6 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.mutationMap = newMutableLayer()
}
l.mutationMap.setCurrentEntries(startTs, pl)

if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
Expand Down Expand Up @@ -1258,6 +1273,7 @@ func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb
var count int
var found bool
var post *pb.Posting

err := l.iterate(readTs, afterUid, func(p *pb.Posting) error {
if p.Uid == uid {
post = p
Expand Down
6 changes: 5 additions & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ func TestAddMutation_mrjn1(t *testing.T) {
func TestReadSingleValue(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32
require.Equal(t, nil, pstore.DropAll())

// We call pl.Iterate and then stop iterating in the first loop when we are reading
// single values. This test confirms that the two functions, getFirst from this file
Expand All @@ -518,6 +519,7 @@ func TestReadSingleValue(t *testing.T) {
Value: []byte(fmt.Sprintf("ho hey there%d", i)),
}
txn := Txn{StartTs: i}
ol.mutationMap.setTs(i)
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(i, i+1))
kData := ol.getMutation(i + 1)
Expand All @@ -532,6 +534,8 @@ func TestReadSingleValue(t *testing.T) {
kvs, err := ol.Rollup(nil, txn.StartTs-3)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
// Delete item from global cache before reading, as we are not updating the cache in the test
memoryLayer.del(key)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
Expand Down Expand Up @@ -1803,7 +1807,7 @@ func TestMain(m *testing.M) {
ps, err = badger.OpenManaged(badger.DefaultOptions(dir))
x.Panic(err)
// Not using posting list cache
Init(ps, 0)
Init(ps, 0, false)
schema.Init(ps)

m.Run()
Expand Down
43 changes: 6 additions & 37 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@ package posting

import (
"bytes"
"context"
"fmt"
"sync"
"time"

ostats "go.opencensus.io/stats"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/ristretto/v2"
"github.com/dgraph-io/ristretto/v2/z"
"github.com/hypermodeinc/dgraph/v24/protos/pb"
"github.com/hypermodeinc/dgraph/v24/tok/index"
Expand All @@ -42,41 +38,15 @@ const (
var (
pstore *badger.DB
closer *z.Closer
lCache *ristretto.Cache[[]byte, *List]
)

// Init initializes the posting lists package, the in memory and dirty list hash.
func Init(ps *badger.DB, cacheSize int64) {
func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) {
pstore = ps
closer = z.NewCloser(1)
go x.MonitorMemoryMetrics(closer)

// Initialize cache.
if cacheSize == 0 {
return
}

var err error
lCache, err = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
Cost: func(val *List) int64 {
return 0
},
})
x.Check(err)
go func() {
m := lCache.Metrics
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Record the posting list cache hit ratio
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
}
}()
memoryLayer = initMemoryLayer(cacheSize, keepUpdates)
}

func UpdateMaxCost(maxCost int64) {
Expand Down Expand Up @@ -145,7 +115,7 @@ func (vc *viLocalCache) GetWithLockHeld(key []byte) (rval index.Value, rerr erro
func (vc *viLocalCache) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) {
value := pl.findStaticValue(vc.delegate.startTs)

if value == nil {
if value == nil || len(value.Postings) == 0 {
return nil, ErrNoValue
}

Expand Down Expand Up @@ -314,8 +284,9 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
}
} else {
pl = &List{
key: key,
plist: new(pb.PostingList),
key: key,
plist: new(pb.PostingList),
mutationMap: newMutableLayer(),
}
}

Expand Down Expand Up @@ -421,8 +392,6 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() {
}

for key, pl := range lc.plists {
//pk, _ := x.Parse([]byte(key))
//fmt.Printf("{TXN} Closing %v\n", pk)
data := pl.getMutation(lc.startTs)
if len(data) > 0 {
lc.deltas[key] = data
Expand Down
Loading

0 comments on commit 16f36d3

Please sign in to comment.