sql: add an option to cache query results (#5465)
With `{"main":{db-query-cache": true}}` option (false by default), some of the query results are stored in memory.

## Motivation
On nodes with high peer counts, a lot of CPU time goes into handling SQLite queries and into GC pressure from repeated database requests that serialize the same data over and over again.
With the query cache enabled on such nodes, memory use doesn't increase much: there is normally a large number of requests in flight, and the cached data is reused to handle them without further temporary allocations.

## Changes
Currently, per-epoch ATX ID lists, ATX blobs, and ActiveSet blobs are cached. Serialized ATX ID lists are also cached by the fetcher, following the pattern sketched below.
This reduces CPU load and GC pressure on nodes that serve a large number of fetch requests.
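The fetcher-side caching is the get-or-compute pattern visible in the `fetch/handler.go` diff further down. A condensed sketch of that usage (the hit/miss semantics of `sql.WithCachedSubKey` are assumed; its implementation is not part of this excerpt):

```go
// Condensed from the handleEpochInfoReq change in fetch/handler.go.
// On a cache hit the previously serialized response is returned as-is;
// on a miss the callback runs the query, serializes the result, and the
// bytes are stored under (cacheKey, fetchSubKey) for later requests.
cacheKey := sql.QueryCacheKey(atxs.CacheKindEpochATXs, epoch.String())
resp, err := sql.WithCachedSubKey(h.cdb, cacheKey, fetchSubKey, func() ([]byte, error) {
	ids, err := atxs.GetIDsByEpoch(h.cdb, epoch)
	if err != nil {
		return nil, err
	}
	return codec.Encode(&EpochData{AtxIDs: ids})
})
```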

## Test Plan
Verified on a node with 8K+ peers.
ivan4th committed Feb 28, 2024
1 parent 9132687 commit 8bc6e5e
Showing 16 changed files with 928 additions and 101 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -106,6 +106,8 @@ configuration is as follows:

* [#5418](https://github.com/spacemeshos/go-spacemesh/pull/5418) Add `grpc-post-listener` to separate post service from
`grpc-private-listener` and not require mTLS for the post service.
* [#5465](https://github.com/spacemeshos/go-spacemesh/pull/5465)
Add an option to cache SQL query results. This is useful for nodes with high peer counts.

If you are not using a remote post service you do not need to adjust anything. If you are using a remote setup
make sure your post service now connects to `grpc-post-listener` instead of `grpc-private-listener`. If you are
@@ -216,6 +218,7 @@ configuration is as follows:

### Improvements

* [#5467](https://github.com/spacemeshos/go-spacemesh/pull/5467)
Fix a bug that could cause ATX sync to stall because of exhausted limit of concurrent requests for dependencies.
Fetching dependencies of an ATX is not limited anymore.
1 change: 1 addition & 0 deletions activation/activation_test.go
@@ -1535,6 +1535,7 @@ func TestGetPositioningAtxDbFailed(t *testing.T) {
sig := maps.Values(tab.signers)[0]

db := datastoremocks.NewMockExecutor(tab.mctrl)
db.EXPECT().QueryCache().Return(sql.NullQueryCache)
tab.Builder.cdb = datastore.NewCachedDB(db, logtest.New(t))
expected := errors.New("db error")
db.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any()).Return(0, expected)
31 changes: 22 additions & 9 deletions config/config.go
@@ -110,12 +110,14 @@ type BaseConfig struct {
OptFilterThreshold int `mapstructure:"optimistic-filtering-threshold"`
TickSize uint64 `mapstructure:"tick-size"`

DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabaseSizeMeteringInterval time.Duration `mapstructure:"db-size-metering-interval"`
DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"`
DatabaseVacuumState int `mapstructure:"db-vacuum-state"`
DatabaseSkipMigrations []int `mapstructure:"db-skip-migrations"`
DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabaseSizeMeteringInterval time.Duration `mapstructure:"db-size-metering-interval"`
DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"`
DatabaseVacuumState int `mapstructure:"db-vacuum-state"`
DatabaseSkipMigrations []int `mapstructure:"db-skip-migrations"`
DatabaseQueryCache bool `mapstructure:"db-query-cache"`
DatabaseQueryCacheSizes DatabaseQueryCacheSizes `mapstructure:"db-query-cache-sizes"`

PruneActivesetsFrom types.EpochID `mapstructure:"prune-activesets-from"`

@@ -141,6 +143,12 @@ type BaseConfig struct {
NoMainOverride bool `mapstructure:"no-main-override"`
}

type DatabaseQueryCacheSizes struct {
EpochATXs int `mapstructure:"epoch-atxs"`
ATXBlob int `mapstructure:"atx-blob"`
ActiveSetBlob int `mapstructure:"active-set-blob"`
}

type DeprecatedPoETServers struct{}

// DeprecatedMsg implements Deprecated interface.
@@ -222,9 +230,14 @@ func defaultBaseConfig() BaseConfig {
DatabaseConnections: 16,
DatabaseSizeMeteringInterval: 10 * time.Minute,
DatabasePruneInterval: 30 * time.Minute,
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,
DatabaseQueryCacheSizes: DatabaseQueryCacheSizes{
EpochATXs: 20,
ATXBlob: 10000,
ActiveSetBlob: 200,
},
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,
}
}

38 changes: 38 additions & 0 deletions datastore/mocks/mocks.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions datastore/store.go
@@ -33,11 +33,13 @@ type VrfNonceKey struct {
type Executor interface {
sql.Executor
WithTx(context.Context, func(*sql.Tx) error) error
QueryCache() sql.QueryCache
}

// CachedDB is simply a database injected with cache.
type CachedDB struct {
Executor
sql.QueryCache
logger log.Log

// cache is optional
@@ -108,6 +110,7 @@ func NewCachedDB(db Executor, lg log.Log, opts ...Opt) *CachedDB {

return &CachedDB{
Executor: db,
QueryCache: db.QueryCache(),
atxsdata: o.atxsdata,
logger: lg,
atxHdrCache: atxHdrCache,
103 changes: 64 additions & 39 deletions fetch/handler.go
@@ -16,6 +16,10 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
)

const (
fetchSubKey sql.QueryCacheSubKey = "epoch-info-req"
)

type handler struct {
logger log.Log
cdb *datastore.CachedDB
@@ -38,12 +42,13 @@ func newHandler(
func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ []byte) ([]byte, error) {
nodes, err := identities.GetMalicious(h.cdb)
if err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to get malicious IDs", log.Err(err))
h.logger.With().Warning("serve: failed to get malicious IDs",
log.Context(ctx), log.Err(err))
return nil, err
}
h.logger.WithContext(ctx).
With().
Debug("serve: responded to malicious IDs request", log.Int("num_malicious", len(nodes)))
h.logger.With().
Debug("serve: responded to malicious IDs request",
log.Context(ctx), log.Int("num_malicious", len(nodes)))
malicious := &MaliciousIDs{
NodeIDs: nodes,
}
@@ -60,23 +65,27 @@ func (h *handler) handleEpochInfoReq(ctx context.Context, msg []byte) ([]byte, error) {
if err := codec.Decode(msg, &epoch); err != nil {
return nil, err
}
atxids, err := atxs.GetIDsByEpoch(h.cdb, epoch)
if err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to get epoch atx IDs", epoch, log.Err(err))
return nil, err
}
ed := EpochData{
AtxIDs: atxids,
}
h.logger.WithContext(ctx).With().Debug("serve: responded to epoch info request",
epoch,
log.Int("atx_count", len(ed.AtxIDs)),
)
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize epoch atx", epoch, log.Err(err))
}
return bts, nil

cacheKey := sql.QueryCacheKey(atxs.CacheKindEpochATXs, epoch.String())
return sql.WithCachedSubKey(h.cdb, cacheKey, fetchSubKey, func() ([]byte, error) {
atxids, err := atxs.GetIDsByEpoch(h.cdb, epoch)
if err != nil {
h.logger.With().Warning("serve: failed to get epoch atx IDs",
epoch, log.Err(err), log.Context(ctx))
return nil, err
}
ed := EpochData{
AtxIDs: atxids,
}
h.logger.With().Debug("serve: responded to epoch info request",
epoch, log.Context(ctx), log.Int("atx_count", len(ed.AtxIDs)))
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.With().Fatal("serve: failed to serialize epoch atx",
epoch, log.Context(ctx), log.Err(err))
}
return bts, nil
})
}
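For intuition, here is a minimal sketch of the get-or-compute behavior assumed for `sql.WithCachedSubKey` above. This is a hypothetical illustration only: the actual implementation lives in the `sql` package (not shown in this excerpt) and presumably also enforces the per-kind limits from `db-query-cache-sizes` and invalidates entries when the underlying data changes:

```go
package sqlcache // hypothetical illustration, not the actual sql package

import "sync"

type cacheKey struct{ key, subKey string }

// QueryCache is a simplified stand-in for the cache assumed above.
type QueryCache struct {
	mu      sync.Mutex
	entries map[cacheKey][]byte
}

// GetOrCompute returns the bytes cached under (key, subKey) on a hit;
// on a miss it runs retrieve once and stores the result for reuse.
// Eviction and invalidation are omitted for brevity.
func (c *QueryCache) GetOrCompute(key, subKey string, retrieve func() ([]byte, error)) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.entries == nil {
		c.entries = make(map[cacheKey][]byte)
	}
	k := cacheKey{key, subKey}
	if v, ok := c.entries[k]; ok {
		return v, nil // hit: reuse the serialized bytes, no reallocation
	}
	v, err := retrieve() // miss: run the query and serialize once
	if err != nil {
		return nil, err
	}
	c.entries[k] = v
	return v, nil
}
```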

// handleLayerDataReq returns all data in a layer, described in LayerData.
@@ -91,13 +100,16 @@ func (h *handler) handleLayerDataReq(ctx context.Context, req []byte) ([]byte, error) {
}
ld.Ballots, err = ballots.IDsInLayer(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("serve: failed to get layer ballots", lid, log.Err(err))
h.logger.With().Warning("serve: failed to get layer ballots",
lid, log.Err(err), log.Context(ctx))
return nil, err
}

out, err := codec.Encode(&ld)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer data response", log.Err(err))
h.logger.With().Fatal(
"serve: failed to serialize layer data response",
log.Context(ctx), log.Err(err))
}
return out, nil
}
@@ -121,20 +133,22 @@ func (h *handler) handleLayerOpinionsReq2(ctx context.Context, data []byte) ([]byte, error) {
opnReqV2.Inc()
lo.PrevAggHash, err = layers.GetAggregatedHash(h.cdb, lid.Sub(1))
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get prev agg hash", lid, log.Err(err))
h.logger.With().Error("serve: failed to get prev agg hash", log.Context(ctx), lid, log.Err(err))
return nil, err
}
bid, err := certificates.CertifiedBlock(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get layer certified block", lid, log.Err(err))
h.logger.With().Error("serve: failed to get layer certified block",
log.Context(ctx), lid, log.Err(err))
return nil, err
}
if err == nil {
lo.Certified = &bid
}
out, err = codec.Encode(&lo)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer opinions response", log.Err(err))
h.logger.With().Fatal("serve: failed to serialize layer opinions response",
log.Context(ctx), log.Err(err))
}
return out, nil
}
@@ -143,15 +157,16 @@ func (h *handler) handleCertReq(ctx context.Context, lid types.LayerID, bid types.BlockID) ([]byte, error) {
certReq.Inc()
certs, err := certificates.Get(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get certificate", lid, log.Err(err))
h.logger.With().Error("serve: failed to get certificate", log.Context(ctx), lid, log.Err(err))
return nil, err
}
if err == nil {
for _, cert := range certs {
if cert.Block == bid {
out, err := codec.Encode(cert.Cert)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode cert", log.Err(err))
h.logger.With().Fatal("serve: failed to encode cert",
log.Context(ctx), log.Err(err))
}
return out, nil
}
@@ -163,7 +178,7 @@ func (h *handler) handleCertReq(ctx context.Context, lid types.LayerID, bid types.BlockID) ([]byte, error) {
func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error) {
var requestBatch RequestBatch
if err := codec.Decode(data, &requestBatch); err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to parse request", log.Err(err))
h.logger.With().Warning("serve: failed to parse request", log.Context(ctx), log.Err(err))
return nil, errBadRequest
}
resBatch := ResponseBatch{
@@ -176,20 +191,23 @@ func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error) {
totalHashReqs.WithLabelValues(string(r.Hint)).Add(1)
res, err := h.bs.Get(r.Hint, r.Hash.Bytes())
if err != nil {
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested nonexistent hash",
h.logger.With().Debug("serve: remote peer requested nonexistent hash",
log.Context(ctx),
log.String("hash", r.Hash.ShortString()),
log.String("hint", string(r.Hint)),
log.Err(err))
hashMissing.WithLabelValues(string(r.Hint)).Add(1)
continue
} else if res == nil {
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested golden",
h.logger.With().Debug("serve: remote peer requested golden",
log.Context(ctx),
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
hashEmptyData.WithLabelValues(string(r.Hint)).Add(1)
continue
} else {
h.logger.WithContext(ctx).With().Debug("serve: responded to hash request",
h.logger.With().Debug("serve: responded to hash request",
log.Context(ctx),
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
}
@@ -203,11 +221,13 @@ func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error) {

bts, err := codec.Encode(&resBatch)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode batch id",
h.logger.With().Fatal("serve: failed to encode batch id",
log.Context(ctx),
log.Err(err),
log.String("batch_hash", resBatch.ID.ShortString()))
}
h.logger.WithContext(ctx).With().Debug("serve: returning response for batch",
h.logger.With().Debug("serve: returning response for batch",
log.Context(ctx),
log.String("batch_hash", resBatch.ID.ShortString()),
log.Int("count_responses", len(resBatch.Responses)),
log.Int("data_size", len(bts)))
@@ -222,23 +242,28 @@ func (h *handler) handleMeshHashReq(ctx context.Context, reqData []byte) ([]byte, error) {
err error
)
if err = codec.Decode(reqData, &req); err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to parse mesh hash request", log.Err(err))
h.logger.With().Warning("serve: failed to parse mesh hash request",
log.Context(ctx), log.Err(err))
return nil, errBadRequest
}
if err := req.Validate(); err != nil {
h.logger.WithContext(ctx).With().Debug("failed to validate mesh hash request", log.Err(err))
h.logger.With().Debug("failed to validate mesh hash request",
log.Context(ctx), log.Err(err))
return nil, err
}
hashes, err = layers.GetAggHashes(h.cdb, req.From, req.To, req.Step)
if err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to get mesh hashes", log.Err(err))
h.logger.With().Warning("serve: failed to get mesh hashes",
log.Context(ctx), log.Err(err))
return nil, err
}
data, err = codec.EncodeSlice(hashes)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode hashes", log.Err(err))
h.logger.With().Fatal("serve: failed to encode hashes",
log.Context(ctx), log.Err(err))
}
h.logger.WithContext(ctx).With().Debug("serve: returning response for mesh hashes",
h.logger.With().Debug("serve: returning response for mesh hashes",
log.Context(ctx),
log.Stringer("layer_from", req.From),
log.Stringer("layer_to", req.To),
log.Uint32("by", req.Step),
(Diffs for the remaining changed files are not shown.)
