diff --git a/.changeset/stale-falcons-train.md b/.changeset/stale-falcons-train.md new file mode 100644 index 00000000000..ddaa0907f85 --- /dev/null +++ b/.changeset/stale-falcons-train.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#added [Keystone] Batch identical trigger events diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 03a1dd54f02..f2bfd5e4b16 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -394,7 +394,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee switch capability.CapabilityType { case capabilities.CapabilityTypeTrigger: - newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) { + newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { publisher := remote.NewTriggerPublisher( capabilityConfig.RemoteTriggerConfig, capability.(capabilities.TriggerCapability), @@ -416,7 +416,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee case capabilities.CapabilityTypeConsensus: w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration") case capabilities.CapabilityTypeTarget: - newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) { + newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { return target.NewServer( capabilityConfig.RemoteTargetConfig, myPeerID, @@ -441,12 +441,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee return nil } -type receiverService interface { - services.Service - remotetypes.Receiver -} - -func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Capability, don registrysyncer.DON, newReceiverFn func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error)) error { +func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Capability, don registrysyncer.DON, newReceiverFn func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error)) error { capID := capability.ID info, err := capabilities.NewRemoteCapabilityInfo( capID, diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index c5b4e841858..e5a46c87914 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -2,10 +2,11 @@ package remote import ( "context" + "crypto/sha256" + "encoding/binary" "sync" "time" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -22,19 +23,22 @@ import ( // // TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes. 
type triggerPublisher struct { - config *capabilities.RemoteTriggerConfig - underlying commoncap.TriggerCapability - capInfo commoncap.CapabilityInfo - capDonInfo commoncap.DON - workflowDONs map[uint32]commoncap.DON - membersCache map[uint32]map[p2ptypes.PeerID]bool - dispatcher types.Dispatcher - messageCache *messageCache[registrationKey, p2ptypes.PeerID] - registrations map[registrationKey]*pubRegState - mu sync.RWMutex // protects messageCache and registrations - stopCh services.StopChan - wg sync.WaitGroup - lggr logger.Logger + config *commoncap.RemoteTriggerConfig + underlying commoncap.TriggerCapability + capInfo commoncap.CapabilityInfo + capDonInfo commoncap.DON + workflowDONs map[uint32]commoncap.DON + membersCache map[uint32]map[p2ptypes.PeerID]bool + dispatcher types.Dispatcher + messageCache *messageCache[registrationKey, p2ptypes.PeerID] + registrations map[registrationKey]*pubRegState + mu sync.RWMutex // protects messageCache and registrations + batchingQueue map[[32]byte]*batchedResponse + batchingEnabled bool + bqMu sync.Mutex // protects batchingQueue + stopCh services.StopChan + wg sync.WaitGroup + lggr logger.Logger } type registrationKey struct { @@ -47,13 +51,21 @@ type pubRegState struct { request commoncap.TriggerRegistrationRequest } -var _ types.Receiver = &triggerPublisher{} -var _ services.Service = &triggerPublisher{} +type batchedResponse struct { + rawResponse []byte + callerDonID uint32 + triggerEventID string + workflowIDs []string +} + +var _ types.ReceiverService = &triggerPublisher{} -func NewTriggerPublisher(config *capabilities.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { +const minAllowedBatchCollectionPeriod = 10 * time.Millisecond + +func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { if config == nil { lggr.Info("no config provided, using default values") - config = &capabilities.RemoteTriggerConfig{} + config = &commoncap.RemoteTriggerConfig{} } config.ApplyDefaults() membersCache := make(map[uint32]map[p2ptypes.PeerID]bool) @@ -65,23 +77,29 @@ func NewTriggerPublisher(config *capabilities.RemoteTriggerConfig, underlying co membersCache[id] = cache } return &triggerPublisher{ - config: config, - underlying: underlying, - capInfo: capInfo, - capDonInfo: capDonInfo, - workflowDONs: workflowDONs, - membersCache: membersCache, - dispatcher: dispatcher, - messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](), - registrations: make(map[registrationKey]*pubRegState), - stopCh: make(services.StopChan), - lggr: lggr.Named("TriggerPublisher"), + config: config, + underlying: underlying, + capInfo: capInfo, + capDonInfo: capDonInfo, + workflowDONs: workflowDONs, + membersCache: membersCache, + dispatcher: dispatcher, + messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](), + registrations: make(map[registrationKey]*pubRegState), + batchingQueue: make(map[[32]byte]*batchedResponse), + batchingEnabled: config.MaxBatchSize > 1 && config.BatchCollectionPeriod >= minAllowedBatchCollectionPeriod, + stopCh: make(services.StopChan), + lggr: lggr.Named("TriggerPublisher"), } } func (p *triggerPublisher) Start(ctx context.Context) error { 
p.wg.Add(1) go p.registrationCleanupLoop() + if p.batchingEnabled { + p.wg.Add(1) + go p.batchingLoop() + } p.lggr.Info("TriggerPublisher started") return nil } @@ -202,31 +220,98 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.TriggerR } triggerEvent := response.Event p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "triggerEventID", triggerEvent.ID) - marshaled, err := pb.MarshalTriggerResponse(response) + marshaledResponse, err := pb.MarshalTriggerResponse(response) if err != nil { p.lggr.Debugw("can't marshal trigger event", "err", err) break } - msg := &types.MessageBody{ - CapabilityId: p.capInfo.ID, - CapabilityDonId: p.capDonInfo.ID, - CallerDonId: key.callerDonId, - Method: types.MethodTriggerEvent, - Payload: marshaled, - Metadata: &types.MessageBody_TriggerEventMetadata{ - TriggerEventMetadata: &types.TriggerEventMetadata{ - // NOTE: optionally introduce batching across workflows as an optimization - WorkflowIds: []string{key.workflowId}, - TriggerEventId: triggerEvent.ID, - }, + + if p.batchingEnabled { + p.enqueueForBatching(marshaledResponse, key, triggerEvent.ID) + } else { + // a single-element "batch" + p.sendBatch(&batchedResponse{ + rawResponse: marshaledResponse, + callerDonID: key.callerDonId, + triggerEventID: triggerEvent.ID, + workflowIDs: []string{key.workflowId}, + }) + } + } + } +} + +func (p *triggerPublisher) enqueueForBatching(rawResponse []byte, key registrationKey, triggerEventID string) { + // put in batching queue, group by hash(callerDonId, triggerEventID, response) + combined := make([]byte, 4) + binary.LittleEndian.PutUint32(combined, key.callerDonId) + combined = append(combined, []byte(triggerEventID)...) + combined = append(combined, rawResponse...) 
+ sha := sha256.Sum256(combined) + p.bqMu.Lock() + elem, exists := p.batchingQueue[sha] + if !exists { + elem = &batchedResponse{ + rawResponse: rawResponse, + callerDonID: key.callerDonId, + triggerEventID: triggerEventID, + workflowIDs: []string{key.workflowId}, + } + p.batchingQueue[sha] = elem + } else { + elem.workflowIDs = append(elem.workflowIDs, key.workflowId) + } + p.bqMu.Unlock() +} + +func (p *triggerPublisher) sendBatch(resp *batchedResponse) { + for len(resp.workflowIDs) > 0 { + idBatch := resp.workflowIDs + if p.batchingEnabled && int64(len(idBatch)) > int64(p.config.MaxBatchSize) { + idBatch = idBatch[:p.config.MaxBatchSize] + resp.workflowIDs = resp.workflowIDs[p.config.MaxBatchSize:] + } else { + resp.workflowIDs = nil + } + msg := &types.MessageBody{ + CapabilityId: p.capInfo.ID, + CapabilityDonId: p.capDonInfo.ID, + CallerDonId: resp.callerDonID, + Method: types.MethodTriggerEvent, + Payload: resp.rawResponse, + Metadata: &types.MessageBody_TriggerEventMetadata{ + TriggerEventMetadata: &types.TriggerEventMetadata{ + WorkflowIds: idBatch, + TriggerEventId: resp.triggerEventID, }, + }, + } + // NOTE: send to all nodes by default, introduce different strategies later (KS-76) + for _, peerID := range p.workflowDONs[resp.callerDonID].Members { + err := p.dispatcher.Send(peerID, msg) + if err != nil { + p.lggr.Errorw("failed to send trigger event", "capabilityId", p.capInfo.ID, "peerID", peerID, "err", err) } - // NOTE: send to all nodes by default, introduce different strategies later (KS-76) - for _, peerID := range p.workflowDONs[key.callerDonId].Members { - err = p.dispatcher.Send(peerID, msg) - if err != nil { - p.lggr.Errorw("failed to send trigger event", "capabilityId", p.capInfo.ID, "peerID", peerID, "err", err) - } + } + } +} + +func (p *triggerPublisher) batchingLoop() { + defer p.wg.Done() + ticker := time.NewTicker(p.config.BatchCollectionPeriod) + defer ticker.Stop() + for { + select { + case <-p.stopCh: + return + case <-ticker.C: + p.bqMu.Lock() + queue := p.batchingQueue + p.batchingQueue = make(map[[32]byte]*batchedResponse) + p.bqMu.Unlock() + + for _, elem := range queue { + p.sendBatch(elem) } } } diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index bcc79b4fbb9..8d078dc1aad 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -5,47 +5,130 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" - remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) +const capID = "cap_id@1" + func TestTriggerPublisher_Register(t *testing.T) { + ctx := testutils.Context(t) + capabilityDONID, workflowDONID := uint32(1), uint32(2) + + underlyingTriggerCap, publisher, _, peers := newServices(t, capabilityDONID, workflowDONID, 1) + + // invalid sender case - node 0 is not a member of the 
workflow DON, registration shouldn't happen + regEvent := newRegisterTriggerMessage(t, workflowDONID, peers[0]) + publisher.Receive(ctx, regEvent) + require.Empty(t, underlyingTriggerCap.registrationsCh) + + // valid registration + regEvent = newRegisterTriggerMessage(t, workflowDONID, peers[1]) + publisher.Receive(ctx, regEvent) + require.NotEmpty(t, underlyingTriggerCap.registrationsCh) + forwarded := <-underlyingTriggerCap.registrationsCh + require.Equal(t, workflowID1, forwarded.Metadata.WorkflowID) + + require.NoError(t, publisher.Close()) +} + +func TestTriggerPublisher_ReceiveTriggerEvents_NoBatching(t *testing.T) { + ctx := testutils.Context(t) + capabilityDONID, workflowDONID := uint32(1), uint32(2) + + underlyingTriggerCap, publisher, dispatcher, peers := newServices(t, capabilityDONID, workflowDONID, 1) + regEvent := newRegisterTriggerMessage(t, workflowDONID, peers[1]) + publisher.Receive(ctx, regEvent) + require.NotEmpty(t, underlyingTriggerCap.registrationsCh) + + // send a trigger event and expect that it gets delivered right away + underlyingTriggerCap.eventCh <- commoncap.TriggerResponse{} + awaitOutgoingMessageCh := make(chan struct{}) + dispatcher.On("Send", peers[1], mock.Anything).Run(func(args mock.Arguments) { + awaitOutgoingMessageCh <- struct{}{} + }).Return(nil) + <-awaitOutgoingMessageCh + + require.NoError(t, publisher.Close()) +} + +func TestTriggerPublisher_ReceiveTriggerEvents_BatchingEnabled(t *testing.T) { + ctx := testutils.Context(t) + capabilityDONID, workflowDONID := uint32(1), uint32(2) + + underlyingTriggerCap, publisher, dispatcher, peers := newServices(t, capabilityDONID, workflowDONID, 2) + regEvent := newRegisterTriggerMessage(t, workflowDONID, peers[1]) + publisher.Receive(ctx, regEvent) + require.NotEmpty(t, underlyingTriggerCap.registrationsCh) + + // send two trigger events and expect them to be delivered in a batch + underlyingTriggerCap.eventCh <- commoncap.TriggerResponse{} + underlyingTriggerCap.eventCh <- commoncap.TriggerResponse{} + awaitOutgoingMessageCh := make(chan struct{}) + dispatcher.On("Send", peers[1], mock.Anything).Run(func(args mock.Arguments) { + msg := args.Get(1).(*remotetypes.MessageBody) + require.Equal(t, capID, msg.CapabilityId) + require.Equal(t, remotetypes.MethodTriggerEvent, msg.Method) + require.NotEmpty(t, msg.Payload) + metadata := msg.Metadata.(*remotetypes.MessageBody_TriggerEventMetadata) + require.Len(t, metadata.TriggerEventMetadata.WorkflowIds, 2) + awaitOutgoingMessageCh <- struct{}{} + }).Return(nil).Once() + <-awaitOutgoingMessageCh + + // if there are fewer pending events than the batch size, + // the events should still be sent after the batch collection period + underlyingTriggerCap.eventCh <- commoncap.TriggerResponse{} + dispatcher.On("Send", peers[1], mock.Anything).Run(func(args mock.Arguments) { + msg := args.Get(1).(*remotetypes.MessageBody) + metadata := msg.Metadata.(*remotetypes.MessageBody_TriggerEventMetadata) + require.Len(t, metadata.TriggerEventMetadata.WorkflowIds, 1) + awaitOutgoingMessageCh <- struct{}{} + }).Return(nil).Once() + <-awaitOutgoingMessageCh + + require.NoError(t, publisher.Close()) +} + +func newServices(t *testing.T, capabilityDONID uint32, workflowDONID uint32, maxBatchSize uint32) (*testTrigger, remotetypes.ReceiverService, *mocks.Dispatcher, []p2ptypes.PeerID) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) capInfo := commoncap.CapabilityInfo{ - ID: "cap_id@1", + ID: capID, CapabilityType: commoncap.CapabilityTypeTrigger, Description: "Remote Trigger", } - 
p1 := p2ptypes.PeerID{} - require.NoError(t, p1.UnmarshalText([]byte(peerID1))) - p2 := p2ptypes.PeerID{} - require.NoError(t, p2.UnmarshalText([]byte(peerID2))) + peers := make([]p2ptypes.PeerID, 2) + require.NoError(t, peers[0].UnmarshalText([]byte(peerID1))) + require.NoError(t, peers[1].UnmarshalText([]byte(peerID2))) capDonInfo := commoncap.DON{ - ID: 1, - Members: []p2ptypes.PeerID{p1}, + ID: capabilityDONID, + Members: []p2ptypes.PeerID{peers[0]}, // peer 0 is in the capability DON F: 0, } workflowDonInfo := commoncap.DON{ - ID: 2, - Members: []p2ptypes.PeerID{p2}, + ID: workflowDONID, + Members: []p2ptypes.PeerID{peers[1]}, // peer 1 is in the workflow DON F: 0, } - dispatcher := remoteMocks.NewDispatcher(t) + dispatcher := mocks.NewDispatcher(t) config := &commoncap.RemoteTriggerConfig{ RegistrationRefresh: 100 * time.Millisecond, RegistrationExpiry: 100 * time.Second, MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, + MaxBatchSize: maxBatchSize, + BatchCollectionPeriod: time.Second, } workflowDONs := map[uint32]commoncap.DON{ workflowDonInfo.ID: workflowDonInfo, @@ -53,10 +136,14 @@ func TestTriggerPublisher_Register(t *testing.T) { underlying := &testTrigger{ info: capInfo, registrationsCh: make(chan commoncap.TriggerRegistrationRequest, 2), + eventCh: make(chan commoncap.TriggerResponse, 2), } publisher := remote.NewTriggerPublisher(config, underlying, capInfo, capDonInfo, workflowDONs, dispatcher, lggr) require.NoError(t, publisher.Start(ctx)) + return underlying, publisher, dispatcher, peers +} +func newRegisterTriggerMessage(t *testing.T, callerDonID uint32, sender p2ptypes.PeerID) *remotetypes.MessageBody { // trigger registration event triggerRequest := commoncap.TriggerRegistrationRequest{ Metadata: commoncap.RequestMetadata{ @@ -65,39 +152,29 @@ func TestTriggerPublisher_Register(t *testing.T) { } marshaled, err := pb.MarshalTriggerRegistrationRequest(triggerRequest) require.NoError(t, err) - regEvent := &remotetypes.MessageBody{ - Sender: p1[:], + return &remotetypes.MessageBody{ + Sender: sender[:], Method: remotetypes.MethodRegisterTrigger, - CallerDonId: workflowDonInfo.ID, + CallerDonId: callerDonID, Payload: marshaled, } - publisher.Receive(ctx, regEvent) - // node p1 is not a member of the workflow DON so registration shoudn't happen - require.Empty(t, underlying.registrationsCh) - - regEvent.Sender = p2[:] - publisher.Receive(ctx, regEvent) - require.NotEmpty(t, underlying.registrationsCh) - forwarded := <-underlying.registrationsCh - require.Equal(t, triggerRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID) - - require.NoError(t, publisher.Close()) } type testTrigger struct { info commoncap.CapabilityInfo registrationsCh chan commoncap.TriggerRegistrationRequest + eventCh chan commoncap.TriggerResponse } -func (t *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error) { - return t.info, nil +func (tr *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error) { + return tr.info, nil } -func (t *testTrigger) RegisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) (<-chan commoncap.TriggerResponse, error) { - t.registrationsCh <- request - return nil, nil +func (tr *testTrigger) RegisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) (<-chan commoncap.TriggerResponse, error) { + tr.registrationsCh <- request + return tr.eventCh, nil } -func (t *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) error { +func (tr 
*testTrigger) UnregisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) error { return nil } diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index 7f3868486a4..54ec16f09f1 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -30,6 +30,11 @@ type Receiver interface { Receive(ctx context.Context, msg *MessageBody) } +type ReceiverService interface { + services.Service + Receiver +} + type Aggregator interface { Aggregate(eventID string, responses [][]byte) (commoncap.TriggerResponse, error) }
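A minimal, self-contained sketch (not part of the diff) of the two new pieces of batching logic above: the enablement check in `NewTriggerPublisher` (batching only when `MaxBatchSize > 1` and `BatchCollectionPeriod` is at least 10ms) and the grouping key built in `enqueueForBatching`. The `remoteTriggerConfig` struct here is a hypothetical stand-in for the relevant fields of `commoncap.RemoteTriggerConfig`, used only to keep the example runnable on its own.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"time"
)

// remoteTriggerConfig is a hypothetical stand-in for the fields of
// commoncap.RemoteTriggerConfig that the publisher consults.
type remoteTriggerConfig struct {
	MaxBatchSize          uint32
	BatchCollectionPeriod time.Duration
}

const minAllowedBatchCollectionPeriod = 10 * time.Millisecond

// batchingEnabled mirrors the check in NewTriggerPublisher: batching is only
// turned on when more than one event may be grouped per message and the
// collection period is not shorter than the allowed minimum.
func batchingEnabled(cfg remoteTriggerConfig) bool {
	return cfg.MaxBatchSize > 1 && cfg.BatchCollectionPeriod >= minAllowedBatchCollectionPeriod
}

// batchKey mirrors the grouping key in enqueueForBatching: identical trigger
// events (same caller DON ID, same trigger event ID, same marshaled response)
// hash to the same key, so their workflow IDs are merged into one outgoing
// MethodTriggerEvent message.
func batchKey(callerDonID uint32, triggerEventID string, rawResponse []byte) [32]byte {
	combined := make([]byte, 4)
	binary.LittleEndian.PutUint32(combined, callerDonID)
	combined = append(combined, []byte(triggerEventID)...)
	combined = append(combined, rawResponse...)
	return sha256.Sum256(combined)
}

func main() {
	cfg := remoteTriggerConfig{MaxBatchSize: 10, BatchCollectionPeriod: time.Second}
	fmt.Println("batching enabled:", batchingEnabled(cfg)) // true

	a := batchKey(2, "event-1", []byte("payload"))
	b := batchKey(2, "event-1", []byte("payload"))
	c := batchKey(2, "event-1", []byte("different payload"))
	fmt.Println("identical events share a key:", a == b) // true
	fmt.Println("different payloads do not:", a == c)    // false
}
```

Because the key covers the full marshaled payload rather than just the trigger event ID, only byte-identical responses are merged; responses that differ in content for the same event ID are still delivered separately, and `sendBatch` further splits a merged workflow ID list into chunks of at most `MaxBatchSize`.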