diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 3fc321087b8..3182b192b74 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -28,16 +28,16 @@ import ( ) var defaultStreamConfig = p2ptypes.StreamConfig{ - IncomingMessageBufferSize: 1000000, - OutgoingMessageBufferSize: 1000000, - MaxMessageLenBytes: 100000, + IncomingMessageBufferSize: 500, + OutgoingMessageBufferSize: 500, + MaxMessageLenBytes: 500000, // 500 KB; max capacity = 500 * 500000 = 250 MB MessageRateLimiter: ragep2p.TokenBucketParams{ Rate: 100.0, - Capacity: 1000, + Capacity: 500, }, BytesRateLimiter: ragep2p.TokenBucketParams{ - Rate: 100000.0, - Capacity: 1000000, + Rate: 5000000.0, // 5 MB/s + Capacity: 10000000, // 10 MB }, } diff --git a/core/capabilities/remote/message_cache.go b/core/capabilities/remote/message_cache.go index 27f909c5165..f3a3a79b2c6 100644 --- a/core/capabilities/remote/message_cache.go +++ b/core/capabilities/remote/message_cache.go @@ -60,12 +60,12 @@ func (c *messageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, if msg.timestamp >= minTimestamp { countAboveMinTimestamp++ accPayloads = append(accPayloads, msg.payload) - if countAboveMinTimestamp >= minCount { - ev.wasReady = true - return true, accPayloads - } } } + if countAboveMinTimestamp >= minCount { + ev.wasReady = true + return true, accPayloads + } return false, nil } diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index b4d749754d4..23b778f6018 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -163,7 +163,7 @@ func (p *triggerPublisher) registrationCleanupLoop() { return case <-ticker.C: now := time.Now().UnixMilli() - p.mu.RLock() + p.mu.Lock() for key, req := range p.registrations { callerDon := p.workflowDONs[key.callerDonId] ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-p.config.RegistrationExpiry.Milliseconds(), false) @@ -178,7 +178,7 @@ func (p *triggerPublisher) registrationCleanupLoop() { p.messageCache.Delete(key) } } - p.mu.RUnlock() + p.mu.Unlock() } } } diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index d957614886a..c932dc68e37 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -6,7 +6,6 @@ import ( sync "sync" "time" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -23,11 +22,11 @@ import ( // // TriggerSubscriber communicates with corresponding TriggerReceivers on remote nodes. type triggerSubscriber struct { - config *capabilities.RemoteTriggerConfig + config *commoncap.RemoteTriggerConfig capInfo commoncap.CapabilityInfo - capDonInfo capabilities.DON + capDonInfo commoncap.DON capDonMembers map[p2ptypes.PeerID]struct{} - localDonInfo capabilities.DON + localDonInfo commoncap.DON dispatcher types.Dispatcher aggregator types.Aggregator messageCache *messageCache[triggerEventKey, p2ptypes.PeerID] @@ -53,16 +52,19 @@ var _ types.Receiver = &triggerSubscriber{} var _ services.Service = &triggerSubscriber{} // TODO makes this configurable with a default -const defaultSendChannelBufferSize = 1000 +const ( + defaultSendChannelBufferSize = 1000 + maxBatchedWorkflowIDs = 1000 +) -func NewTriggerSubscriber(config *capabilities.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { +func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, localDonInfo commoncap.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { if aggregator == nil { lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID) aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1)) } if config == nil { lggr.Info("no config provided, using default values") - config = &capabilities.RemoteTriggerConfig{} + config = &commoncap.RemoteTriggerConfig{} } config.ApplyDefaults() capDonMembers := make(map[p2ptypes.PeerID]struct{}) @@ -184,6 +186,10 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { s.lggr.Errorw("received message with invalid trigger metadata", "capabilityId", s.capInfo.ID, "sender", sender) return } + if len(meta.WorkflowIds) > maxBatchedWorkflowIDs { + s.lggr.Errorw("received message with too many workflow IDs - truncating", "capabilityId", s.capInfo.ID, "nWorkflows", len(meta.WorkflowIds), "sender", sender) + meta.WorkflowIds = meta.WorkflowIds[:maxBatchedWorkflowIDs] + } for _, workflowId := range meta.WorkflowIds { s.mu.RLock() registration, found := s.registeredWorkflows[workflowId] @@ -197,10 +203,10 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { workflowId: workflowId, } nowMs := time.Now().UnixMilli() - s.mu.RLock() + s.mu.Lock() creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload) ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true) - s.mu.RUnlock() + s.mu.Unlock() if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() { s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "sender", sender) continue diff --git a/core/capabilities/remote/utils.go b/core/capabilities/remote/utils.go index 10e4e3082c9..7e303eefc8f 100644 --- a/core/capabilities/remote/utils.go +++ b/core/capabilities/remote/utils.go @@ -92,7 +92,8 @@ func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, hashToCount[sha]++ if hashToCount[sha] >= minIdenticalResponses { found = elem - break + // update in case we find another elem with an even higher count + minIdenticalResponses = hashToCount[sha] } } if found == nil {