-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
aggregation-duty: clarify and clean up #1817
base: stage
Are you sure you want to change the base?
Conversation
// differ from spec because we need to subscribe to subnet | ||
isAggregator, err := isAggregator(committeeLength, slotSig) | ||
if err != nil { | ||
return nil, DataVersionNil, fmt.Errorf("failed to check if validator is an aggregator: %w", err) | ||
} | ||
if !isAggregator { | ||
return nil, DataVersionNil, fmt.Errorf("validator is not an aggregator") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doing this here seems to be causing this error consistently showing in logs:
{
"level": "\u001b[35mDEBUG\u001b[0m",
"time": "2024-10-25T13:17:08.000420Z",
"name": "Controller.Validator",
"msg": "❗ could not handle message",
"pubkey": "9203935ee396795ab6775b910f94d8d38850cd2331fc4416fdfeb87cc6d4cd764a9f6b7dcddd05b6a9c76a73cd68f100",
"beacon_role": "AGGREGATOR",
"signer": 419,
"slot": 2829985,
"msg_type": "partial_signature",
"error": "failed to submit aggregate and proof: validator is not an aggregator",
"errorVerbose": "validator is not an aggregator\nfailed to submit aggregate and proof\ngithub.com/ssvlabs/ssv/protocol/genesis/ssv/runner.(*AggregatorRunner).ProcessPreConsensus\n\t/go/src/github.com/ssvlabs/ssv/protocol/genesis/ssv/runner/aggregator.go:104\ngithub.com/ssvlabs/ssv/protocol/genesis/ssv/validator.(*Validator).ProcessMessage\n\t/go/src/github.com/ssvlabs/ssv/protocol/genesis/ssv/validator/validator.go:151\ngithub.com/ssvlabs/ssv/protocol/genesis/ssv/validator.(*Validator).ConsumeQueue\n\t/go/src/github.com/ssvlabs/ssv/protocol/genesis/ssv/validator/msgqueue_consumer.go:154\ngithub.com/ssvlabs/ssv/protocol/genesis/ssv/validator.(*Validator).StartQueueConsumer\n\t/go/src/github.com/ssvlabs/ssv/protocol/genesis/ssv/validator/msgqueue_consumer.go:60\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1695"
}
so,
- moving this code out of
goclient
because it doesn't really belong here - and handling
isAggregator==true
a bit differently (justlog.Debug
instead of erorring)
if h.isFresh(d) { | ||
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAttester)) | ||
// For every attestation duty we also have to try to perform aggregation duty even if it | ||
// isn't necessarily needed - we won't know if it's needed or not until we rebuild | ||
// validator signature (done during pre-consensus step) and perform some computation on | ||
// it - hence scheduling it for execution here. | ||
toExecute = append(toExecute, h.toGenesisSpecDuty(d, genesisspectypes.BNRoleAggregator)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did I get this right, or there is more to it ?
// CurrentDuty is the duty the node pulled locally from the beacon node, might be different from decided duty | ||
// StartingDuty is the duty the node pulled locally from the beacon node, might be different | ||
// from the actual duty operators have decided upon. | ||
StartingDuty spectypes.Duty `json:"StartingDuty,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be different from decided duty
Is the only reason it can be different - because other operators might have gotten different data from their beacon nodes (and hence decided value for operator cluster is different), or there is something else ?
r.metrics.PauseDutyFullFlow() | ||
// get block data | ||
duty = r.GetState().StartingDuty.(*spectypes.ValidatorDuty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is essentially duplicated, looks like a typo (see couple of lines above it).
r.metrics.StartConsensus() | ||
if err := r.BaseRunner.decide(logger, r, duty.Slot, input); err != nil { | ||
return errors.Wrap(err, "can't start new duty runner instance for duty") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems r.metrics.StartConsensus()
call have been missing here ? I see it made in other runners, doesn't look like this is different.
// signer must be same for all messages, at least 1 message must be present (this is validated | ||
// prior to ProcessPreConsensus call) | ||
signer := signedMsg.Messages[0].Signer | ||
duty := r.GetState().StartingDuty.(*spectypes.ValidatorDuty) | ||
|
||
logger.Debug("🧩 got partial signature quorum", | ||
zap.Any("signer", signedMsg.Messages[0].Signer), // TODO: always 1? | ||
zap.Any("signer", signer), | ||
fields.Slot(duty.Slot), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Answering TODO: looks like signer should be same for all messages (this is validated earlier with PartialSignatureMessages.Validate
).
// isFutureMessage returns true if message height is from a future instance. | ||
// It takes into consideration a special case where FirstHeight didn't start but c.Height == FirstHeight (since we bump height on start instance) | ||
func (c *Controller) isFutureMessage(msg *specqbft.ProcessingMessage) (bool, error) { | ||
// validateMsgHeight returns error if provided message height is from a future instance. | ||
// It takes into consideration a special case where FirstHeight instance didn't start yet | ||
// but c.Height == FirstHeight. | ||
func (c *Controller) validateMsgHeight(msg *specqbft.ProcessingMessage) error { | ||
if c.Height == specqbft.FirstHeight && c.StoredInstances.FindInstance(c.Height) == nil { | ||
return true, nil | ||
return fmt.Errorf("qbft instance at height: %d hasn't started yet", specqbft.FirstHeight) | ||
} | ||
|
||
return msg.QBFTMessage.Height > c.Height, nil | ||
if msg.QBFTMessage.Height > c.Height { | ||
return fmt.Errorf("message height: %d > current qbft height: %d", msg.QBFTMessage.Height, c.Height) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just rewriting this func isFutureMessage
->validateMsgHeight
so we can log a detailed error message on what exactly happened (expected vs actual, to understand how far in the future the message actually is).
Was browsing through aggregator duty flow to better understand how it works, and observing strange error (see discussions below) in logs on stage decided it's worth try and document it some, and maybe fix the error.
Before merging:
Putting this PR in draft for now (it is essentially ready, just want other functionality to get merged first so I can rebase and fix pipelines).