Skip to content

Commit

Permalink
aggregation-duty: clarify and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Oct 25, 2024
1 parent 3de919b commit 0fcb09f
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 129 deletions.
31 changes: 0 additions & 31 deletions beacon/goclient/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package goclient

import (
"encoding/binary"
"fmt"
"time"

Expand All @@ -18,15 +17,6 @@ func (gc *GoClient) SubmitAggregateSelectionProof(slot phase0.Slot, committeeInd
// https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#broadcast-aggregate
gc.waitToSlotTwoThirds(slot)

// differ from spec because we need to subscribe to subnet
isAggregator, err := isAggregator(committeeLength, slotSig)
if err != nil {
return nil, DataVersionNil, fmt.Errorf("failed to check if validator is an aggregator: %w", err)
}
if !isAggregator {
return nil, DataVersionNil, fmt.Errorf("validator is not an aggregator")
}

attData, _, err := gc.GetAttestationData(slot, committeeIndex)
if err != nil {
return nil, DataVersionNil, fmt.Errorf("failed to get attestation data: %w", err)
Expand Down Expand Up @@ -73,27 +63,6 @@ func (gc *GoClient) SubmitSignedAggregateSelectionProof(msg *phase0.SignedAggreg
return gc.client.SubmitAggregateAttestations(gc.ctx, []*phase0.SignedAggregateAndProof{msg})
}

// IsAggregator returns true if the signature is from the input validator. The committee
// count is provided as an argument rather than imported implementation from spec. Having
// committee count as an argument allows cheaper computation at run time.
//
// Spec pseudocode definition:
//
// def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
// committee = get_beacon_committee(state, slot, index)
// modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
// return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0
func isAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
modulo := committeeCount / TargetAggregatorsPerCommittee
if modulo == 0 {
// Modulo must be at least 1.
modulo = 1
}

b := Hash(slotSig)
return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil
}

// waitToSlotTwoThirds waits until two-third of the slot has transpired (SECONDS_PER_SLOT * 2 / 3 seconds after the start of slot)
func (gc *GoClient) waitToSlotTwoThirds(slot phase0.Slot) {
oneThird := gc.network.SlotDurationSec() / 3 /* one third of slot duration */
Expand Down
7 changes: 3 additions & 4 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging/fields"
operatordatastore "github.com/ssvlabs/ssv/operator/datastore"
"github.com/ssvlabs/ssv/operator/slotticker"
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
"github.com/ssvlabs/ssv/utils/casts"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -246,10 +245,10 @@ func (gc *GoClient) Healthy(ctx context.Context) error {
// TODO: also check if syncState.ElOffline when github.com/attestantio/go-eth2-client supports it
metricsBeaconNodeStatus.Set(float64(statusSyncing))
if syncState.IsSyncing {
return fmt.Errorf("syncing")
return fmt.Errorf("node is syncing")
}
if syncState.IsOptimistic {
return fmt.Errorf("optimistic")
return fmt.Errorf("node is optimistic")
}

metricsBeaconNodeStatus.Set(float64(statusOK))
Expand Down
30 changes: 0 additions & 30 deletions beacon/goclient/signing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package goclient

import (
"context"
"crypto/sha256"
"fmt"
"hash"
"sync"

"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec/phase0"
Expand Down Expand Up @@ -132,33 +129,6 @@ func (gc *GoClient) signingData(rootFunc func() ([32]byte, error), domain []byte
return container.HashTreeRoot()
}

var sha256Pool = sync.Pool{New: func() interface{} {
return sha256.New()
}}

// Hash defines a function that returns the sha256 checksum of the data passed in.
// https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/core/0_beacon-chain.md#hash
func Hash(data []byte) [32]byte {
h, ok := sha256Pool.Get().(hash.Hash)
if !ok {
h = sha256.New()
}
defer sha256Pool.Put(h)
h.Reset()

var b [32]byte

// The hash interface never returns an error, for that reason
// we are not handling the error below. For reference, it is
// stated here https://golang.org/pkg/hash/#Hash

// #nosec G104
h.Write(data)
h.Sum(b[:0])

return b
}

// this returns the 32byte fork data root for the “current_version“ and “genesis_validators_root“.
// This is used primarily in signature domains to avoid collisions across forks/chains.
//
Expand Down
2 changes: 0 additions & 2 deletions beacon/goclient/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ var (
SyncCommitteeSize uint64 = 512
SyncCommitteeSubnetCount uint64 = 4
TargetAggregatorsPerSyncSubcommittee uint64 = 16
EpochsPerSyncCommitteePeriod uint64 = 256
TargetAggregatorsPerCommittee uint64 = 16
FarFutureEpoch phase0.Epoch = 1<<64 - 1
IntervalsPerSlot uint64 = 3
)
19 changes: 13 additions & 6 deletions operator/duties/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/operator/duties/dutystore"
"go.uber.org/zap"
)

type AttesterHandler struct {
Expand Down Expand Up @@ -175,8 +174,12 @@ func (h *AttesterHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot)
if !h.network.PastAlanForkAtEpoch(h.network.Beacon.EstimatedEpochAtSlot(slot)) {
toExecute := make([]*genesisspectypes.Duty, 0, len(duties)*2)
for _, d := range duties {
if h.shouldExecute(d) {
if h.isFresh(d) {
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester))
// For every attestation duty we also have to try to perform aggregation duty even if it
// isn't necessarily needed - we won't know if it's needed or not until we rebuild
// validator signature (done during pre-consensus step) and perform some computation on
// it - hence scheduling it for execution here.
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator))
}
}
Expand All @@ -187,7 +190,11 @@ func (h *AttesterHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot)

toExecute := make([]*spectypes.ValidatorDuty, 0, len(duties))
for _, d := range duties {
if h.shouldExecute(d) {
if h.isFresh(d) {
// For every attestation duty we also have to try to perform aggregation duty even if it
// isn't necessarily needed - we won't know if it's needed or not until we rebuild
// validator signature (done during pre-consensus step) and perform some computation on
// it - hence scheduling it for execution here.
toExecute = append(toExecute, h.toSpecDuty(d, spectypes.BNRoleAggregator))
}
}
Expand Down Expand Up @@ -274,9 +281,9 @@ func (h *AttesterHandler) toGenesisSpecDuty(duty *eth2apiv1.AttesterDuty, role g
}
}

func (h *AttesterHandler) shouldExecute(duty *eth2apiv1.AttesterDuty) bool {
func (h *AttesterHandler) isFresh(duty *eth2apiv1.AttesterDuty) bool {
currentSlot := h.network.Beacon.EstimatedCurrentSlot()
// execute task if slot already began and not pass 1 epoch
// if a whole epoch of slots has passed since the slot duty is for there is no point executing it
var attestationPropagationSlotRange = phase0.Slot(h.network.Beacon.SlotsPerEpoch())
if currentSlot >= duty.Slot && currentSlot-duty.Slot <= attestationPropagationSlotRange {
return true
Expand Down
3 changes: 1 addition & 2 deletions operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

"go.uber.org/zap"

"github.com/ssvlabs/ssv/eth/executionclient"
"github.com/ssvlabs/ssv/exporter/api"
qbftstorage "github.com/ssvlabs/ssv/ibft/storage"
Expand All @@ -22,6 +20,7 @@ import (
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
storage2 "github.com/ssvlabs/ssv/registry/storage"
"github.com/ssvlabs/ssv/storage/basedb"
"go.uber.org/zap"
)

// Node represents the behavior of SSV node
Expand Down
8 changes: 4 additions & 4 deletions operator/validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/exporter/convert"
"github.com/ssvlabs/ssv/ibft/genesisstorage"
"github.com/ssvlabs/ssv/ibft/storage"
Expand Down Expand Up @@ -59,6 +57,7 @@ import (
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
registrystorage "github.com/ssvlabs/ssv/registry/storage"
"github.com/ssvlabs/ssv/storage/basedb"
"go.uber.org/zap"
)

//go:generate mockgen -package=mocks -destination=./mocks/controller.go -source=./controller.go
Expand Down Expand Up @@ -702,8 +701,9 @@ func (c *controller) GetValidator(pubKey spectypes.ValidatorPK) (*validators.Val
}

func (c *controller) ExecuteGenesisDuty(logger *zap.Logger, duty *genesisspectypes.Duty) {
// because we're using the same duty for more than 1 duty (e.g. attest + aggregator) there is an error in bls.Deserialize func for cgo pointer to pointer,
// so we need to copy the pubkey to avoid pointer.
// because we're using the same duty for more than 1 duty (e.g. attest + aggregator) there is
// an error in bls.Deserialize func for cgo pointer to pointer, so we need to copy the pubkey
// to avoid pointer.
var pk phase0.BLSPubKey
copy(pk[:], duty.PubKey[:])

Expand Down
79 changes: 71 additions & 8 deletions protocol/genesis/ssv/runner/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package runner

import (
"crypto/sha256"
"encoding/binary"
"encoding/json"
spectypes "github.com/ssvlabs/ssv-spec/types"
"fmt"
"hash"
"sync"

"github.com/attestantio/go-eth2-client/spec/phase0"
ssz "github.com/ferranbt/fastssz"
"github.com/pkg/errors"
genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft"
genesisspecssv "github.com/ssvlabs/ssv-spec-pre-cc/ssv"
genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/protocol/genesis/qbft/controller"
"github.com/ssvlabs/ssv/protocol/genesis/ssv/runner/metrics"
"go.uber.org/zap"
)

type AggregatorRunner struct {
Expand Down Expand Up @@ -96,17 +99,29 @@ func (r *AggregatorRunner) ProcessPreConsensus(logger *zap.Logger, signedMsg *ge
fields.Slot(duty.Slot),
)

r.metrics.PauseDutyFullFlow()
// this is the earliest in aggregator runner flow where we get to know whether we are meant
// to perform this aggregation duty or not
ok, err := isAggregator(duty.CommitteeLength, fullSig)
if err != nil {
return fmt.Errorf("check if validator is an aggregator: %w", err)
}
if !ok {
logger.Debug("aggregation duty won't be needed from this validator for this slot",
zap.Any("signer", signedMsg.Signer),
fields.Slot(duty.Slot),
)
return nil
}

r.metrics.PauseDutyFullFlow()
// get block data
res, ver, err := r.GetBeaconNode().SubmitAggregateSelectionProof(duty.Slot, duty.CommitteeIndex, duty.CommitteeLength, duty.ValidatorIndex, fullSig)
if err != nil {
return errors.Wrap(err, "failed to submit aggregate and proof")
}

r.metrics.ContinueDutyFullFlow()
r.metrics.StartConsensus()

r.metrics.StartConsensus()
byts, err := res.MarshalSSZ()
if err != nil {
return errors.Wrap(err, "could not marshal aggregate and proof")
Expand All @@ -116,7 +131,6 @@ func (r *AggregatorRunner) ProcessPreConsensus(logger *zap.Logger, signedMsg *ge
Version: ver,
DataSSZ: byts,
}

if err := r.BaseRunner.decide(logger, r, input); err != nil {
return errors.Wrap(err, "can't start new duty runner instance for duty")
}
Expand Down Expand Up @@ -328,7 +342,6 @@ func (r *AggregatorRunner) Decode(data []byte) error {
return json.Unmarshal(data, &r)
}

// GetRoot returns the root used for signing and verification
// GetRoot returns the root used for signing and verification
func (r *AggregatorRunner) GetRoot() ([32]byte, error) {
marshaledRoot, err := r.Encode()
Expand All @@ -338,3 +351,53 @@ func (r *AggregatorRunner) GetRoot() ([32]byte, error) {
ret := sha256.Sum256(marshaledRoot)
return ret, nil
}

// isAggregator returns true if the signature is from the input validator. The committee
// count is provided as an argument rather than imported implementation from spec. Having
// committee count as an argument allows cheaper computation at run time.
//
// Spec pseudocode definition:
//
// def is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: BLSSignature) -> bool:
// committee = get_beacon_committee(state, slot, index)
// modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
// return bytes_to_uint64(hash(slot_signature)[0:8]) % modulo == 0
func isAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
const targetAggregatorsPerCommittee uint64 = 16

modulo := committeeCount / targetAggregatorsPerCommittee
if modulo == 0 {
// Modulo must be at least 1.
modulo = 1
}

b := hashSha256(slotSig)
return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil
}

var sha256Pool = sync.Pool{New: func() interface{} {
return sha256.New()
}}

// hashSha256 defines a function that returns the sha256 checksum of the data passed in.
// https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/core/0_beacon-chain.md#hash
func hashSha256(data []byte) [32]byte {
h, ok := sha256Pool.Get().(hash.Hash)
if !ok {
h = sha256.New()
}
defer sha256Pool.Put(h)
h.Reset()

var b [32]byte

// The hash interface never returns an error, for that reason
// we are not handling the error below. For reference, it is
// stated here https://golang.org/pkg/hash/#Hash

// #nosec G104
h.Write(data)
h.Sum(b[:0])

return b
}
Loading

0 comments on commit 0fcb09f

Please sign in to comment.