diff --git a/message/validation/errors.go b/message/validation/errors.go index 6946f916da..596d77aff3 100644 --- a/message/validation/errors.go +++ b/message/validation/errors.go @@ -73,8 +73,8 @@ var ( ErrValidatorNotAttesting = Error{text: "validator is not attesting"} ErrEarlySlotMessage = Error{text: "message was sent before slot starts"} ErrLateSlotMessage = Error{text: "current time is above duty's start +34(committee and aggregator) or +3(else) slots"} - ErrSlotAlreadyAdvanced = Error{text: "signer has already advanced to a later slot"} - ErrRoundAlreadyAdvanced = Error{text: "signer has already advanced to a later round"} + ErrSlotAlreadyAdvanced = Error{text: "operator has already advanced to a later slot"} + ErrRoundAlreadyAdvanced = Error{text: "operator has already advanced to a later round"} ErrDecidedWithSameSigners = Error{text: "decided with same number of signers"} ErrPubSubDataTooBig = Error{text: "pub-sub message data too big"} ErrIncorrectTopic = Error{text: "incorrect topic"} diff --git a/protocol/v2/qbft/controller/controller.go b/protocol/v2/qbft/controller/controller.go index 729552b912..69a5b9ae69 100644 --- a/protocol/v2/qbft/controller/controller.go +++ b/protocol/v2/qbft/controller/controller.go @@ -56,7 +56,6 @@ func NewController( // StartNewInstance will start a new QBFT instance, if can't will return error func (c *Controller) StartNewInstance(logger *zap.Logger, height specqbft.Height, value []byte) error { - if err := c.GetConfig().GetValueCheckF()(value); err != nil { return errors.Wrap(err, "value invalid") } diff --git a/protocol/v2/qbft/instance/commit.go b/protocol/v2/qbft/instance/commit.go index 5aae9c02be..789ee25c29 100644 --- a/protocol/v2/qbft/instance/commit.go +++ b/protocol/v2/qbft/instance/commit.go @@ -20,10 +20,7 @@ func (i *Instance) uponCommit( msg *specqbft.ProcessingMessage, commitMsgContainer *specqbft.MsgContainer, ) (bool, []byte, *spectypes.SignedSSVMessage, error) { - logger.Debug("📬 got commit message", - fields.Round(i.State.Round), - zap.Any("commit_signers", msg.SignedMessage.OperatorIDs), - fields.Root(msg.QBFTMessage.Root)) + hasQuorumBefore := specqbft.HasQuorum(i.State.CommitteeMember, commitMsgContainer.MessagesForRound(i.State.Round)) addMsg, err := commitMsgContainer.AddFirstMsgForSignerAndRound(msg) if err != nil { @@ -33,34 +30,45 @@ func (i *Instance) uponCommit( return false, nil, nil, nil // uponCommit was already called } - // calculate commit quorum and act upon it + logger = logger.With( + fields.Height(i.State.Height), + fields.Round(i.State.Round), + zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)), + ) + + logger.Debug("📬 got commit message", + zap.Any("commit_signers", msg.SignedMessage.OperatorIDs), + fields.Root(msg.QBFTMessage.Root)) + + if hasQuorumBefore { + return false, nil, nil, nil // already moved to commit stage + } + quorum, commitMsgs, err := i.commitQuorumForRoundRoot(commitMsgContainer, msg.QBFTMessage.Root, msg.QBFTMessage.Round) if err != nil { return false, nil, nil, errors.Wrap(err, "could not calculate commit quorum") } + if !quorum { + return false, nil, nil, nil + } - if quorum { - fullData := i.State.ProposalAcceptedForCurrentRound.SignedMessage.FullData /* must have value there, checked on validateCommit */ - - agg, err := aggregateCommitMsgs(commitMsgs, fullData) - if err != nil { - return false, nil, nil, errors.Wrap(err, "could not aggregate commit msgs") - } + fullData := i.State.ProposalAcceptedForCurrentRound.SignedMessage.FullData /* must have value there, checked on validateCommit */ - logger.Debug("🎯 got commit quorum", - fields.Round(i.State.Round), - zap.Any("agg_signers", agg.OperatorIDs), - fields.Root(msg.QBFTMessage.Root)) + agg, err := aggregateCommitMsgs(commitMsgs, fullData) + if err != nil { + return false, nil, nil, errors.Wrap(err, "could not aggregate commit msgs") + } - i.metrics.EndStageCommit() + logger.Debug("🎯 got commit quorum", + zap.Any("agg_signers", agg.OperatorIDs), + fields.Root(msg.QBFTMessage.Root)) - i.State.Decided = true - i.State.DecidedValue = fullData + i.metrics.EndStageCommit() - return true, fullData, agg, nil - } + i.State.Decided = true + i.State.DecidedValue = fullData - return false, nil, nil, nil + return true, fullData, agg, nil } // returns true if there is a quorum for the current round for this provided value diff --git a/protocol/v2/qbft/instance/instance.go b/protocol/v2/qbft/instance/instance.go index cf00fa790d..59afb01c0e 100644 --- a/protocol/v2/qbft/instance/instance.go +++ b/protocol/v2/qbft/instance/instance.go @@ -147,7 +147,7 @@ func (i *Instance) ProcessMsg( decided, decidedValue, aggregatedCommit, err = i.uponCommit(logger, msg, i.State.CommitContainer) return err case specqbft.RoundChangeMsgType: - return i.uponRoundChange(logger, i.StartValue, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF()) + return i.uponRoundChange(logger, msg, i.State.RoundChangeContainer, i.config.GetValueCheckF()) default: return errors.New("signed message type not supported") } diff --git a/protocol/v2/qbft/instance/prepare.go b/protocol/v2/qbft/instance/prepare.go index 226f451034..e9f7a27e5a 100644 --- a/protocol/v2/qbft/instance/prepare.go +++ b/protocol/v2/qbft/instance/prepare.go @@ -25,11 +25,15 @@ func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessa return nil // uponPrepare was already called } - proposedRoot := i.State.ProposalAcceptedForCurrentRound.QBFTMessage.Root - logger.Debug("📬 got prepare message", + logger = logger.With( + fields.Height(i.State.Height), fields.Round(i.State.Round), + zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)), + ) + + logger.Debug("📬 got prepare message", zap.Any("prepare_signers", msg.SignedMessage.OperatorIDs), - fields.Root(proposedRoot)) + fields.Root(msg.QBFTMessage.Root)) if hasQuorumBefore { return nil // already moved to commit stage @@ -45,18 +49,16 @@ func (i *Instance) uponPrepare(logger *zap.Logger, msg *specqbft.ProcessingMessa i.metrics.EndStagePrepare() logger.Debug("🎯 got prepare quorum", - fields.Round(i.State.Round), zap.Any("prepare_signers", allSigners(prepareMsgContainer.MessagesForRound(i.State.Round)))) - commitMsg, err := i.CreateCommit(i.signer, proposedRoot) + commitMsg, err := i.CreateCommit(i.signer, i.State.ProposalAcceptedForCurrentRound.QBFTMessage.Root) if err != nil { return errors.Wrap(err, "could not create commit msg") } logger.Debug("📢 broadcasting commit message", - fields.Round(i.State.Round), zap.Any("commit_signers", commitMsg.OperatorIDs), - fields.Root(proposedRoot)) + fields.Root(i.State.ProposalAcceptedForCurrentRound.QBFTMessage.Root)) if err := i.Broadcast(logger, commitMsg); err != nil { return errors.Wrap(err, "failed to broadcast commit message") diff --git a/protocol/v2/qbft/instance/proposal.go b/protocol/v2/qbft/instance/proposal.go index 01aaa74941..ab2564e59a 100644 --- a/protocol/v2/qbft/instance/proposal.go +++ b/protocol/v2/qbft/instance/proposal.go @@ -24,18 +24,22 @@ func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMess return nil // uponProposal was already called } - logger.Debug("📬 got proposal message", + logger = logger.With( + fields.Height(i.State.Height), fields.Round(i.State.Round), + zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)), + ) + + logger.Debug("📬 got proposal message", zap.Any("proposal_signers", msg.SignedMessage.OperatorIDs)) - newRound := msg.QBFTMessage.Round i.State.ProposalAcceptedForCurrentRound = msg // A future justified proposal should bump us into future round and reset timer if msg.QBFTMessage.Round > i.State.Round { i.config.GetTimer().TimeoutForRound(msg.QBFTMessage.Height, msg.QBFTMessage.Round) } - i.bumpToRound(newRound) + i.bumpToRound(msg.QBFTMessage.Round) i.metrics.EndStageProposal() @@ -45,19 +49,19 @@ func (i *Instance) uponProposal(logger *zap.Logger, msg *specqbft.ProcessingMess return errors.Wrap(err, "could not hash input data") } - prepare, err := i.CreatePrepare(i.signer, newRound, r) + prepare, err := i.CreatePrepare(i.signer, msg.QBFTMessage.Round, r) if err != nil { return errors.Wrap(err, "could not create prepare msg") } logger.Debug("📢 got proposal, broadcasting prepare message", - fields.Round(i.State.Round), zap.Any("proposal_signers", msg.SignedMessage.OperatorIDs), zap.Any("prepare_signers", prepare.OperatorIDs)) if err := i.Broadcast(logger, prepare); err != nil { return errors.Wrap(err, "failed to broadcast prepare message") } + return nil } diff --git a/protocol/v2/qbft/instance/round_change.go b/protocol/v2/qbft/instance/round_change.go index 748632038c..728432f2a0 100644 --- a/protocol/v2/qbft/instance/round_change.go +++ b/protocol/v2/qbft/instance/round_change.go @@ -18,7 +18,6 @@ import ( // Assumes round change message is valid! func (i *Instance) uponRoundChange( logger *zap.Logger, - instanceStartValue []byte, msg *specqbft.ProcessingMessage, roundChangeMsgContainer *specqbft.MsgContainer, valCheck specqbft.ProposedValueCheckF, @@ -38,8 +37,8 @@ func (i *Instance) uponRoundChange( } logger = logger.With( - fields.Round(i.State.Round), fields.Height(i.State.Height), + fields.Round(i.State.Round), zap.Uint64("msg_round", uint64(msg.QBFTMessage.Round)), ) @@ -53,7 +52,7 @@ func (i *Instance) uponRoundChange( if partialQuorum && newRound > i.State.Round { // this code executes exactly once per round - when partial round quorum has // been achieved (because Instance events are processed sequentially) - err = i.uponPartialQuorumAboveOwnRound(logger, newRound, instanceStartValue) + err = i.uponPartialQuorumAboveOwnRound(logger, newRound) if err != nil { return fmt.Errorf("could not process round change with partial quorum above own round: %w", err) } @@ -81,7 +80,6 @@ func (i *Instance) uponRoundChange( justifiedRoundChangeMsg, valueToPropose, err := i.buildJustifiedProposalToLeadRound( i.config, - instanceStartValue, roundChangesWrapped, msg.QBFTMessage.Round, valCheck, @@ -124,10 +122,11 @@ func (i *Instance) uponRoundChange( return fmt.Errorf("couldn't calculate data root hash: %w", err) } - logger.Debug("🔄 got justified round change, broadcasting proposal message", - fields.Round(i.State.Round), + logger.Debug( + "🔄 got justified round change, broadcasting proposal message", zap.Any("round_change_signers", allSigners(roundChangeMsgContainer.MessagesForRound(i.State.Round))), - fields.Root(r)) + fields.Root(r), + ) if err := i.Broadcast(logger, proposal); err != nil { return errors.Wrap(err, "failed to broadcast proposal message") @@ -158,27 +157,24 @@ func (i *Instance) gotPartialQuorumAboveOwnRound(roundChangeMsgContainer *specqb func (i *Instance) uponPartialQuorumAboveOwnRound( logger *zap.Logger, newRound specqbft.Round, - instanceStartValue []byte, ) error { i.bumpToRound(newRound) i.State.ProposalAcceptedForCurrentRound = nil i.config.GetTimer().TimeoutForRound(i.State.Height, i.State.Round) - roundChange, err := i.CreateRoundChange(i.signer, newRound, instanceStartValue) + roundChange, err := i.CreateRoundChange(i.signer, newRound) if err != nil { return errors.Wrap(err, "failed to create round change message") } - root, err := specqbft.HashDataRoot(instanceStartValue) + root, err := specqbft.HashDataRoot(i.StartValue) if err != nil { return errors.Wrap(err, "failed to hash instance start value") } logger.Debug("📢 got partial quorum, broadcasting round change message", - fields.Round(i.State.Round), fields.Root(root), zap.Any("round_change_signers", roundChange.OperatorIDs), - fields.Height(i.State.Height), zap.String("reason", "partial-quorum")) if err := i.Broadcast(logger, roundChange); err != nil { @@ -199,7 +195,6 @@ func (i *Instance) uponPartialQuorumAboveOwnRound( // specified round can't be built. func (i *Instance) buildJustifiedProposalToLeadRound( config qbft.IConfig, - instanceStartValue []byte, roundChangesWrapped []*specqbft.ProcessingMessage, round specqbft.Round, valCheck specqbft.ProposedValueCheckF, @@ -211,7 +206,7 @@ func (i *Instance) buildJustifiedProposalToLeadRound( // chose proposal value: // - if justifiedRoundChangeMsg has no prepare justification chose Instance start value // - if justifiedRoundChangeMsg has prepare justification chose prepared value - valueToPropose := instanceStartValue + valueToPropose := i.StartValue if roundChangeMsgWrapped.QBFTMessage.RoundChangePrepared() { valueToPropose = roundChangeMsgWrapped.SignedMessage.FullData } @@ -483,7 +478,7 @@ RoundChange( getRoundChangeJustification(current) ) */ -func (i *Instance) CreateRoundChange(signer ssvtypes.OperatorSigner, newRound specqbft.Round, instanceStartValue []byte) (*spectypes.SignedSSVMessage, error) { +func (i *Instance) CreateRoundChange(signer ssvtypes.OperatorSigner, newRound specqbft.Round) (*spectypes.SignedSSVMessage, error) { lastPreparedRound, root, lastPreparedValue, justifications, err := i.getRoundChangeData() if err != nil { return nil, errors.Wrap(err, "could not generate round change data") diff --git a/protocol/v2/qbft/instance/timeout.go b/protocol/v2/qbft/instance/timeout.go index ebb5685b97..1760260d77 100644 --- a/protocol/v2/qbft/instance/timeout.go +++ b/protocol/v2/qbft/instance/timeout.go @@ -13,18 +13,16 @@ func (i *Instance) UponRoundTimeout(logger *zap.Logger) error { } newRound := i.State.Round + 1 - logger.Debug("⌛ round timed out", fields.Round(newRound)) - - // TODO: previously this was done outside of a defer, which caused the - // round to be bumped before the round change message was created & broadcasted. - // Remember to track the impact of this change and revert/modify if necessary. - defer func() { - i.bumpToRound(newRound) - i.State.ProposalAcceptedForCurrentRound = nil - i.config.GetTimer().TimeoutForRound(i.State.Height, i.State.Round) - }() - - roundChange, err := i.CreateRoundChange(i.signer, newRound, i.StartValue) + + logger = logger.With( + fields.Height(i.State.Height), + fields.Round(i.State.Round), + zap.Uint64("new_round", uint64(newRound)), + ) + + logger.Debug("⌛ round timed out, readying next round") + + roundChange, err := i.CreateRoundChange(i.signer, newRound) if err != nil { return errors.Wrap(err, "could not generate round change msg") } @@ -34,15 +32,17 @@ func (i *Instance) UponRoundTimeout(logger *zap.Logger) error { return err } logger.Debug("📢 broadcasting round change message", - fields.Round(i.State.Round), fields.Root(root), zap.Any("round_change_signers", roundChange.OperatorIDs), - fields.Height(i.State.Height), zap.String("reason", "timeout")) if err := i.Broadcast(logger, roundChange); err != nil { return errors.Wrap(err, "failed to broadcast round change message") } + i.bumpToRound(newRound) + i.State.ProposalAcceptedForCurrentRound = nil + i.config.GetTimer().TimeoutForRound(i.State.Height, newRound) + return nil } diff --git a/protocol/v2/ssv/value_check.go b/protocol/v2/ssv/value_check.go index 1e5dd209f7..6dc19005e9 100644 --- a/protocol/v2/ssv/value_check.go +++ b/protocol/v2/ssv/value_check.go @@ -60,7 +60,7 @@ func BeaconVoteValueCheckF( attestationData := &phase0.AttestationData{ Slot: slot, // Consensus data is unaware of CommitteeIndex - // We use -1 to not run into issues with the duplicate value slashing check: + // We use math.MaxUint64 to not run into issues with the duplicate value slashing check: // (data_1 != data_2 and data_1.target.epoch == data_2.target.epoch) Index: math.MaxUint64, BeaconBlockRoot: bv.BlockRoot,