Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Oct 29, 2024
1 parent 2f8ff38 commit 346507f
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 68 deletions.
4 changes: 2 additions & 2 deletions message/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ var (
ErrValidatorNotAttesting = Error{text: "validator is not attesting"}
ErrEarlySlotMessage = Error{text: "message was sent before slot starts"}
ErrLateSlotMessage = Error{text: "current time is above duty's start +34(committee and aggregator) or +3(else) slots"}
ErrSlotAlreadyAdvanced = Error{text: "signer has already advanced to a later slot"}
ErrRoundAlreadyAdvanced = Error{text: "signer has already advanced to a later round"}
ErrSlotAlreadyAdvanced = Error{text: "operator has already advanced to a later slot"}
ErrRoundAlreadyAdvanced = Error{text: "operator has already advanced to a later round"}
ErrDecidedWithSameSigners = Error{text: "decided with same number of signers"}
ErrPubSubDataTooBig = Error{text: "pub-sub message data too big"}
ErrIncorrectTopic = Error{text: "incorrect topic"}
Expand Down
1 change: 0 additions & 1 deletion protocol/v2/qbft/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func NewController(

// StartNewInstance will start a new QBFT instance, if can't will return error
func (c *Controller) StartNewInstance(logger *zap.Logger, height specqbft.Height, value []byte) error {

if err := c.GetConfig().GetValueCheckF()(value); err != nil {
return errors.Wrap(err, "value invalid")
}
Expand Down
52 changes: 30 additions & 22 deletions protocol/v2/qbft/instance/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ func (i *Instance) uponCommit(
msg *specqbft.ProcessingMessage,
commitMsgContainer *specqbft.MsgContainer,
) (bool, []byte, *spectypes.SignedSSVMessage, error) {
logger.Debug("📬 got commit message",
fields.Round(i.State.Round),
zap.Any("commit_signers", msg.SignedMessage.OperatorIDs),
fields.Root(msg.QBFTMessage.Root))
hasQuorumBefore := specqbft.HasQuorum(i.State.CommitteeMember, commitMsgContainer.MessagesForRound(i.State.Round))

addMsg, err := commitMsgContainer.AddFirstMsgForSignerAndRound(msg)
if err != nil {
Expand All @@ -33,34 +30,45 @@ func (i *Instance) uponCommit(
return false, nil, nil, nil // uponCommit was already called
}

// calculate commit quorum and act upon it
logger = logger.With(
fields.Height(i.State.Height),
fields.Round(i.State.Round),
zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)),
)

logger.Debug("📬 got commit message",
zap.Any("commit_signers", msg.SignedMessage.OperatorIDs),
fields.Root(msg.QBFTMessage.Root))

if hasQuorumBefore {
return false, nil, nil, nil // already moved to commit stage
}

quorum, commitMsgs, err := i.commitQuorumForRoundRoot(commitMsgContainer, msg.QBFTMessage.Root, msg.QBFTMessage.Round)
if err != nil {
return false, nil, nil, errors.Wrap(err, "could not calculate commit quorum")
}
if !quorum {
return false, nil, nil, nil
}

if quorum {
fullData := i.State.ProposalAcceptedForCurrentRound.SignedMessage.FullData /* must have value there, checked on validateCommit */

agg, err := aggregateCommitMsgs(commitMsgs, fullData)
if err != nil {
return false, nil, nil, errors.Wrap(err, "could not aggregate commit msgs")
}
fullData := i.State.ProposalAcceptedForCurrentRound.SignedMessage.FullData /* must have value there, checked on validateCommit */

logger.Debug("🎯 got commit quorum",
fields.Round(i.State.Round),
zap.Any("agg_signers", agg.OperatorIDs),
fields.Root(msg.QBFTMessage.Root))
agg, err := aggregateCommitMsgs(commitMsgs, fullData)
if err != nil {
return false, nil, nil, errors.Wrap(err, "could not aggregate commit msgs")
}

i.metrics.EndStageCommit()
logger.Debug("🎯 got commit quorum",
zap.Any("agg_signers", agg.OperatorIDs),
fields.Root(msg.QBFTMessage.Root))

i.State.Decided = true
i.State.DecidedValue = fullData
i.metrics.EndStageCommit()

return true, fullData, agg, nil
}
i.State.Decided = true
i.State.DecidedValue = fullData

return false, nil, nil, nil
return true, fullData, agg, nil
}

// returns true if there is a quorum for the current round for this provided value
Expand Down
2 changes: 1 addition & 1 deletion protocol/v2/qbft/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (i *Instance) ProcessMsg(
decided, decidedValue, aggregatedCommit, err = i.uponCommit(logger, msg, i.State.CommitContainer)
return err
case specqbft.RoundChangeMsgType:
return i.uponRoundChange(logger, i.StartValue, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF())
return i.uponRoundChange(logger, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF())
default:
return errors.New("signed message type not supported")
}
Expand Down
16 changes: 9 additions & 7 deletions protocol/v2/qbft/instance/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessa
return nil // uponPrepare was already called
}

proposedRoot := i.State.ProposalAcceptedForCurrentRound.QBFTMessage.Root
logger.Debug("📬 got prepare message",
logger = logger.With(
fields.Height(i.State.Height),
fields.Round(i.State.Round),
zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)),
)

logger.Debug("📬 got prepare message",
zap.Any("prepare_signers", msg.SignedMessage.OperatorIDs),
fields.Root(proposedRoot))
fields.Root(msg.QBFTMessage.Root))

if hasQuorumBefore {
return nil // already moved to commit stage
Expand All @@ -45,18 +49,16 @@ func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessa
i.metrics.EndStagePrepare()

logger.Debug("🎯 got prepare quorum",
fields.Round(i.State.Round),
zap.Any("prepare_signers", allSigners(prepareMsgContainer.MessagesForRound(i.State.Round))))

commitMsg, err := i.CreateCommit(i.signer, proposedRoot)
commitMsg, err := i.CreateCommit(i.signer, i.State.ProposalAcceptedForCurrentRound.QBFTMessage.Root)
if err != nil {
return errors.Wrap(err, "could not create commit msg")
}

logger.Debug("📢 broadcasting commit message",
fields.Round(i.State.Round),
zap.Any("commit_signers", commitMsg.OperatorIDs),
fields.Root(proposedRoot))
fields.Root(i.State.ProposalAcceptedForCurrentRound.QBFTMessage.Root))

if err := i.Broadcast(logger, commitMsg); err != nil {
return errors.Wrap(err, "failed to broadcast commit message")
Expand Down
14 changes: 9 additions & 5 deletions protocol/v2/qbft/instance/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@ func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMess
return nil // uponProposal was already called
}

logger.Debug("📬 got proposal message",
logger = logger.With(
fields.Height(i.State.Height),
fields.Round(i.State.Round),
zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)),
)

logger.Debug("📬 got proposal message",
zap.Any("proposal_signers", msg.SignedMessage.OperatorIDs))

newRound := msg.QBFTMessage.Round
i.State.ProposalAcceptedForCurrentRound = msg

// A future justified proposal should bump us into future round and reset timer
if msg.QBFTMessage.Round > i.State.Round {
i.config.GetTimer().TimeoutForRound(msg.QBFTMessage.Height, msg.QBFTMessage.Round)
}
i.bumpToRound(newRound)
i.bumpToRound(msg.QBFTMessage.Round)

i.metrics.EndStageProposal()

Expand All @@ -45,19 +49,19 @@ func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMess
return errors.Wrap(err, "could not hash input data")
}

prepare, err := i.CreatePrepare(i.signer, newRound, r)
prepare, err := i.CreatePrepare(i.signer, msg.QBFTMessage.Round, r)
if err != nil {
return errors.Wrap(err, "could not create prepare msg")
}

logger.Debug("📢 got proposal, broadcasting prepare message",
fields.Round(i.State.Round),
zap.Any("proposal_signers", msg.SignedMessage.OperatorIDs),
zap.Any("prepare_signers", prepare.OperatorIDs))

if err := i.Broadcast(logger, prepare); err != nil {
return errors.Wrap(err, "failed to broadcast prepare message")
}

return nil
}

Expand Down
25 changes: 10 additions & 15 deletions protocol/v2/qbft/instance/round_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
// Assumes round change message is valid!
func (i *Instance) uponRoundChange(
logger *zap.Logger,
instanceStartValue []byte,
msg *specqbft.ProcessingMessage,
roundChangeMsgContainer *specqbft.MsgContainer,
valCheck specqbft.ProposedValueCheckF,
Expand All @@ -38,8 +37,8 @@ func (i *Instance) uponRoundChange(
}

logger = logger.With(
fields.Round(i.State.Round),
fields.Height(i.State.Height),
fields.Round(i.State.Round),
zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)),
)

Expand All @@ -53,7 +52,7 @@ func (i *Instance) uponRoundChange(
if partialQuorum && newRound > i.State.Round {
// this code executes exactly once per round - when partial round quorum has
// been achieved (because Instance events are processed sequentially)
err = i.uponPartialQuorumAboveOwnRound(logger, newRound, instanceStartValue)
err = i.uponPartialQuorumAboveOwnRound(logger, newRound)
if err != nil {
return fmt.Errorf("could not process round change with partial quorum above own round: %w", err)
}
Expand Down Expand Up @@ -81,7 +80,6 @@ func (i *Instance) uponRoundChange(

justifiedRoundChangeMsg, valueToPropose, err := i.buildJustifiedProposalToLeadRound(
i.config,
instanceStartValue,
roundChangesWrapped,
msg.QBFTMessage.Round,
valCheck,
Expand Down Expand Up @@ -124,10 +122,11 @@ func (i *Instance) uponRoundChange(
return fmt.Errorf("couldn't calculate data root hash: %w", err)
}

logger.Debug("🔄 got justified round change, broadcasting proposal message",
fields.Round(i.State.Round),
logger.Debug(
"🔄 got justified round change, broadcasting proposal message",
zap.Any("round_change_signers", allSigners(roundChangeMsgContainer.MessagesForRound(i.State.Round))),
fields.Root(r))
fields.Root(r),
)

if err := i.Broadcast(logger, proposal); err != nil {
return errors.Wrap(err, "failed to broadcast proposal message")
Expand Down Expand Up @@ -158,27 +157,24 @@ func (i *Instance) gotPartialQuorumAboveOwnRound(roundChangeMsgContainer *specqb
func (i *Instance) uponPartialQuorumAboveOwnRound(
logger *zap.Logger,
newRound specqbft.Round,
instanceStartValue []byte,
) error {
i.bumpToRound(newRound)
i.State.ProposalAcceptedForCurrentRound = nil

i.config.GetTimer().TimeoutForRound(i.State.Height, i.State.Round)

roundChange, err := i.CreateRoundChange(i.signer, newRound, instanceStartValue)
roundChange, err := i.CreateRoundChange(i.signer, newRound)
if err != nil {
return errors.Wrap(err, "failed to create round change message")
}

root, err := specqbft.HashDataRoot(instanceStartValue)
root, err := specqbft.HashDataRoot(i.StartValue)
if err != nil {
return errors.Wrap(err, "failed to hash instance start value")
}
logger.Debug("📢 got partial quorum, broadcasting round change message",
fields.Round(i.State.Round),
fields.Root(root),
zap.Any("round_change_signers", roundChange.OperatorIDs),
fields.Height(i.State.Height),
zap.String("reason", "partial-quorum"))

if err := i.Broadcast(logger, roundChange); err != nil {
Expand All @@ -199,7 +195,6 @@ func (i *Instance) uponPartialQuorumAboveOwnRound(
// specified round can't be built.
func (i *Instance) buildJustifiedProposalToLeadRound(
config qbft.IConfig,
instanceStartValue []byte,
roundChangesWrapped []*specqbft.ProcessingMessage,
round specqbft.Round,
valCheck specqbft.ProposedValueCheckF,
Expand All @@ -211,7 +206,7 @@ func (i *Instance) buildJustifiedProposalToLeadRound(
// chose proposal value:
// - if justifiedRoundChangeMsg has no prepare justification chose Instance start value
// - if justifiedRoundChangeMsg has prepare justification chose prepared value
valueToPropose := instanceStartValue
valueToPropose := i.StartValue
if roundChangeMsgWrapped.QBFTMessage.RoundChangePrepared() {
valueToPropose = roundChangeMsgWrapped.SignedMessage.FullData
}
Expand Down Expand Up @@ -483,7 +478,7 @@ RoundChange(
getRoundChangeJustification(current)
)
*/
func (i *Instance) CreateRoundChange(signer ssvtypes.OperatorSigner, newRound specqbft.Round, instanceStartValue []byte) (*spectypes.SignedSSVMessage, error) {
func (i *Instance) CreateRoundChange(signer ssvtypes.OperatorSigner, newRound specqbft.Round) (*spectypes.SignedSSVMessage, error) {
lastPreparedRound, root, lastPreparedValue, justifications, err := i.getRoundChangeData()
if err != nil {
return nil, errors.Wrap(err, "could not generate round change data")
Expand Down
28 changes: 14 additions & 14 deletions protocol/v2/qbft/instance/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@ func (i *Instance) UponRoundTimeout(logger *zap.Logger) error {
}

newRound := i.State.Round + 1
logger.Debug("⌛ round timed out", fields.Round(newRound))

// TODO: previously this was done outside of a defer, which caused the
// round to be bumped before the round change message was created & broadcasted.
// Remember to track the impact of this change and revert/modify if necessary.
defer func() {
i.bumpToRound(newRound)
i.State.ProposalAcceptedForCurrentRound = nil
i.config.GetTimer().TimeoutForRound(i.State.Height, i.State.Round)
}()

roundChange, err := i.CreateRoundChange(i.signer, newRound, i.StartValue)

logger = logger.With(
fields.Height(i.State.Height),
fields.Round(i.State.Round),
zap.Uint64("new_round", uint64(newRound)),
)

logger.Debug("⌛ round timed out, readying next round")

roundChange, err := i.CreateRoundChange(i.signer, newRound)
if err != nil {
return errors.Wrap(err, "could not generate round change msg")
}
Expand All @@ -34,15 +32,17 @@ func (i *Instance) UponRoundTimeout(logger *zap.Logger) error {
return err
}
logger.Debug("📢 broadcasting round change message",
fields.Round(i.State.Round),
fields.Root(root),
zap.Any("round_change_signers", roundChange.OperatorIDs),
fields.Height(i.State.Height),
zap.String("reason", "timeout"))

if err := i.Broadcast(logger, roundChange); err != nil {
return errors.Wrap(err, "failed to broadcast round change message")
}

i.bumpToRound(newRound)
i.State.ProposalAcceptedForCurrentRound = nil
i.config.GetTimer().TimeoutForRound(i.State.Height, newRound)

return nil
}
2 changes: 1 addition & 1 deletion protocol/v2/ssv/value_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func BeaconVoteValueCheckF(
attestationData := &phase0.AttestationData{
Slot: slot,
// Consensus data is unaware of CommitteeIndex
// We use -1 to not run into issues with the duplicate value slashing check:
// We use math.MaxUint64 to not run into issues with the duplicate value slashing check:
// (data_1 != data_2 and data_1.target.epoch == data_2.target.epoch)
Index: math.MaxUint64,
BeaconBlockRoot: bv.BlockRoot,
Expand Down

0 comments on commit 346507f

Please sign in to comment.