[Merged by Bors] - feat: hare preround proposal compaction #6129

2 changes: 1 addition & 1 deletion config/config.go
@@ -193,7 +193,7 @@ func DefaultConfig() Config {
P2P: p2p.DefaultConfig(),
API: grpcserver.DefaultConfig(),
HARE3: hare3.DefaultConfig(),
HARE4: hare4.DefaultConfig(),
HARE4: hare4.DefaultConfig(), // DEFAULT HARE4 IS DISABLED
HareEligibility: eligibility.DefaultConfig(),
Beacon: beacon.DefaultConfig(),
TIME: timeConfig.DefaultConfig(),
9 changes: 2 additions & 7 deletions config/mainnet.go
@@ -60,9 +60,7 @@ func MainnetConfig() Config {
}
logging := DefaultLoggingConfig()
logging.TrtlLoggerLevel = zapcore.WarnLevel.String()
logging.AtxHandlerLevel = zapcore.WarnLevel.String()
logging.ProposalListenerLevel = zapcore.WarnLevel.String()
forkLayer := types.LayerID(111_111_111) // TODO THIS NEEDS A NUMBER
logging.MeshLoggerLevel = zapcore.WarnLevel.String()
hare3conf := hare3.DefaultConfig()
hare3conf.Committee = 400
hare3conf.Enable = true
@@ -71,12 +69,9 @@
Layer: 105_720, // July 15, 2024, 10:00:00 AM UTC
Size: 50,
}
hare3conf.DisableLayer = forkLayer

hare4conf := hare4.DefaultConfig()
hare4conf.Committee = 50
hare4conf.Enable = true
hare4conf.EnableLayer = forkLayer
hare4conf.Enable = false
return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
11 changes: 1 addition & 10 deletions config/presets/fastnet.go
@@ -33,22 +33,13 @@ func fastnet() config.Config {
conf.ATXGradeDelay = 1 * time.Second

conf.HARE3.Enable = true
conf.HARE3.DisableLayer = 22
conf.HARE3.DisableLayer = types.LayerID(math.MaxUint32)
conf.HARE3.Committee = 800
conf.HARE3.Leaders = 10
conf.HARE3.PreroundDelay = 3 * time.Second
conf.HARE3.RoundDuration = 700 * time.Millisecond
conf.HARE3.IterationsLimit = 2

conf.HARE4.Enable = true
conf.HARE4.EnableLayer = types.LayerID(22)
conf.HARE4.DisableLayer = types.LayerID(math.MaxUint32)
conf.HARE4.Committee = 800
conf.HARE4.Leaders = 10
conf.HARE4.PreroundDelay = 3 * time.Second
conf.HARE4.RoundDuration = 700 * time.Millisecond
conf.HARE4.IterationsLimit = 2

conf.P2P.MinPeers = 10

conf.Genesis = config.GenesisConfig{
9 changes: 3 additions & 6 deletions config/presets/testnet.go
@@ -52,16 +52,13 @@ func testnet() config.Config {
}
hare3conf := hare3.DefaultConfig()
hare3conf.Enable = true
hare3conf.EnableLayer = 0
hare3conf.DisableLayer = 50
hare3conf.EnableLayer = 7366
// NOTE(dshulyak) i forgot to set protocol name for testnet when we configured it manually.
// we can't do rolling upgrade if protocol name changes, so lets keep it like that temporarily.
hare3conf.ProtocolName = ""
hare4conf := hare4.DefaultConfig()
hare4conf.Enable = true
hare4conf.EnableLayer = 50 // TODO THIS NEEDS A VALUE
hare4conf.DisableLayer = math.MaxUint32

hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false
defaultdir := filepath.Join(home, "spacemesh-testnet", "/")
return config.Config{
Preset: "testnet",
100 changes: 47 additions & 53 deletions hare4/hare.go
@@ -46,34 +46,48 @@
errResponseTooBig = errors.New("response too big")
errCannotFindProposal = errors.New("cannot find proposal")
errNoEligibilityProofs = errors.New("no eligibility proofs")
errSigTooShort = errors.New("signature too short")

fetchFullTimeout = 5 * time.Second
)

type CommitteeUpgrade struct {
Layer types.LayerID
Size uint16
}

type Config struct {
Enable bool `mapstructure:"enable"`
EnableLayer types.LayerID `mapstructure:"enable-layer"`
DisableLayer types.LayerID `mapstructure:"disable-layer"`
Committee uint16 `mapstructure:"committee"`
Leaders uint16 `mapstructure:"leaders"`
IterationsLimit uint8 `mapstructure:"iterations-limit"`
PreroundDelay time.Duration `mapstructure:"preround-delay"`
RoundDuration time.Duration `mapstructure:"round-duration"`
Enable bool `mapstructure:"enable"`
EnableLayer types.LayerID `mapstructure:"enable-layer"`
DisableLayer types.LayerID `mapstructure:"disable-layer"`
Committee uint16 `mapstructure:"committee"`
CommitteeUpgrade *CommitteeUpgrade
Leaders uint16 `mapstructure:"leaders"`
IterationsLimit uint8 `mapstructure:"iterations-limit"`
PreroundDelay time.Duration `mapstructure:"preround-delay"`
RoundDuration time.Duration `mapstructure:"round-duration"`
// LogStats if true will log iteration statistics with INFO level at the start of the next iteration.
// This requires additional computation and should be used for debugging only.
LogStats bool `mapstructure:"log-stats"`
ProtocolName string `mapstructure:"protocolname"`
}

func (cfg *Config) CommitteeFor(layer types.LayerID) uint16 {
if cfg.CommitteeUpgrade != nil && layer >= cfg.CommitteeUpgrade.Layer {
return cfg.CommitteeUpgrade.Size
}
return cfg.Committee
}

func (cfg *Config) Validate(zdist time.Duration) error {
terminates := cfg.roundStart(IterRound{Iter: cfg.IterationsLimit, Round: hardlock})
if terminates > zdist {
return fmt.Errorf("hare terminates later (%v) than expected (%v)", terminates, zdist)

}
if cfg.Enable && cfg.DisableLayer <= cfg.EnableLayer {
return fmt.Errorf("disabled layer (%d) must be larger than enabled (%d)",
cfg.DisableLayer, cfg.EnableLayer)

}
return nil

}

func (cfg *Config) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
@@ -81,6 +95,10 @@
encoder.AddUint32("enabled layer", cfg.EnableLayer.Uint32())
encoder.AddUint32("disabled layer", cfg.DisableLayer.Uint32())
encoder.AddUint16("committee", cfg.Committee)
if cfg.CommitteeUpgrade != nil {
encoder.AddUint32("committee upgrade layer", cfg.CommitteeUpgrade.Layer.Uint32())
encoder.AddUint16("committee upgrade size", cfg.CommitteeUpgrade.Size)

}
encoder.AddUint16("leaders", cfg.Leaders)
encoder.AddUint8("iterations limit", cfg.IterationsLimit)
encoder.AddDuration("preround delay", cfg.PreroundDelay)
@@ -162,9 +180,9 @@

// WithResultsChan overrides the default result channel with a different one.
// This is only needed for the migration period between hare3 and hare4.
func WithResultsChan(c chan ConsensusOutput) Opt {
return func(hr *Hare) {
hr.results = c

}
}

@@ -207,7 +225,6 @@
atxsdata: atxsdata,
proposals: proposals,
verifier: verif,
compactFn: compactTruncate,
oracle: &legacyOracle{
log: zap.NewNop(),
oracle: oracle,
@@ -222,7 +239,7 @@
}

if host != nil {
hr.p2p = server.New(host, PROTOCOL_NAME, hr.handleProposalsStream)

}
return hr
}
@@ -251,7 +268,6 @@
atxsdata *atxsdata.Data
proposals *store.Store
verifier verifier
compactFn func([]byte) []byte
oracle *legacyOracle
sync system.SyncStateProvider
patrol *layerpatrol.LayerPatrol
@@ -291,14 +307,14 @@
for next := enabled; next < disabled; next++ {
select {
case <-h.nodeclock.AwaitLayer(next):
h.log.Debug("notified", zap.Uint32("layer id", next.Uint32()))
h.log.Debug("notified", zap.Uint32("layer", next.Uint32()))
h.onLayer(next)
h.cleanMessageCache(next - 1)
case <-h.ctx.Done():
return nil
}
}
return nil

})
}

@@ -322,27 +338,22 @@
cb := func(ctx context.Context, rw io.ReadWriter) error {
respLen, _, err := codec.DecodeLen(rw)
if err != nil {
return fmt.Errorf("decode length: %w", err)

}
if respLen >= MAX_EXCHANGE_SIZE {
return errResponseTooBig

}
buff := make([]byte, respLen)
_, err = io.ReadFull(rw, buff)
if err != nil {
return fmt.Errorf("read response buffer: %w", err)
}
err = codec.Decode(buff, resp)
if err != nil {
b, err := codec.DecodeFrom(rw, resp)
if err != nil || b != int(respLen) {
return fmt.Errorf("decode response: %w", err)

}
return nil
}

err := h.p2p.StreamRequest(ctx, peer, reqBytes, cb)
if err != nil {
requestCompactErrorCounter.Inc()
return nil, fmt.Errorf("stream request: %w", err)

}

h.tracer.OnCompactIdResponse(resp)
@@ -354,25 +365,25 @@
requestCompactHandlerCounter.Inc()
compactProps := &CompactIdRequest{}
if err := codec.Decode(msg, compactProps); err != nil {
malformedError.Inc()
return fmt.Errorf("%w: decoding error %s", pubsub.ErrValidationReject, err.Error())

}
h.tracer.OnCompactIdRequest(compactProps)
h.mu.Lock()
m, ok := h.messageCache[compactProps.MsgId]
h.mu.Unlock()
if !ok {
messageCacheMiss.Inc()
return fmt.Errorf("message %s: cache miss", compactProps.MsgId)

}
resp := &CompactIdResponse{Ids: m.Body.Value.Proposals}
respBytes := codec.MustEncode(resp)
Contributor: As above, can't we just encode resp into the stream instead of using an intermediate buffer?

Contributor Author: To do this, one must know the length the type will encode to without actually encoding it (you must encode how many bytes are in the response before actually writing them into the stream). While that is possible, it's probably something the scale package should offer rather than something written by hand, which would be fragile. Any ideas on how to do this?

Contributor: Why do you need to encode the length first?

Contributor Author: Because you need to know how many bytes to read out of the stream.

Contributor: AFAIR, the scale decoding would fail if the response had the wrong length anyway.

Contributor: Well, while we could do something like this in go-scale, it is not really fragile if the data are an array of fixed-size IDs. Please check how it's done in the fetch package, e.g. the epoch stream handler:

if err := h.streamIDs(ctx, s, func(cbk retrieveCallback) error {

func (h *handler) streamIDs(ctx context.Context, s io.ReadWriter, retrieve retrieveFunc) error {
started := false
if err := retrieve(func(total int, id []byte) error {
if !started {
started = true
respSize := scale.LenSize(uint32(total)) + uint32(total*len(id))
if _, err := codec.EncodeLen(s, respSize); err != nil {
return err
}
if _, err := codec.EncodeLen(s, uint32(total)); err != nil {
return err
}
}
if _, err := s.Write(id[:]); err != nil {
return err
}
return nil
},
); err != nil {
if !started {
if err := server.WriteErrorResponse(s, err); err != nil {
h.logger.Debug("failed to write error response", log.ZContext(ctx), zap.Error(err))
}
}
return err
}
// If any IDs were sent:
// Response.Data already sent
// Response.Error has length 0
lens := []uint32{0}
if !started {
// If no ATX IDs were sent:
// Response.Data is just a single zero byte (length 0),
// but the length of Response.Data is 1 so we must send it
// Response.Error has length 0
lens = []uint32{1, 0, 0}
}
for _, l := range lens {
if _, err := codec.EncodeLen(s, l); err != nil {
return err
}
}
return nil
}

and the client part
return readIDSlice(s, &ed.AtxIDs, maxEpochDataAtxIDs)

func readIDSlice[V any, H scale.DecodablePtr[V]](r io.Reader, slice *[]V, limit uint32) (int, error) {
return server.ReadResponse(r, func(respLen uint32) (int, error) {
d := scale.NewDecoder(r)
length, total, err := scale.DecodeLen(d, limit)
if err != nil {
return total, err
}
if int(length*types.Hash32Length)+total != int(respLen) {
return total, errors.New("bad slice length")
}
*slice = make([]V, length)
for i := uint32(0); i < length; i++ {
n, err := H(&(*slice)[i]).DecodeScale(d)
total += n
if err != nil {
return total, err
}
}
return total, err
})
}

The client part is not perfect because we could be handling each ID right away instead of waiting for the whole slice to be sent, but that would require further refactoring of the fetcher code; not sure whether it is applicable here.
Nevertheless, I think the server side can be improved here.

Maybe we could move some helpers to the codec package
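
For illustration, a minimal sketch of the kind of helper that could move into the codec package, following the fixed-size-ID pattern above. The function name, signature, and package are assumptions made for this sketch; scale.LenSize and codec.EncodeLen are the primitives already used in the snippets here, and codec.EncodeLen is assumed to accept any io.Writer.

package sketch

import (
	"fmt"
	"io"

	"github.com/spacemeshos/go-scale"
	"github.com/spacemeshos/go-spacemesh/codec"
)

// writeFixedIDSlice writes <payload length><count><id bytes...> so the reader
// always knows up front exactly how many bytes to expect.
func writeFixedIDSlice(w io.Writer, ids [][]byte, idSize int) error {
	count := uint32(len(ids))
	payload := scale.LenSize(count) + count*uint32(idSize)
	if _, err := codec.EncodeLen(w, payload); err != nil {
		return fmt.Errorf("encode payload length: %w", err)
	}
	if _, err := codec.EncodeLen(w, count); err != nil {
		return fmt.Errorf("encode count: %w", err)
	}
	for _, id := range ids {
		if _, err := w.Write(id); err != nil {
			return fmt.Errorf("write id: %w", err)
		}
	}
	return nil
}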

Contributor Author: Regarding "AFAIR, the scale decoding would fail if the response had the wrong length anyway": the reason it is done this way is the following. When you read a response from a peer, you want to avoid reading the stream until EOF. That is generally bad because it puts you at risk of a malicious peer simply feeding you data, all of which is read into memory, sending the node's memory usage through the roof. That's why you want to know in advance how long the data is.
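
For illustration, a self-contained sketch of the defensive read pattern described here, using only the standard library. The 4-byte big-endian prefix and the cap value are simplifications for the sketch; the PR itself uses codec.DecodeLen / codec.DecodeFrom and MAX_EXCHANGE_SIZE for the same purpose.

package sketch

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const maxResponseSize = 1 << 20 // illustrative cap only

// readLengthPrefixed reads a length prefix, rejects anything above the cap,
// and then reads exactly that many bytes. It never reads until EOF, so a
// malicious peer cannot force unbounded allocations.
func readLengthPrefixed(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, fmt.Errorf("read length: %w", err)
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxResponseSize {
		return nil, errors.New("response too big")
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, fmt.Errorf("read body: %w", err)
	}
	return buf, nil
}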

Contributor Author (@acud, Aug 1, 2024):
@ivan4th the server.ReadResponse part implicitly reads the respLen which encodes the whole message size in advance https://github.com/spacemeshos/go-spacemesh/blob/develop/p2p/server/server.go#L511

Which is exactly what I'm doing here, except I don't want to use the leaky server abstraction because it does a bunch of things I'm not interested in.
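
For comparison, a hypothetical sketch of what routing the same read through server.ReadResponse would look like, reusing identifiers from the diff and the readIDSlice snippet above (illustrative only, not code from the PR):

// server.ReadResponse reads the length prefix itself and hands it to the
// callback, which enforces the size cap and decodes exactly respLen bytes.
if _, err := server.ReadResponse(rw, func(respLen uint32) (int, error) {
	if respLen >= MAX_EXCHANGE_SIZE {
		return 0, errResponseTooBig
	}
	return codec.DecodeFrom(rw, resp)
}); err != nil {
	return fmt.Errorf("read compact response: %w", err)
}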

if _, err := codec.EncodeLen(s, uint32(len(respBytes))); err != nil {
return fmt.Errorf("encode length: %w", err)

}

if _, err := s.Write(respBytes); err != nil {
return fmt.Errorf("write response: %w", err)

}

return nil
@@ -383,15 +394,14 @@
func (h *Hare) reconstructProposals(ctx context.Context, peer p2p.Peer, msgId types.Hash32, msg *Message) error {
proposals := h.proposals.GetForLayer(msg.Layer)
if len(proposals) == 0 {
return errNoLayerProposals

}
compacted := h.compactProposals(h.compactFn, msg.Layer, proposals)
compacted := h.compactProposals(msg.Layer, proposals)
proposalIds := make([]proposalTuple, len(proposals))
for i := range proposals {
proposalIds[i] = proposalTuple{id: proposals[i].ID(), compact: compacted[i]}
}
slices.SortFunc(proposalIds, sortProposalsTuple)

slices.SortFunc(proposalIds, func(i, j proposalTuple) int { return bytes.Compare(i.id[:], j.id[:]) })
taken := make([]bool, len(proposals))
findProp := func(id types.CompactProposalID) (bool, types.ProposalID) {
for i := 0; i < len(proposalIds); i++ {
Expand All @@ -399,7 +409,7 @@
continue
}
if taken[i] {
continue

}
// item is both not taken and equals to looked up ID
taken[i] = true
@@ -426,13 +436,13 @@

if ctr != len(msg.Value.CompactProposals) {
// this will force the calling context to do a fetchFull
return errCannotMatchProposals

}

// sort the found proposals and unset the compact proposals
// field before trying to check the signature
// since it would add unnecessary data to the hasher
slices.SortFunc(msg.Value.Proposals, sortProposalIds)
slices.SortFunc(msg.Value.Proposals, func(i, j types.ProposalID) int { return bytes.Compare(i[:], j[:]) })
msg.Value.CompactProposals = []types.CompactProposalID{}
return nil
}
@@ -472,33 +482,33 @@
case errors.Is(err, errCannotMatchProposals):
msg.Value.Proposals, err = h.fetchFull(ctx, peer, msgId)
if err != nil {
return fmt.Errorf("fetch full: %w", err)

}
slices.SortFunc(msg.Value.Proposals, sortProposalIds)
slices.SortFunc(msg.Value.Proposals, func(i, j types.ProposalID) int { return bytes.Compare(i[:], j[:]) })
msg.Value.CompactProposals = []types.CompactProposalID{}
fetched = true
case err != nil:
return fmt.Errorf("reconstruct proposals: %w", err)

}
}
if !h.verifier.Verify(signing.HARE, msg.Sender, msg.ToMetadata().ToBytes(), msg.Signature) {
if msg.IterRound.Round == preround && !fetched {
preroundSigFailCounter.Inc()

// we might have a bad signature because of a local hash collision
// of a proposal that has the same short hash that the node sent us.
// in this case we try to ask for a full exchange of all full proposal
// ids and try to validate again
var err error
msg.Body.Value.Proposals, err = h.fetchFull(ctx, peer, msgId)
if err != nil {
return fmt.Errorf("signature verify: fetch full: %w", err)

}
if len(msg.Body.Value.Proposals) != len(compacts) {
return fmt.Errorf("signature verify: proposals mismatch: %w", err)

}
if !h.verifier.Verify(signing.HARE, msg.Sender, msg.ToMetadata().ToBytes(), msg.Signature) {
signatureError.Inc()
return fmt.Errorf("%w: signature verify: invalid signature", pubsub.ErrValidationReject)

}
} else {
signatureError.Inc()
@@ -543,7 +553,7 @@
proof := equivocation.ToMalfeasanceProof()
if err := identities.SetMalicious(
h.db, equivocation.Messages[0].SmesherID, codec.MustEncode(proof), time.Now()); err != nil {
h.log.Error("failed to save malicious identity", zap.Error(err))

}
h.atxsdata.SetMalicious(equivocation.Messages[0].SmesherID)
}
@@ -559,17 +569,17 @@
func (h *Hare) onLayer(layer types.LayerID) {
h.proposals.OnLayer(layer)
if !h.sync.IsSynced(h.ctx) {
h.log.Debug("not synced", zap.Uint32("lid", layer.Uint32()))
return

}
beacon, err := beacons.Get(h.db, layer.GetEpoch())
if err != nil || beacon == types.EmptyBeacon {
h.log.Debug("no beacon",
zap.Uint32("epoch", layer.GetEpoch().Uint32()),
zap.Uint32("lid", layer.Uint32()),
zap.Error(err),
)
return

}
h.patrol.SetHareInCharge(layer)

@@ -580,7 +590,7 @@
beacon: beacon,
signers: maps.Values(h.signers),
vrfs: make([]*types.HareEligibility, len(h.signers)),
proto: newProtocol(h.config.Committee/2 + 1),
proto: newProtocol(h.config.CommitteeFor(layer)/2 + 1),
}
h.sessions[layer] = s.proto
h.mu.Unlock()
@@ -634,15 +644,15 @@
// initial set is not needed if node is not active in preround
select {
case <-h.wallclock.After(walltime.Sub(h.wallclock.Now())):
case <-h.ctx.Done():
return h.ctx.Err()

}
start := time.Now()
session.proto.OnInitial(h.selectProposals(session))
proposalsLatency.Observe(time.Since(start).Seconds())
}
if err := h.onOutput(session, current, session.proto.Next()); err != nil {
return err

}
result := false
for {
@@ -672,7 +682,7 @@
result = true
}
if err := h.onOutput(session, current, out); err != nil {
return err

}
// we are logging stats 1 network delay after new iteration start
// so that we can receive notify messages from previous iteration
@@ -681,7 +691,7 @@
}
if out.terminated {
if !result {
return errors.New("terminated without result")

}
return nil
}
@@ -707,11 +717,11 @@
msg.Signature = session.signers[i].Sign(signing.HARE, msg.ToMetadata().ToBytes())
if ir.Round == preround {
var err error
msg.Body.Value.CompactProposals, err = h.compactProposalIds(h.compactFn, msg.Layer,
msg.Body.Value.CompactProposals, err = h.compactProposalIds(msg.Layer,
out.message.Body.Value.Proposals)
if err != nil {
h.log.Debug("failed to compact proposals", zap.Error(err))
continue

}
fullProposals := msg.Body.Value.Proposals
msg.Body.Value.Proposals = []types.ProposalID{}
@@ -723,7 +733,7 @@
msg.Body.Value.Proposals = []types.ProposalID{}
}
if err := h.pubsub.Publish(h.ctx, h.config.ProtocolName, msg.ToBytes()); err != nil {
h.log.Error("failed to publish", zap.Inline(&msg), zap.Error(err))

}
}
h.tracer.OnMessageSent(out.message)
@@ -734,16 +744,16 @@
)
if out.coin != nil {
select {
case <-h.ctx.Done():
return h.ctx.Err()

case h.coins <- WeakCoinOutput{Layer: session.lid, Coin: *out.coin}:
}
sessionCoin.Inc()
}
if out.result != nil {
select {
case <-h.ctx.Done():
return h.ctx.Err()

case h.results <- ConsensusOutput{Layer: session.lid, Proposals: out.result}:
}
sessionResult.Inc()
@@ -766,11 +776,11 @@
for _, signer := range session.signers {
atxid, err := atxs.GetIDByEpochAndNodeID(h.db, publish, signer.NodeID())
switch {
case errors.Is(err, sql.ErrNotFound):

// if atx is not registered for identity we will get sql.ErrNotFound
case err != nil:
h.log.Error("failed to get atx id by epoch and node id", zap.Error(err))
return []types.ProposalID{}

default:
own := h.atxsdata.Get(target, atxid)
if min == nil || (min != nil && own != nil && own.Height < min.Height) {
@@ -779,8 +789,8 @@
}
}
if min == nil {
h.log.Debug("no atxs in the requested epoch", zap.Uint32("epoch", session.lid.GetEpoch().Uint32()-1))
return []types.ProposalID{}

}

candidates := h.proposals.GetForLayer(session.lid)
@@ -808,8 +818,8 @@
}
header := h.atxsdata.Get(target, p.AtxID)
if header == nil {
h.log.Error("atx is not loaded", zap.Stringer("atxid", p.AtxID))
return []types.ProposalID{}

}
if header.BaseHeight >= min.Height {
// does not vote for future proposal
@@ -827,8 +837,8 @@
h.log.Warn("proposal has different beacon value",
zap.Uint32("lid", session.lid.Uint32()),
zap.Stringer("id", p.ID()),
zap.String("proposal_beacon", p.Beacon().ShortString()),
zap.String("epoch_beacon", session.beacon.ShortString()),
zap.Stringer("proposal_beacon", p.Beacon()),
zap.Stringer("epoch_beacon", session.beacon),
)
}
}
@@ -851,13 +861,13 @@
h.mu.Lock()
defer h.mu.Unlock()
for k, item := range h.messageCache {
if item.Layer < l {

// mark key for deletion
keys = append(keys, k)

}
}
for _, v := range keys {
delete(h.messageCache, v)

}
}

@@ -876,47 +886,39 @@
vrfs []*types.HareEligibility
}

type compactFunc func([]byte) []byte

// compactFunc will truncate a given byte slice to a shorter
// byte slice by reslicing.
func compactTruncate(b []byte) []byte {
return b[:4]
}

func compactVrf(compacter compactFunc, v types.VrfSignature) (c types.CompactProposalID) {
b := compacter(v[:])
copy(c[:], b)
return c
}

func (h *Hare) compactProposals(compacter compactFunc, layer types.LayerID,
func (h *Hare) compactProposals(layer types.LayerID,
proposals []*types.Proposal,
) []types.CompactProposalID {
compactProposals := make([]types.CompactProposalID, len(proposals))
for i, prop := range proposals {
vrf := prop.EligibilityProofs[0].Sig
compactProposals[i] = compactVrf(compacter, vrf)
var c types.CompactProposalID
copy(c[:], vrf[:4])
compactProposals[i] = c
}
return compactProposals
}

func (h *Hare) compactProposalIds(compacter compactFunc, layer types.LayerID,
func (h *Hare) compactProposalIds(layer types.LayerID,
proposals []types.ProposalID,
) ([]types.CompactProposalID, error) {
compactProposals := make([]types.CompactProposalID, len(proposals))
for i, prop := range proposals {
fp := h.proposals.Get(layer, prop)
if fp == nil {
return nil, errCannotFindProposal

}

// we must handle this explicitly or we risk a panic on
// a nil slice access below
if len(fp.EligibilityProofs) == 0 {
return nil, errNoEligibilityProofs

}
compactProposals[i] = compactVrf(compacter, fp.EligibilityProofs[0].Sig)

var c types.CompactProposalID
copy(c[:], fp.EligibilityProofs[0].Sig[:4])

compactProposals[i] = c
}
return compactProposals, nil
}
@@ -925,11 +927,3 @@
id types.ProposalID
compact types.CompactProposalID
}

func sortProposalsTuple(i, j proposalTuple) int {
return sortProposalIds(i.id, j.id)
}

func sortProposalIds(i, j types.ProposalID) int {
return bytes.Compare(i[:], j[:])
}