diff --git a/hare4/hare.go b/hare4/hare.go index 0e01825017..6fe885a758 100644 --- a/hare4/hare.go +++ b/hare4/hare.go @@ -9,6 +9,7 @@ import ( "math" "slices" "sync" + "sync/atomic" "time" "github.com/jonboulle/clockwork" @@ -199,11 +200,12 @@ func New( proposals *store.Store, verifier verifier, oracle oracle, - sync system.SyncStateProvider, + syncProv system.SyncStateProvider, patrol *layerpatrol.LayerPatrol, host server.Host, opts ...Opt, ) *Hare { + condMtx := &sync.Mutex{} ctx, cancel := context.WithCancel(context.Background()) hr := &Hare{ ctx: ctx, @@ -213,6 +215,7 @@ func New( signers: make(map[string]*signing.EdSigner), sessions: make(map[types.LayerID]*protocol), messageCache: make(map[types.Hash32]Message), + cond: sync.NewCond(condMtx), config: DefaultConfig(), log: zap.NewNop(), @@ -229,7 +232,7 @@ func New( oracle: oracle, config: DefaultConfig(), }, - sync: sync, + sync: syncProv, patrol: patrol, tracer: noopTracer{}, } @@ -255,6 +258,8 @@ type Hare struct { sessions map[types.LayerID]*protocol messageCache map[types.Hash32]Message + cond *sync.Cond + // options config Config log *zap.Logger @@ -503,10 +508,45 @@ func (h *Hare) Handler(ctx context.Context, peer p2p.Peer, buf []byte) error { switch { case errors.Is(err, errCannotMatchProposals): h.log.Info("preround message but couldn't reconstruct", zap.Int("nr compacts", len(compacts)), zap.Inline(msg)) + grindOk := atomic.Bool{} + grindFail := atomic.Bool{} + ownSucceeded := atomic.Bool{} + go func() { + h.cond.L.Lock() + for i := 0; i < 5; i++ { + h.cond.Wait() + if ownSucceeded.Load() { + break + } + err := h.reconstructProposals(ctx, peer, msgId, msg) + if err != nil { + h.log.Error("hare4 grind reconstruct proposals error", zap.Error(err)) + } else { + grindOk.Store(true) + break + } + } + h.cond.L.Unlock() + grindFail.Store(true) + }() + start := time.Now() msg.Value.Proposals, err = h.fetchFull(ctx, peer, msgId) if err != nil { + for !grindFail.Load() { + if grindOk.Load() { + goto SORT + } + time.Sleep(50 * time.Millisecond) + if time.Since(start) > 2*time.Second { + break + } + } + return fmt.Errorf("fetch full: %w", err) } + ownSucceeded.Store(true) + h.cond.Broadcast() + SORT: slices.SortFunc(msg.Value.Proposals, func(i, j types.ProposalID) int { return bytes.Compare(i[:], j[:]) }) msg.Value.CompactProposals = []types.CompactProposalID{} fetched = true diff --git a/hare4/hare_test.go b/hare4/hare_test.go index abc9005b51..716daa2f48 100644 --- a/hare4/hare_test.go +++ b/hare4/hare_test.go @@ -224,6 +224,7 @@ func (n *node) withPublisher() *node { func (n *node) withStreamRequester() *node { n.mockStreamRequester = hmock.NewMockstreamRequester(n.ctrl) + n.mockStreamRequester.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() return n } @@ -599,6 +600,7 @@ func (cl *lockstepCluster) drainInteractiveMessages() { case <-n.tracer.compactReq: case <-n.tracer.compactResp: case <-done: + return } } }() diff --git a/hare4/protocol.go b/hare4/protocol.go index d6d76d8268..34b58f1d86 100644 --- a/hare4/protocol.go +++ b/hare4/protocol.go @@ -114,7 +114,6 @@ func (p *protocol) OnInitial(proposals []types.ProposalID) { p.mu.Lock() defer p.mu.Unlock() p.initial = proposals - fmt.Println("protocol on initial proposals", len(p.initial)) } func (p *protocol) OnInput(msg *input) (bool, *wire.HareProof) {