Skip to content

Commit

Permalink
prune data and compact state (#4998)
Browse files Browse the repository at this point in the history
## Motivation
Closes #3049 
Closes #3588 

## Changes
- Prune data consistently with the tortoise `hdist` distance:
  - proposals
  - certificates
  - proposal id<->tid mapping
  
  The certificate is needed for consensus within `hdist`;
  the same distance is used for all data types for simplicity.

- extract active set data from ballots and save them in activesets table.
- vacuum and checkpoint database after migration 4 is complete

This PR concludes the first update described in #4984 (comment).
  • Loading branch information
countvonzero committed Sep 19, 2023
1 parent dcd501d commit 06af3e2
Show file tree
Hide file tree
Showing 29 changed files with 595 additions and 74 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Support for old certificate sync protocol is dropped. This update is incompatibl
### Features

### Improvements
* [#4998](https://github.com/spacemeshos/go-spacemesh/pull/4998) First phase of state size reduction.
Ephemeral data are deleted and state compacted at the time of upgrade. In steady-state, data is pruned periodically.
* [#5021](https://github.com/spacemeshos/go-spacemesh/pull/5021) Drop support for old certificate sync protocol.
* [#5024](https://github.com/spacemeshos/go-spacemesh/pull/5024) Active set will be saved in state separately from ballots.

Expand Down
15 changes: 0 additions & 15 deletions blocks/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,7 @@ func (g *Generator) Stop() {
}
}

func (g *Generator) pruneAsync() {
g.eg.Go(func() error {
lid := g.msh.ProcessedLayer()
start := time.Now()
if err := proposals.DeleteBefore(g.cdb, lid); err != nil {
g.logger.With().Error("failed to delete old proposals", lid, log.Err(err))
}
deleteLatency.Observe(time.Since(start).Seconds())
return nil
})
}

func (g *Generator) run() error {
g.pruneAsync()
var maxLayer types.LayerID
for {
select {
Expand Down Expand Up @@ -180,12 +167,10 @@ func (g *Generator) run() error {
if len(g.optimisticOutput) > 0 {
g.processOptimisticLayers(maxLayer)
}
g.pruneAsync()
case <-time.After(g.cfg.GenBlockInterval):
if len(g.optimisticOutput) > 0 {
g.processOptimisticLayers(maxLayer)
}
g.pruneAsync()
}
}
}
Expand Down
17 changes: 0 additions & 17 deletions blocks/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,7 @@ func Test_run(t *testing.T) {
txIDs := createAndSaveTxs(t, numTXs, tg.cdb)
signers, atxes := createATXs(t, tg.cdb, (layerID.GetEpoch() - 1).FirstLayer(), numProposals)
activeSet := types.ToATXIDs(atxes)
// generate some proposals before this layer
oldest := layerID - 10
for lid := oldest; lid < layerID; lid++ {
createProposals(t, tg.cdb, lid, types.EmptyLayerHash, signers, activeSet, txIDs)
}
plist := createProposals(t, tg.cdb, layerID, meshHash, signers, activeSet, txIDs)
for lid := oldest; lid <= layerID; lid++ {
got, err := proposals.GetByLayer(tg.cdb, lid)
require.NoError(t, err)
require.Len(t, got, len(signers))
}
pids := types.ToProposalIDs(plist)
tg.mockFetch.EXPECT().GetProposals(gomock.Any(), pids)

Expand Down Expand Up @@ -413,13 +403,6 @@ func Test_run(t *testing.T) {
tg.hareCh <- hare.LayerOutput{Ctx: context.Background(), Layer: layerID, Proposals: pids}
require.Eventually(t, func() bool { return len(tg.hareCh) == 0 }, time.Second, 100*time.Millisecond)
tg.Stop()
for lid := oldest; lid < layerID; lid++ {
_, err := proposals.GetByLayer(tg.cdb, lid)
require.ErrorIs(t, err, sql.ErrNotFound)
}
got, err := proposals.GetByLayer(tg.cdb, layerID)
require.NoError(t, err)
require.Len(t, got, len(signers))
})
}
}
Expand Down
8 changes: 0 additions & 8 deletions blocks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ var (
failFetchCnt = blockGenCount.WithLabelValues(failFetch)
failGenCnt = blockGenCount.WithLabelValues(failGen)
failErrCnt = blockGenCount.WithLabelValues(internalErr)

deleteLatency = metrics.NewHistogramWithBuckets(
"delete_duration",
namespace,
"duration in second to delete old proposals",
[]string{},
prometheus.ExponentialBuckets(0.01, 2, 10),
).WithLabelValues()
)

type collector struct {
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func AddCommands(cmd *cobra.Command) {
cfg.DatabaseConnections, "configure number of active connections to enable parallel read requests")
cmd.PersistentFlags().BoolVar(&cfg.DatabaseLatencyMetering, "db-latency-metering",
cfg.DatabaseLatencyMetering, "if enabled collect latency histogram for every database query")
cmd.PersistentFlags().DurationVar(&cfg.DatabasePruneInterval, "db-prune-interval",
cfg.DatabasePruneInterval, "configure interval for database pruning")

/** ======================== P2P Flags ========================== **/

Expand Down
34 changes: 18 additions & 16 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ type BaseConfig struct {
OptFilterThreshold int `mapstructure:"optimistic-filtering-threshold"`
TickSize uint64 `mapstructure:"tick-size"`

DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"`

NetworkHRP string `mapstructure:"network-hrp"`

Expand Down Expand Up @@ -173,20 +174,21 @@ func DefaultTestConfig() Config {
// DefaultBaseConfig returns a default configuration for spacemesh.
func defaultBaseConfig() BaseConfig {
return BaseConfig{
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
CollectMetrics: false,
MetricsPort: 1010,
ProfilerName: "gp-spacemesh",
LayerDuration: 30 * time.Second,
LayersPerEpoch: 3,
PoETServers: []string{"127.0.0.1"},
TxsPerProposal: 100,
BlockGasLimit: math.MaxUint64,
OptFilterThreshold: 90,
TickSize: 100,
DatabaseConnections: 16,
NetworkHRP: "sm",
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
CollectMetrics: false,
MetricsPort: 1010,
ProfilerName: "gp-spacemesh",
LayerDuration: 30 * time.Second,
LayersPerEpoch: 3,
PoETServers: []string{"127.0.0.1"},
TxsPerProposal: 100,
BlockGasLimit: math.MaxUint64,
OptFilterThreshold: 90,
TickSize: 100,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
NetworkHRP: "sm",
}
}

Expand Down
11 changes: 6 additions & 5 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ func MainnetConfig() Config {
logging.TrtlLoggerLevel = zapcore.WarnLevel.String()
return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
NetworkHRP: "sm",
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
NetworkHRP: "sm",

LayerDuration: 5 * time.Minute,
LayerAvgSize: 50,
Expand Down
1 change: 1 addition & 0 deletions config/presets/fastnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func fastnet() config.Config {
conf.NetworkHRP = "stest"
types.SetNetworkHRP(conf.NetworkHRP) // set to generate coinbase
conf.BaseConfig.OptFilterThreshold = 90
conf.BaseConfig.DatabasePruneInterval = time.Minute

// set for systest TestEquivocation
conf.BaseConfig.MinerGoodAtxsPercent = 50
Expand Down
17 changes: 13 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/proposals"
"github.com/spacemeshos/go-spacemesh/prune"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/ballots/util"
"github.com/spacemeshos/go-spacemesh/sql/layers"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
Expand Down Expand Up @@ -638,11 +640,17 @@ func (app *App) initServices(ctx context.Context) error {
})

executor := mesh.NewExecutor(app.cachedDB, state, app.conState, app.addLogger(ExecutorLogger, lg))
msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, app.addLogger(MeshLogger, lg))
mlog := app.addLogger(MeshLogger, lg)
msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, mlog)
if err != nil {
return fmt.Errorf("failed to create mesh: %w", err)
}

app.eg.Go(func() error {
prune.Prune(ctx, mlog.Zap(), app.db, app.clock, app.Config.Tortoise.Hdist, app.Config.DatabasePruneInterval)
return nil
})

fetcherWrapped := &layerFetcher{}
atxHandler := activation.NewHandler(
app.cachedDB,
Expand Down Expand Up @@ -1322,13 +1330,15 @@ func (app *App) LoadOrCreateEdSigner() (*signing.EdSigner, error) {
return edSgn, nil
}

func (app *App) setupDBs(ctx context.Context, lg log.Log, dbPath string) error {
func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
dbPath := app.Config.DataDir()
if err := os.MkdirAll(dbPath, os.ModePerm); err != nil {
return fmt.Errorf("failed to create %s: %w", dbPath, err)
}
sqlDB, err := sql.Open("file:"+filepath.Join(dbPath, dbFile),
sql.WithConnections(app.Config.DatabaseConnections),
sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering),
sql.WithV4Migration(util.ExtractActiveSet),
)
if err != nil {
return fmt.Errorf("open sqlite db %w", err)
Expand Down Expand Up @@ -1407,7 +1417,6 @@ func (app *App) startSynchronous(ctx context.Context) (err error) {
}

if app.Config.ProfilerURL != "" {

app.profilerService, err = pyroscope.Start(pyroscope.Config{
ApplicationName: app.Config.ProfilerName,
// app.Config.ProfilerURL should be the pyroscope server address
Expand Down Expand Up @@ -1456,7 +1465,7 @@ func (app *App) startSynchronous(ctx context.Context) (err error) {
return fmt.Errorf("failed to initialize p2p host: %w", err)
}

if err := app.setupDBs(ctx, lg, app.Config.DataDir()); err != nil {
if err := app.setupDBs(ctx, lg); err != nil {
return err
}
if err := app.initServices(ctx); err != nil {
Expand Down
11 changes: 11 additions & 0 deletions prune/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Package prune periodically deletes ephemeral data (proposals, certificates,
// proposal<->tx mappings) that is no longer needed for consensus.
package prune

import (
	"github.com/spacemeshos/go-spacemesh/common/types"
)

//go:generate mockgen -typed -package=prune -destination=./mocks.go -source=./interface.go

// layerClock reports the current layer; a mock implementation is generated by
// the mockgen directive above for use in tests.
type layerClock interface {
	CurrentLayer() types.LayerID
}
22 changes: 22 additions & 0 deletions prune/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package prune

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/spacemeshos/go-spacemesh/metrics"
)

// namespace groups all metrics emitted by this package under the "prune" prefix.
const namespace = "prune"

var (
	// pruneLatency records how long each pruning step takes, in seconds,
	// labeled by the kind of data being pruned.
	pruneLatency = metrics.NewHistogramWithBuckets(
		"prune_seconds",
		namespace,
		"prune time in seconds",
		[]string{"step"},
		prometheus.ExponentialBuckets(0.01, 2, 10),
	)
	// proposalLatency tracks deletion of old proposals.
	proposalLatency = pruneLatency.WithLabelValues("proposal")
	// certLatency tracks deletion of old certificates.
	certLatency = pruneLatency.WithLabelValues("cert")
	// propTxLatency tracks deletion of proposal id<->tx id mappings.
	propTxLatency = pruneLatency.WithLabelValues("proptxs")
)
73 changes: 73 additions & 0 deletions prune/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 06af3e2

Please sign in to comment.