Skip to content

Commit

Permalink
try with cond
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Oct 25, 2024
1 parent 04288f5 commit ffb8dd3
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
44 changes: 42 additions & 2 deletions hare4/hare.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -199,11 +200,12 @@ func New(
proposals *store.Store,
verifier verifier,
oracle oracle,
sync system.SyncStateProvider,
syncProv system.SyncStateProvider,
patrol *layerpatrol.LayerPatrol,
host server.Host,
opts ...Opt,
) *Hare {
condMtx := &sync.Mutex{}
ctx, cancel := context.WithCancel(context.Background())
hr := &Hare{
ctx: ctx,
Expand All @@ -213,6 +215,7 @@ func New(
signers: make(map[string]*signing.EdSigner),
sessions: make(map[types.LayerID]*protocol),
messageCache: make(map[types.Hash32]Message),
cond: sync.NewCond(condMtx),

config: DefaultConfig(),
log: zap.NewNop(),
Expand All @@ -229,7 +232,7 @@ func New(
oracle: oracle,
config: DefaultConfig(),
},
sync: sync,
sync: syncProv,
patrol: patrol,
tracer: noopTracer{},
}
Expand All @@ -255,6 +258,8 @@ type Hare struct {
sessions map[types.LayerID]*protocol
messageCache map[types.Hash32]Message

cond *sync.Cond

// options
config Config
log *zap.Logger
Expand Down Expand Up @@ -503,10 +508,45 @@ func (h *Hare) Handler(ctx context.Context, peer p2p.Peer, buf []byte) error {
switch {
case errors.Is(err, errCannotMatchProposals):
h.log.Info("preround message but couldn't reconstruct", zap.Int("nr compacts", len(compacts)), zap.Inline(msg))
grindOk := atomic.Bool{}
grindFail := atomic.Bool{}
ownSucceeded := atomic.Bool{}
go func() {
h.cond.L.Lock()
for i := 0; i < 5; i++ {
h.cond.Wait()
if ownSucceeded.Load() {
break
}
err := h.reconstructProposals(ctx, peer, msgId, msg)
if err != nil {
h.log.Error("hare4 grind reconstruct proposals error", zap.Error(err))
} else {
grindOk.Store(true)
break
}
}
h.cond.L.Unlock()
grindFail.Store(true)
}()
start := time.Now()
msg.Value.Proposals, err = h.fetchFull(ctx, peer, msgId)
if err != nil {
for !grindFail.Load() {
if grindOk.Load() {
goto SORT
}
time.Sleep(50 * time.Millisecond)
if time.Since(start) > 2*time.Second {
break
}
}

return fmt.Errorf("fetch full: %w", err)
}
ownSucceeded.Store(true)
h.cond.Broadcast()
SORT:
slices.SortFunc(msg.Value.Proposals, func(i, j types.ProposalID) int { return bytes.Compare(i[:], j[:]) })
msg.Value.CompactProposals = []types.CompactProposalID{}
fetched = true
Expand Down
2 changes: 2 additions & 0 deletions hare4/hare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (n *node) withPublisher() *node {

func (n *node) withStreamRequester() *node {
n.mockStreamRequester = hmock.NewMockstreamRequester(n.ctrl)
n.mockStreamRequester.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
return n
}

Expand Down Expand Up @@ -599,6 +600,7 @@ func (cl *lockstepCluster) drainInteractiveMessages() {
case <-n.tracer.compactReq:
case <-n.tracer.compactResp:
case <-done:
return
}
}
}()
Expand Down
1 change: 0 additions & 1 deletion hare4/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (p *protocol) OnInitial(proposals []types.ProposalID) {
p.mu.Lock()
defer p.mu.Unlock()
p.initial = proposals
fmt.Println("protocol on initial proposals", len(p.initial))
}

func (p *protocol) OnInput(msg *input) (bool, *wire.HareProof) {
Expand Down

0 comments on commit ffb8dd3

Please sign in to comment.