diff --git a/.changeset/rare-carpets-cry.md b/.changeset/rare-carpets-cry.md new file mode 100644 index 00000000000..f096d31885c --- /dev/null +++ b/.changeset/rare-carpets-cry.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#internal capability dispatcher threading and context usage diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index 63b0fad7e98..b91211b5bf5 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -6,6 +6,8 @@ import ( sync "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -24,7 +26,7 @@ type dispatcher struct { peerID p2ptypes.PeerID signer p2ptypes.Signer registry core.CapabilitiesRegistry - receivers map[key]remotetypes.Receiver + receivers map[key]*receiver mu sync.RWMutex stopCh services.StopChan wg sync.WaitGroup @@ -45,7 +47,7 @@ func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, reg peerWrapper: peerWrapper, signer: signer, registry: registry, - receivers: make(map[key]remotetypes.Receiver), + receivers: make(map[key]*receiver), stopCh: make(services.StopChan), lggr: lggr.Named("Dispatcher"), } @@ -58,12 +60,35 @@ func (d *dispatcher) Start(ctx context.Context) error { return fmt.Errorf("peer is not initialized") } d.wg.Add(1) - go d.receive() + go func() { + defer d.wg.Done() + d.receive() + }() + d.lggr.Info("dispatcher started") return nil } -func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error { +func (d *dispatcher) Close() error { + close(d.stopCh) + d.wg.Wait() + d.lggr.Info("dispatcher closed") + return nil +} + +var capReceiveChannelUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "capability_receive_channel_usage", + Help: "The usage of the receive channel for each capability, 0 indicates empty, 1 indicates full.", +}, []string{"capabilityId", "donId"}) + +const receiverBufferSize = 10000 + +type receiver struct { + cancel context.CancelFunc + ch chan *remotetypes.MessageBody +} + +func (d *dispatcher) SetReceiver(capabilityId string, donId string, rec remotetypes.Receiver) error { d.mu.Lock() defer d.mu.Unlock() k := key{capabilityId, donId} @@ -71,7 +96,29 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver rem if ok { return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId) } - d.receivers[k] = receiver + + receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize) + + ctx, cancelCtx := d.stopCh.NewCtx() + d.wg.Add(1) + go func() { + defer cancelCtx() + defer d.wg.Done() + for { + select { + case <-ctx.Done(): + return + case msg := <-receiverCh: + rec.Receive(ctx, msg) + } + } + }() + + d.receivers[k] = &receiver{ + cancel: cancelCtx, + ch: receiverCh, + } + d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId) return nil } @@ -79,8 +126,13 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver rem func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) { d.mu.Lock() defer d.mu.Unlock() - delete(d.receivers, key{capabilityId, donId}) - d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId) + + receiverKey := key{capabilityId, donId} + if receiver, ok := d.receivers[receiverKey]; ok { + receiver.cancel() + delete(d.receivers, receiverKey) + d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId) + } } func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { @@ -105,7 +157,6 @@ func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBo } func (d *dispatcher) receive() { - defer d.wg.Done() recvCh := d.peer.Receive() for { select { @@ -128,7 +179,14 @@ func (d *dispatcher) receive() { d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND) continue } - receiver.Receive(body) + + receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize + capReceiveChannelUsage.WithLabelValues(k.capId, k.donId).Set(receiverQueueUsage) + select { + case receiver.ch <- body: + default: + d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capId, "donId", k.donId) + } } } } @@ -150,13 +208,6 @@ func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetyp } } -func (d *dispatcher) Close() error { - close(d.stopCh) - d.wg.Wait() - d.lggr.Info("dispatcher closed") - return nil -} - func (d *dispatcher) Ready() error { return nil } diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go index b6ba31aa8f2..040ee849e08 100644 --- a/core/capabilities/remote/dispatcher_test.go +++ b/core/capabilities/remote/dispatcher_test.go @@ -1,6 +1,7 @@ package remote_test import ( + "context" "errors" "testing" @@ -26,7 +27,7 @@ func newReceiver() *testReceiver { } } -func (r *testReceiver) Receive(msg *remotetypes.MessageBody) { +func (r *testReceiver) Receive(_ context.Context, msg *remotetypes.MessageBody) { r.ch <- msg } diff --git a/core/capabilities/remote/target/client.go b/core/capabilities/remote/target/client.go index 281e7ac5fc1..dbb7c2f8bd8 100644 --- a/core/capabilities/remote/target/client.go +++ b/core/capabilities/remote/target/client.go @@ -134,11 +134,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest return nil, fmt.Errorf("request for message ID %s already exists", messageID) } - // TODO confirm reasons for below workaround and see if can be resolved - // The context passed in by the workflow engine is cancelled prior to the results being read from the response channel - // The wrapping of the context with 'WithoutCancel' is a workaround for that behaviour. - requestCtx := context.WithoutCancel(ctx) - req, err := request.NewClientRequest(requestCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, + req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, c.requestTimeout) if err != nil { return nil, fmt.Errorf("failed to create client request: %w", err) @@ -149,10 +145,9 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest return req.ResponseChan(), nil } -func (c *client) Receive(msg *types.MessageBody) { +func (c *client) Receive(ctx context.Context, msg *types.MessageBody) { c.mutex.Lock() defer c.mutex.Unlock() - ctx, _ := c.stopCh.NewCtx() messageID := GetMessageID(msg) diff --git a/core/capabilities/remote/target/client_test.go b/core/capabilities/remote/target/client_test.go index a00ec6dda6c..64b824d74d4 100644 --- a/core/capabilities/remote/target/client_test.go +++ b/core/capabilities/remote/target/client_test.go @@ -152,7 +152,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo ID: "workflow-don", } - broker := newTestAsyncMessageBroker(100) + broker := newTestAsyncMessageBroker(t, 100) receivers := make([]remotetypes.Receiver, numCapabilityPeers) for i := 0; i < numCapabilityPeers; i++ { @@ -229,7 +229,7 @@ func newTestServer(peerID p2ptypes.PeerID, dispatcher remotetypes.Dispatcher, wo } } -func (t *clientTestServer) Receive(msg *remotetypes.MessageBody) { +func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBody) { t.mux.Lock() defer t.mux.Unlock() @@ -297,7 +297,7 @@ func NewTestDispatcher() *TestDispatcher { } func (t *TestDispatcher) SendToReceiver(msgBody *remotetypes.MessageBody) { - t.receiver.Receive(msgBody) + t.receiver.Receive(context.Background(), msgBody) } func (t *TestDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error { diff --git a/core/capabilities/remote/target/endtoend_test.go b/core/capabilities/remote/target/endtoend_test.go index 5c9dc191878..d5305afeb32 100644 --- a/core/capabilities/remote/target/endtoend_test.go +++ b/core/capabilities/remote/target/endtoend_test.go @@ -16,6 +16,7 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -215,7 +216,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta F: workflowDonF, } - broker := newTestAsyncMessageBroker(1000) + broker := newTestAsyncMessageBroker(t, 1000) workflowDONs := map[string]commoncap.DON{ workflowDonInfo.ID: workflowDonInfo, @@ -276,6 +277,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta type testAsyncMessageBroker struct { services.StateMachine + t *testing.T + nodes map[p2ptypes.PeerID]remotetypes.Receiver sendCh chan *remotetypes.MessageBody @@ -292,8 +295,9 @@ func (a *testAsyncMessageBroker) Name() string { return "testAsyncMessageBroker" } -func newTestAsyncMessageBroker(sendChBufferSize int) *testAsyncMessageBroker { +func newTestAsyncMessageBroker(t *testing.T, sendChBufferSize int) *testAsyncMessageBroker { return &testAsyncMessageBroker{ + t: t, nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver), stopCh: make(services.StopChan), sendCh: make(chan *remotetypes.MessageBody, sendChBufferSize), @@ -318,7 +322,7 @@ func (a *testAsyncMessageBroker) Start(ctx context.Context) error { panic("server not found for peer id") } - receiver.Receive(msg) + receiver.Receive(tests.Context(a.t), msg) } } }() diff --git a/core/capabilities/remote/target/server.go b/core/capabilities/remote/target/server.go index 9a578eebd3e..a918dd91700 100644 --- a/core/capabilities/remote/target/server.go +++ b/core/capabilities/remote/target/server.go @@ -106,8 +106,7 @@ func (r *server) expireRequests() { } } -// Receive handles incoming messages from remote nodes and dispatches them to the corresponding request. -func (r *server) Receive(msg *types.MessageBody) { +func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { r.receiveLock.Lock() defer r.receiveLock.Unlock() @@ -135,9 +134,6 @@ func (r *server) Receive(msg *types.MessageBody) { req := r.requestIDToRequest[requestID] - // TODO context should be received from the dispatcher here - pending KS-296 - ctx, cancel := r.stopCh.NewCtx() - defer cancel() err := req.OnMessage(ctx, msg) if err != nil { r.lggr.Errorw("request failed to OnMessage new message", "request", req, "err", err) diff --git a/core/capabilities/remote/target/server_test.go b/core/capabilities/remote/target/server_test.go index 507612c143c..3c12ce813d9 100644 --- a/core/capabilities/remote/target/server_test.go +++ b/core/capabilities/remote/target/server_test.go @@ -136,7 +136,7 @@ func testRemoteTargetServer(ctx context.Context, t *testing.T, } var srvcs []services.Service - broker := newTestAsyncMessageBroker(1000) + broker := newTestAsyncMessageBroker(t, 1000) err := broker.Start(context.Background()) require.NoError(t, err) srvcs = append(srvcs, broker) @@ -183,7 +183,7 @@ type serverTestClient struct { callerDonID string } -func (r *serverTestClient) Receive(msg *remotetypes.MessageBody) { +func (r *serverTestClient) Receive(_ context.Context, msg *remotetypes.MessageBody) { r.receivedMessages <- msg } diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 9710d102d49..9e7fc893525 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -71,7 +71,7 @@ func (p *triggerPublisher) Start(ctx context.Context) error { return nil } -func (p *triggerPublisher) Receive(msg *types.MessageBody) { +func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { sender := ToPeerID(msg.Sender) if msg.Method == types.MethodRegisterTrigger { req, err := pb.UnmarshalCapabilityRequest(msg.Payload) diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index ff7c1cde3c7..db792fb5061 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -70,7 +70,7 @@ func TestTriggerPublisher_Register(t *testing.T) { CallerDonId: workflowDonInfo.ID, Payload: marshaled, } - publisher.Receive(regEvent) + publisher.Receive(ctx, regEvent) forwarded := <-underlying.registrationsCh require.Equal(t, capRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID) diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index a16725af49a..b20adc36974 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -168,7 +168,7 @@ func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commo return nil } -func (s *triggerSubscriber) Receive(msg *types.MessageBody) { +func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { sender := ToPeerID(msg.Sender) if _, found := s.capDonMembers[sender]; !found { s.lggr.Errorw("received message from unexpected node", "capabilityId", s.capInfo.ID, "sender", sender) diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index f8c5e6ff6e9..3b819f9f3cc 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -95,7 +95,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { }, Payload: marshaled, } - subscriber.Receive(triggerEvent) + subscriber.Receive(ctx, triggerEvent) response := <-triggerEventCallbackCh require.Equal(t, response.Value, triggerEventValue) diff --git a/core/capabilities/remote/types/mocks/receiver.go b/core/capabilities/remote/types/mocks/receiver.go index bc41baa5496..b2329123537 100644 --- a/core/capabilities/remote/types/mocks/receiver.go +++ b/core/capabilities/remote/types/mocks/receiver.go @@ -3,6 +3,8 @@ package mocks import ( + context "context" + types "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" mock "github.com/stretchr/testify/mock" ) @@ -12,9 +14,9 @@ type Receiver struct { mock.Mock } -// Receive provides a mock function with given fields: msg -func (_m *Receiver) Receive(msg *types.MessageBody) { - _m.Called(msg) +// Receive provides a mock function with given fields: ctx, msg +func (_m *Receiver) Receive(ctx context.Context, msg *types.MessageBody) { + _m.Called(ctx, msg) } // NewReceiver creates a new instance of Receiver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index 37d05174dfb..9c9cf67aa15 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -5,6 +5,8 @@ package types import ( + "context" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -25,7 +27,7 @@ type Dispatcher interface { //go:generate mockery --quiet --name Receiver --output ./mocks/ --case=underscore type Receiver interface { - Receive(msg *MessageBody) + Receive(ctx context.Context, msg *MessageBody) } type Aggregator interface {