Skip to content

Commit

Permalink
Merge pull request #112 from ethpandaops/feat/safer-default-store-dur…
Browse files Browse the repository at this point in the history
…ations

feat: Refactor beacon blocks/state storage
  • Loading branch information
samcm committed May 30, 2023
2 parents ea4ce22 + 971f8e5 commit 7e43598
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/actions/checkpoint-sync/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ runs:
if: ${{ inputs.consensus == 'prysm' }}
run: |
echo "Starting prysm...";
docker run -d --name beacon --network eth -p 5052:5052 gcr.io/prysmaticlabs/prysm/beacon-chain:latest --datadir=/data --accept-terms-of-use --${{ inputs.network }} --clear-db --grpc-gateway-port=5052 --grpc-gateway-host=0.0.0.0 --http-web3provider=http://localhost:8545 --force-clear-db --checkpoint-sync-url=http://checkpointz:5555 --genesis-beacon-api-url=http://checkpointz:5555 --grpc-gateway-port=5052
docker run -d --name beacon --network eth -p 5052:5052 gcr.io/prysmaticlabs/prysm/beacon-chain:latest --datadir=/data --accept-terms-of-use --${{ inputs.network }} --clear-db --grpc-gateway-port=5052 --grpc-gateway-host=0.0.0.0 --execution-endpoint=http://localhost:8545 --force-clear-db --checkpoint-sync-url=http://checkpointz:5555 --genesis-beacon-api-url=http://checkpointz:5555 --grpc-gateway-port=5052
echo "Prysm is running.";
- name: Run nimbus
shell: bash
Expand Down
10 changes: 7 additions & 3 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ var (
topicFinalityHeadUpdated = "finality_head_updated"
)

const (
// FinalityHaltedServingPeriod defines how long we will happily serve finality data for after the chain has stopped finality.
// TODO(sam.calder-mason): Derive from weak subjectivity period.
FinalityHaltedServingPeriod = 14 * 24 * time.Hour
)

func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.Config, config *Config) FinalityProvider {
return &Default{
nodeConfigs: nodes,
Expand Down Expand Up @@ -515,9 +521,7 @@ func (d *Default) storeBlock(ctx context.Context, block *spec.VersionedSignedBea
return err
}

expiresAtSlot := CalculateSlotExpiration(slot, d.config.HistoricalEpochCount*int(d.spec.SlotsPerEpoch))
expiresAt := GetSlotTime(expiresAtSlot, d.spec.SecondsPerSlot.AsDuration(), d.genesis.GenesisTime).
Add(time.Minute * 22) // Store it for an extra 22 minutes to simplify the expiry logic.
expiresAt := time.Now().Add(FinalityHaltedServingPeriod)

if slot == phase0.Slot(0) {
expiresAt = time.Now().Add(999999 * time.Hour)
Expand Down
4 changes: 2 additions & 2 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N
return nil, errors.New("beacon state is nil")
}

expiresAt := time.Now().Add(3 * time.Hour)
expiresAt := time.Now().Add(FinalityHaltedServingPeriod)
if slot == phase0.Slot(0) {
expiresAt = time.Now().Add(999999 * time.Hour)
}

if err := d.states.Add(stateRoot, &beaconState, expiresAt); err != nil {
if err := d.states.Add(stateRoot, &beaconState, expiresAt, slot); err != nil {
return nil, fmt.Errorf("failed to store beacon state: %w", err)
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/beacon/store/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ func (c *Block) Add(block *spec.VersionedSignedBeaconBlock, expiresAt time.Time)
return err
}

c.store.Add(eth.RootAsString(root), block, expiresAt)
invincible := false
if slot == 0 {
// Store the genesis block forever.
invincible = true
}

c.store.Add(eth.RootAsString(root), block, expiresAt, invincible)

c.slotToBlockRoot.Store(slot, root)
c.stateRootToBlockRoot.Store(stateRoot, root)
Expand Down
2 changes: 1 addition & 1 deletion pkg/beacon/store/deposit_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewDepositSnapshot(log logrus.FieldLogger, config Config, namespace string)
}

func (d *DepositSnapshot) Add(epoch phase0.Epoch, snapshot *types.DepositSnapshot, expiresAt time.Time) error {
d.store.Add(eth.EpochAsString(epoch), snapshot, expiresAt)
d.store.Add(eth.EpochAsString(epoch), snapshot, expiresAt, false)

d.log.WithFields(
logrus.Fields{
Expand Down
10 changes: 8 additions & 2 deletions pkg/beacon/store/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ func NewBeaconState(log logrus.FieldLogger, config Config, namespace string) *Be
return c
}

func (c *BeaconState) Add(stateRoot phase0.Root, state *[]byte, expiresAt time.Time) error {
c.store.Add(eth.RootAsString(stateRoot), state, expiresAt)
func (c *BeaconState) Add(stateRoot phase0.Root, state *[]byte, expiresAt time.Time, slot phase0.Slot) error {
invincible := false
if slot == 0 {
invincible = true
}

c.store.Add(eth.RootAsString(stateRoot), state, expiresAt, invincible)

c.log.WithFields(
logrus.Fields{
"state_root": eth.RootAsString(stateRoot),
Expand Down
21 changes: 16 additions & 5 deletions pkg/cache/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
)

type item struct {
value interface{}
expiresAt time.Time
value interface{}
expiresAt time.Time
invincible bool
}

type sortableItem struct {
Expand Down Expand Up @@ -39,6 +40,10 @@ func NewTTLMap(maxItems int, name, namespace string) (m *TTLMap) {
go func() {
for now := range time.Tick(time.Second * 1) {
for k, v := range m.m {
if v.invincible {
continue
}

if v.expiresAt.Before(now) {
m.Delete(k)
}
Expand Down Expand Up @@ -92,7 +97,12 @@ func (m *TTLMap) evictItemToClosestToExpiry() {
// This is a very naive implementation.
items := []sortableItem{}

// Get all non-invincible items.
for k, v := range m.m {
if v.invincible {
continue
}

items = append(items, sortableItem{
key: k,
expiresAt: v.expiresAt,
Expand All @@ -113,7 +123,7 @@ func (m *TTLMap) Len() int {
return len(m.m)
}

func (m *TTLMap) Add(k string, v interface{}, expiresAt time.Time) {
func (m *TTLMap) Add(k string, v interface{}, expiresAt time.Time, invincible bool) {
if m.Len() >= m.maxItems {
m.evictItemToClosestToExpiry()
}
Expand All @@ -125,8 +135,9 @@ func (m *TTLMap) Add(k string, v interface{}, expiresAt time.Time) {
it, ok := m.m[k]
if !ok {
it = &item{
value: v,
expiresAt: expiresAt,
value: v,
expiresAt: expiresAt,
invincible: invincible,
}
m.m[k] = it
}
Expand Down
47 changes: 41 additions & 6 deletions pkg/cache/ttl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestItemAdds(t *testing.T) {
key := "key1"
value := "value1"

instance.Add(key, value, time.Now().Add(time.Hour))
instance.Add(key, value, time.Now().Add(time.Hour), false)

data, _, err := instance.Get(key)
if err != nil {
Expand All @@ -30,7 +30,7 @@ func TestItemDeletes(t *testing.T) {
key := "key2"
value := "value2"

instance.Add(key, value, time.Now().Add(time.Hour))
instance.Add(key, value, time.Now().Add(time.Hour), false)
instance.Delete(key)

_, _, err := instance.Get(key)
Expand All @@ -45,7 +45,7 @@ func TestItemDoesExpire(t *testing.T) {
key := "key3"
value := "value3"

instance.Add(key, value, time.Now().Add(time.Second))
instance.Add(key, value, time.Now().Add(time.Second), false)

time.Sleep(time.Second * 3)

Expand All @@ -58,7 +58,7 @@ func TestItemDoesExpire(t *testing.T) {
func TestMaxItems(t *testing.T) {
instance := NewTTLMap(3, "", "")
for i := 1; i <= 10; i++ {
instance.Add(fmt.Sprintf("key%d", i), "value", time.Now().Add(time.Hour))
instance.Add(fmt.Sprintf("key%d", i), "value", time.Now().Add(time.Hour), false)
}

if len(instance.m) != 3 {
Expand All @@ -69,7 +69,7 @@ func TestMaxItems(t *testing.T) {
func TestMaxItemsEvictsOldest(t *testing.T) {
instance := NewTTLMap(3, "", "")
for i := 1; i <= 10; i++ {
instance.Add(fmt.Sprintf("key%d", i), "value", time.Now().Add(time.Hour).Add(time.Second*time.Duration(i)))
instance.Add(fmt.Sprintf("key%d", i), "value", time.Now().Add(time.Hour).Add(time.Second*time.Duration(i)), false)
}

if _, _, err := instance.Get("key1"); err == nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestCallbacks(t *testing.T) {
addedCallback = true
})

instance.Add("key4", "value4", time.Now().Add(time.Hour))
instance.Add("key4", "value4", time.Now().Add(time.Hour), false)
instance.Delete("key4")

time.Sleep(time.Second * 1)
Expand All @@ -137,3 +137,38 @@ func TestCallbacks(t *testing.T) {
t.Fatalf("Expected added callback to have been called")
}
}

func TestInvincible(t *testing.T) {
ttlMap := NewTTLMap(2, "myCache", "default")

now := time.Now()

// add an item with 1 second expiry
ttlMap.Add("key1", "value1", now.Add(time.Second), false)

// add an item with 2 second expiry and invincible
ttlMap.Add("key2", "value2", now.Add(time.Second*2), true)

// add an item with 3 second expiry
ttlMap.Add("key3", "value3", now.Add(time.Second*3), false)

time.Sleep(5 * time.Second)

// get key2, should be present as it is invincible
_, _, err := ttlMap.Get("key2")
if err != nil {
t.Error("key2 not found")
}

// get key1, should not be found as it has expired
_, _, err = ttlMap.Get("key1")
if err == nil {
t.Error("key1 should not be found")
}

// get key2, should be present as it is invincible
_, _, err = ttlMap.Get("key3")
if err == nil {
t.Error("key2 should not be found")
}
}

0 comments on commit 7e43598

Please sign in to comment.