diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 092a66fbf..44d93e21e 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -173,7 +173,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr } } peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { - return bsmq.New(ctx, network.Self(), p, network, onDontHaveTimeout) + return bsmq.New(ctx, p, network, onDontHaveTimeout) } sim := bssim.New() diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 5b875124c..0b9dc249e 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -57,6 +57,7 @@ type MessageNetwork interface { NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) Latency(peer.ID) time.Duration Ping(context.Context, peer.ID) ping.Result + Self() peer.ID } // MessageQueue implements queue of want messages to send to peers. @@ -64,7 +65,6 @@ type MessageQueue struct { ctx context.Context shutdown func() p peer.ID - self peer.ID network MessageNetwork dhTimeoutMgr DontHaveTimeoutManager @@ -231,14 +231,14 @@ type DontHaveTimeoutManager interface { } // New creates a new MessageQueue. -func New(ctx context.Context, self, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue { +func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue { onTimeout := func(ks []cid.Cid) { log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p) onDontHaveTimeout(p, ks) } clock := clock.New() dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, clock) - return newMessageQueue(ctx, self, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil) + return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil) } type messageEvent int @@ -252,7 +252,6 @@ const ( // This constructor is used by the tests func newMessageQueue( ctx context.Context, - self peer.ID, p peer.ID, network MessageNetwork, maxMsgSize int, @@ -266,7 +265,6 @@ func newMessageQueue( return &MessageQueue{ ctx: ctx, shutdown: cancel, - self: self, p: p, network: network, dhTimeoutMgr: dhTimeoutMgr, @@ -649,7 +647,7 @@ func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) { return } - self := mq.self + self := mq.network.Self() for _, e := range wantlist { if e.Cancel { if e.WantType == pb.Message_Wantlist_Have { diff --git a/bitswap/network/httpnet/connecteventmanager.go b/bitswap/network/httpnet/connecteventmanager.go deleted file mode 100644 index ee3372daf..000000000 --- a/bitswap/network/httpnet/connecteventmanager.go +++ /dev/null @@ -1,217 +0,0 @@ -package httpnet - -import ( - "sync" - - "github.com/gammazero/deque" - "github.com/libp2p/go-libp2p/core/peer" -) - -type ConnectionListener interface { - PeerConnected(peer.ID) - PeerDisconnected(peer.ID) -} - -type state byte - -const ( - stateDisconnected = iota - stateResponsive - stateUnresponsive -) - -type connectEventManager struct { - connListeners []ConnectionListener - lk sync.RWMutex - cond sync.Cond - peers map[peer.ID]*peerState - - changeQueue deque.Deque[peer.ID] - stop bool - done chan struct{} -} - -type peerState struct { - newState, curState state - pending bool -} - -func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager { - evtManager := &connectEventManager{ - connListeners: connListeners, - peers: make(map[peer.ID]*peerState), - done: make(chan struct{}), - } - evtManager.cond = sync.Cond{L: &evtManager.lk} - return evtManager -} - -func (c *connectEventManager) Start() { - go c.worker() -} - -func (c *connectEventManager) Stop() { - c.lk.Lock() - c.stop = true - c.lk.Unlock() - c.cond.Broadcast() - - <-c.done -} - -func (c *connectEventManager) getState(p peer.ID) state { - if state, ok := c.peers[p]; ok { - return state.newState - } else { - return stateDisconnected - } -} - -func (c *connectEventManager) setState(p peer.ID, newState state) { - state, ok := c.peers[p] - if !ok { - state = new(peerState) - c.peers[p] = state - } - state.newState = newState - if !state.pending && state.newState != state.curState { - state.pending = true - c.changeQueue.PushBack(p) - c.cond.Broadcast() - } -} - -// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the -// connect event manager has been stopped. -func (c *connectEventManager) waitChange() bool { - for !c.stop && c.changeQueue.Len() == 0 { - c.cond.Wait() - } - return !c.stop -} - -func (c *connectEventManager) worker() { - c.lk.Lock() - defer c.lk.Unlock() - defer close(c.done) - - for c.waitChange() { - pid := c.changeQueue.PopFront() - - state, ok := c.peers[pid] - // If we've disconnected and forgotten, continue. - if !ok { - // This shouldn't be possible because _this_ thread is responsible for - // removing peers from this map, and we shouldn't get duplicate entries in - // the change queue. - log.Error("a change was enqueued for a peer we're not tracking") - continue - } - - // Record the fact that this "state" is no longer in the queue. - state.pending = false - - // Then, if there's nothing to do, continue. - if state.curState == state.newState { - continue - } - - // Or record the state update, then apply it. - oldState := state.curState - state.curState = state.newState - - switch state.newState { - case stateDisconnected: - delete(c.peers, pid) - fallthrough - case stateUnresponsive: - // Only trigger a disconnect event if the peer was responsive. - // We could be transitioning from unresponsive to disconnected. - if oldState == stateResponsive { - c.lk.Unlock() - for _, v := range c.connListeners { - v.PeerDisconnected(pid) - } - c.lk.Lock() - } - case stateResponsive: - c.lk.Unlock() - for _, v := range c.connListeners { - v.PeerConnected(pid) - } - c.lk.Lock() - } - } -} - -// Called whenever we receive a new connection. May be called many times. -func (c *connectEventManager) Connected(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - // !responsive -> responsive - - if c.getState(p) == stateResponsive { - return - } - c.setState(p, stateResponsive) -} - -// Called when we drop the final connection to a peer. -func (c *connectEventManager) Disconnected(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - // !disconnected -> disconnected - - if c.getState(p) == stateDisconnected { - return - } - - c.setState(p, stateDisconnected) -} - -// Called whenever a peer is unresponsive. -func (c *connectEventManager) MarkUnresponsive(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - // responsive -> unresponsive - - if c.getState(p) != stateResponsive { - return - } - - c.setState(p, stateUnresponsive) -} - -// Called whenever we receive a message from a peer. -// -// - When we're connected to the peer, this will mark the peer as responsive (from unresponsive). -// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process -// -// the "on message" event, so we can't treat this as evidence of a connection. -func (c *connectEventManager) OnMessage(p peer.ID) { - c.lk.RLock() - unresponsive := c.getState(p) == stateUnresponsive - c.lk.RUnlock() - - // Only continue if both connected, and unresponsive. - if !unresponsive { - return - } - - // unresponsive -> responsive - - // We need to make a modification so now take a write lock - c.lk.Lock() - defer c.lk.Unlock() - - // Note: state may have changed in the time between when read lock - // was released and write lock taken, so check again - if c.getState(p) != stateUnresponsive { - return - } - - c.setState(p, stateResponsive) -} diff --git a/bitswap/network/httpnet/connecteventmanager_test.go b/bitswap/network/httpnet/connecteventmanager_test.go deleted file mode 100644 index b2bacf876..000000000 --- a/bitswap/network/httpnet/connecteventmanager_test.go +++ /dev/null @@ -1,168 +0,0 @@ -package httpnet - -import ( - "sync" - "testing" - "time" - - "github.com/ipfs/go-test/random" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/require" -) - -type mockConnEvent struct { - connected bool - peer peer.ID -} - -type mockConnListener struct { - sync.Mutex - events []mockConnEvent -} - -func newMockConnListener() *mockConnListener { - return new(mockConnListener) -} - -func (cl *mockConnListener) PeerConnected(p peer.ID) { - cl.Lock() - defer cl.Unlock() - cl.events = append(cl.events, mockConnEvent{connected: true, peer: p}) -} - -func (cl *mockConnListener) PeerDisconnected(p peer.ID) { - cl.Lock() - defer cl.Unlock() - cl.events = append(cl.events, mockConnEvent{connected: false, peer: p}) -} - -func wait(t *testing.T, c *connectEventManager) { - require.Eventually(t, func() bool { - c.lk.RLock() - defer c.lk.RUnlock() - return len(c.changeQueue) == 0 - }, time.Second, time.Millisecond, "connection event manager never processed events") -} - -func TestConnectEventManagerConnectDisconnect(t *testing.T) { - connListener := newMockConnListener() - peers := random.Peers(2) - cem := newConnectEventManager(connListener) - cem.Start() - t.Cleanup(cem.Stop) - - var expectedEvents []mockConnEvent - - // Connect A twice, should only see one event - cem.Connected(peers[0]) - cem.Connected(peers[0]) - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: peers[0], - connected: true, - }) - - // Flush the event queue. - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) - - // Block up the event loop. - connListener.Lock() - cem.Connected(peers[1]) - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: peers[1], - connected: true, - }) - - // We don't expect this to show up. - cem.Disconnected(peers[0]) - cem.Connected(peers[0]) - - connListener.Unlock() - - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) -} - -func TestConnectEventManagerMarkUnresponsive(t *testing.T) { - connListener := newMockConnListener() - p := random.Peers(1)[0] - cem := newConnectEventManager(connListener) - cem.Start() - t.Cleanup(cem.Stop) - - var expectedEvents []mockConnEvent - - // Don't mark as connected when we receive a message (could have been delayed). - cem.OnMessage(p) - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) - - // Handle connected event. - cem.Connected(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: true, - }) - require.Equal(t, expectedEvents, connListener.events) - - // Becomes unresponsive. - cem.MarkUnresponsive(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: false, - }) - require.Equal(t, expectedEvents, connListener.events) - - // We have a new connection, mark them responsive. - cem.Connected(p) - wait(t, cem) - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: true, - }) - require.Equal(t, expectedEvents, connListener.events) - - // No duplicate event. - cem.OnMessage(p) - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) -} - -func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { - connListener := newMockConnListener() - p := random.Peers(1)[0] - cem := newConnectEventManager(connListener) - cem.Start() - t.Cleanup(cem.Stop) - - var expectedEvents []mockConnEvent - - // Handle connected event. - cem.Connected(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: true, - }) - require.Equal(t, expectedEvents, connListener.events) - - // Becomes unresponsive. - cem.MarkUnresponsive(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: false, - }) - require.Equal(t, expectedEvents, connListener.events) - - cem.Disconnected(p) - wait(t, cem) - require.Empty(t, cem.peers) // all disconnected - require.Equal(t, expectedEvents, connListener.events) -}