
[Merged by Bors] - prune data and compact state #4998

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,8 @@ Support for old certificate sync protocol is dropped. This update is incompatibl
### Features

### Improvements
* [#4998](https://github.com/spacemeshos/go-spacemesh/pull/4998) First phase of state size reduction.
  Ephemeral data is deleted and the state is compacted at the time of upgrade. In steady state, data is pruned periodically.
* [#5021](https://github.com/spacemeshos/go-spacemesh/pull/5021) Drop support for old certificate sync protocol.
* [#5024](https://github.com/spacemeshos/go-spacemesh/pull/5024) Active set will be saved in state separately from ballots.

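The steady-state pruning described in the changelog entry above amounts to a ticker-driven loop that deletes data below a cutoff layer. The following is a minimal sketch, not the PR's prune.Prune implementation: proposals.DeleteBefore and its signature are taken from the code this PR removes from blocks/generator.go, layerClock is the interface added in prune/interface.go, the retention distance plays the role of the Tortoise Hdist value passed in node.go, and the sql/proposals import path is assumed.

```go
package prune

import (
    "context"
    "time"

    "github.com/spacemeshos/go-spacemesh/common/types"
    "github.com/spacemeshos/go-spacemesh/sql"
    "github.com/spacemeshos/go-spacemesh/sql/proposals" // store path assumed
)

// runPruner is an illustrative sketch: on every tick it drops proposals below
// the current layer minus a retention window. The real pruner in this PR also
// clears certificates and proposal transactions (see the "cert" and "proptxs"
// metric labels in prune/metrics.go).
func runPruner(ctx context.Context, db sql.Executor, clock layerClock, retain uint32, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            current := clock.CurrentLayer()
            if current <= types.LayerID(retain) {
                continue // not enough history to prune yet
            }
            oldest := current - types.LayerID(retain)
            // Best effort: a failed pass is simply retried on the next tick.
            _ = proposals.DeleteBefore(db, oldest)
        }
    }
}
```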
15 changes: 0 additions & 15 deletions blocks/generator.go
@@ -135,20 +135,7 @@ func (g *Generator) Stop() {
}
}

func (g *Generator) pruneAsync() {
g.eg.Go(func() error {
lid := g.msh.ProcessedLayer()
start := time.Now()
if err := proposals.DeleteBefore(g.cdb, lid); err != nil {
g.logger.With().Error("failed to delete old proposals", lid, log.Err(err))
}
deleteLatency.Observe(time.Since(start).Seconds())
return nil
})
}

func (g *Generator) run() error {
g.pruneAsync()
var maxLayer types.LayerID
for {
select {
@@ -180,12 +167,10 @@ func (g *Generator) run() error {
if len(g.optimisticOutput) > 0 {
g.processOptimisticLayers(maxLayer)
}
g.pruneAsync()
case <-time.After(g.cfg.GenBlockInterval):
if len(g.optimisticOutput) > 0 {
g.processOptimisticLayers(maxLayer)
}
g.pruneAsync()
}
}
}
17 changes: 0 additions & 17 deletions blocks/generator_test.go
@@ -344,17 +344,7 @@ func Test_run(t *testing.T) {
txIDs := createAndSaveTxs(t, numTXs, tg.cdb)
signers, atxes := createATXs(t, tg.cdb, (layerID.GetEpoch() - 1).FirstLayer(), numProposals)
activeSet := types.ToATXIDs(atxes)
// generate some proposals before this layer
oldest := layerID - 10
for lid := oldest; lid < layerID; lid++ {
createProposals(t, tg.cdb, lid, types.EmptyLayerHash, signers, activeSet, txIDs)
}
plist := createProposals(t, tg.cdb, layerID, meshHash, signers, activeSet, txIDs)
for lid := oldest; lid <= layerID; lid++ {
got, err := proposals.GetByLayer(tg.cdb, lid)
require.NoError(t, err)
require.Len(t, got, len(signers))
}
pids := types.ToProposalIDs(plist)
tg.mockFetch.EXPECT().GetProposals(gomock.Any(), pids)

@@ -413,13 +403,6 @@ func Test_run(t *testing.T) {
tg.hareCh <- hare.LayerOutput{Ctx: context.Background(), Layer: layerID, Proposals: pids}
require.Eventually(t, func() bool { return len(tg.hareCh) == 0 }, time.Second, 100*time.Millisecond)
tg.Stop()
for lid := oldest; lid < layerID; lid++ {
_, err := proposals.GetByLayer(tg.cdb, lid)
require.ErrorIs(t, err, sql.ErrNotFound)
}
got, err := proposals.GetByLayer(tg.cdb, layerID)
require.NoError(t, err)
require.Len(t, got, len(signers))
})
}
}
8 changes: 0 additions & 8 deletions blocks/metrics.go
@@ -30,14 +30,6 @@ var (
failFetchCnt = blockGenCount.WithLabelValues(failFetch)
failGenCnt = blockGenCount.WithLabelValues(failGen)
failErrCnt = blockGenCount.WithLabelValues(internalErr)

deleteLatency = metrics.NewHistogramWithBuckets(
"delete_duration",
namespace,
"duration in second to delete old proposals",
[]string{},
prometheus.ExponentialBuckets(0.01, 2, 10),
).WithLabelValues()
)

type collector struct {
2 changes: 2 additions & 0 deletions cmd/root.go
@@ -78,6 +78,8 @@ func AddCommands(cmd *cobra.Command) {
cfg.DatabaseConnections, "configure number of active connections to enable parallel read requests")
cmd.PersistentFlags().BoolVar(&cfg.DatabaseLatencyMetering, "db-latency-metering",
cfg.DatabaseLatencyMetering, "if enabled collect latency histogram for every database query")
cmd.PersistentFlags().DurationVar(&cfg.DatabasePruneInterval, "db-prune-interval",
cfg.DatabasePruneInterval, "configure interval for database pruning")

/** ======================== P2P Flags ========================== **/

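With the flag above, a node can be started with, for example, --db-prune-interval=10m; since the flag is registered via DurationVar, any Go duration string is accepted. The same setting can presumably also be supplied through the config file using the db-prune-interval key (the mapstructure tag added to BaseConfig below); it defaults to 30 minutes in both defaultBaseConfig and MainnetConfig, and the fastnet preset lowers it to one minute.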
34 changes: 18 additions & 16 deletions config/config.go
@@ -108,8 +108,9 @@ type BaseConfig struct {
OptFilterThreshold int `mapstructure:"optimistic-filtering-threshold"`
TickSize uint64 `mapstructure:"tick-size"`

DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"`

NetworkHRP string `mapstructure:"network-hrp"`

@@ -173,20 +174,21 @@ func DefaultTestConfig() Config {
// DefaultBaseConfig returns a default configuration for spacemesh.
func defaultBaseConfig() BaseConfig {
return BaseConfig{
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
CollectMetrics: false,
MetricsPort: 1010,
ProfilerName: "gp-spacemesh",
LayerDuration: 30 * time.Second,
LayersPerEpoch: 3,
PoETServers: []string{"127.0.0.1"},
TxsPerProposal: 100,
BlockGasLimit: math.MaxUint64,
OptFilterThreshold: 90,
TickSize: 100,
DatabaseConnections: 16,
NetworkHRP: "sm",
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
CollectMetrics: false,
MetricsPort: 1010,
ProfilerName: "gp-spacemesh",
LayerDuration: 30 * time.Second,
LayersPerEpoch: 3,
PoETServers: []string{"127.0.0.1"},
TxsPerProposal: 100,
BlockGasLimit: math.MaxUint64,
OptFilterThreshold: 90,
TickSize: 100,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
NetworkHRP: "sm",
}
}

11 changes: 6 additions & 5 deletions config/mainnet.go
@@ -42,11 +42,12 @@ func MainnetConfig() Config {
logging.TrtlLoggerLevel = zapcore.WarnLevel.String()
return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
NetworkHRP: "sm",
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
NetworkHRP: "sm",

LayerDuration: 5 * time.Minute,
LayerAvgSize: 50,
1 change: 1 addition & 0 deletions config/presets/fastnet.go
@@ -22,6 +22,7 @@ func fastnet() config.Config {
conf.NetworkHRP = "stest"
types.SetNetworkHRP(conf.NetworkHRP) // set to generate coinbase
conf.BaseConfig.OptFilterThreshold = 90
conf.BaseConfig.DatabasePruneInterval = time.Minute

// set for systest TestEquivocation
conf.BaseConfig.MinerGoodAtxsPercent = 50
17 changes: 13 additions & 4 deletions node/node.go
@@ -63,8 +63,10 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/proposals"
"github.com/spacemeshos/go-spacemesh/prune"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/ballots/util"
"github.com/spacemeshos/go-spacemesh/sql/layers"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/syncer"
Expand Down Expand Up @@ -638,11 +640,17 @@ func (app *App) initServices(ctx context.Context) error {
})

executor := mesh.NewExecutor(app.cachedDB, state, app.conState, app.addLogger(ExecutorLogger, lg))
msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, app.addLogger(MeshLogger, lg))
mlog := app.addLogger(MeshLogger, lg)
msh, err := mesh.NewMesh(app.cachedDB, app.clock, trtl, executor, app.conState, mlog)
if err != nil {
return fmt.Errorf("failed to create mesh: %w", err)
}

app.eg.Go(func() error {
prune.Prune(ctx, mlog.Zap(), app.db, app.clock, app.Config.Tortoise.Hdist, app.Config.DatabasePruneInterval)
return nil
})

fetcherWrapped := &layerFetcher{}
atxHandler := activation.NewHandler(
app.cachedDB,
Expand Down Expand Up @@ -1322,13 +1330,15 @@ func (app *App) LoadOrCreateEdSigner() (*signing.EdSigner, error) {
return edSgn, nil
}

func (app *App) setupDBs(ctx context.Context, lg log.Log, dbPath string) error {
func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
dbPath := app.Config.DataDir()
if err := os.MkdirAll(dbPath, os.ModePerm); err != nil {
return fmt.Errorf("failed to create %s: %w", dbPath, err)
}
sqlDB, err := sql.Open("file:"+filepath.Join(dbPath, dbFile),
sql.WithConnections(app.Config.DatabaseConnections),
sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering),
sql.WithV4Migration(util.ExtractActiveSet),
)
if err != nil {
return fmt.Errorf("open sqlite db %w", err)
Expand Down Expand Up @@ -1407,7 +1417,6 @@ func (app *App) startSynchronous(ctx context.Context) (err error) {
}

if app.Config.ProfilerURL != "" {

app.profilerService, err = pyroscope.Start(pyroscope.Config{
ApplicationName: app.Config.ProfilerName,
// app.Config.ProfilerURL should be the pyroscope server address
Expand Down Expand Up @@ -1456,7 +1465,7 @@ func (app *App) startSynchronous(ctx context.Context) (err error) {
return fmt.Errorf("failed to initialize p2p host: %w", err)
}

if err := app.setupDBs(ctx, lg, app.Config.DataDir()); err != nil {
if err := app.setupDBs(ctx, lg); err != nil {
return err
}
if err := app.initServices(ctx); err != nil {
11 changes: 11 additions & 0 deletions prune/interface.go
@@ -0,0 +1,11 @@
package prune

import (
    "github.com/spacemeshos/go-spacemesh/common/types"
)

//go:generate mockgen -typed -package=prune -destination=./mocks.go -source=./interface.go

type layerClock interface {
    CurrentLayer() types.LayerID
}
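The mockgen directive generates prune/mocks.go from this interface (the generated file itself is collapsed at the end of the diff). A test can stub the clock roughly as sketched below; the MocklayerClock and NewMocklayerClock names follow gomock's usual naming for an interface called layerClock and are assumed rather than copied from the generated file.

```go
package prune

import (
    "testing"

    "go.uber.org/mock/gomock"

    "github.com/spacemeshos/go-spacemesh/common/types"
)

// Sketch: stub the layerClock dependency with the generated mock.
func TestLayerClockStub(t *testing.T) {
    ctrl := gomock.NewController(t)
    clock := NewMocklayerClock(ctrl)
    clock.EXPECT().CurrentLayer().Return(types.LayerID(100)).AnyTimes()

    if lid := clock.CurrentLayer(); lid != types.LayerID(100) {
        t.Fatalf("unexpected layer %v", lid)
    }
}
```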
22 changes: 22 additions & 0 deletions prune/metrics.go
@@ -0,0 +1,22 @@
package prune

import (
    "github.com/prometheus/client_golang/prometheus"

    "github.com/spacemeshos/go-spacemesh/metrics"
)

const namespace = "prune"

var (
    pruneLatency = metrics.NewHistogramWithBuckets(
        "prune_seconds",
        namespace,
        "prune time in seconds",
        []string{"step"},
        prometheus.ExponentialBuckets(0.01, 2, 10),
    )
    proposalLatency = pruneLatency.WithLabelValues("proposal")
    certLatency     = pruneLatency.WithLabelValues("cert")
    propTxLatency   = pruneLatency.WithLabelValues("proptxs")
)
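Each label corresponds to one pruning step. These histograms are presumably fed the way the removed deleteLatency metric in blocks/generator.go was: time a step, then observe the elapsed seconds. A sketch of that pattern for the proposal step, with the sql/proposals import path and the zap logger as assumptions:

```go
package prune

import (
    "time"

    "go.uber.org/zap"

    "github.com/spacemeshos/go-spacemesh/common/types"
    "github.com/spacemeshos/go-spacemesh/sql"
    "github.com/spacemeshos/go-spacemesh/sql/proposals" // store path assumed
)

// pruneProposals sketches how one step would record its latency, mirroring the
// deleteLatency pattern this PR removes from blocks/generator.go. DeleteBefore's
// signature is taken from that removed code.
func pruneProposals(db sql.Executor, oldest types.LayerID, logger *zap.Logger) {
    start := time.Now()
    if err := proposals.DeleteBefore(db, oldest); err != nil {
        logger.Error("failed to delete old proposals", zap.Error(err))
    }
    proposalLatency.Observe(time.Since(start).Seconds())
}
```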
73 changes: 73 additions & 0 deletions prune/mocks.go

Some generated files are not rendered by default.
