Skip to content

Commit

Permalink
[Keystone] Minor fixes (#14091)
Browse files Browse the repository at this point in the history
A bundle of minor fixes in several components:
1. Launcher: adjust defaultStreamConfig values to a resonable max capacity (250 MB per channel)
2. Trigger shims: change some incorrect read-locks to rw-locks
3. Trigger shims: enforce max possible number of bundled worklflow IDs in an event
4. Mode Aggregator: avoid unnecessary early exit
5. Message Cache: avoid unnecessary early exit while collecting ready messages
  • Loading branch information
bolekk authored Aug 12, 2024
1 parent c330def commit 1fcfd80
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
12 changes: 6 additions & 6 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ import (
)

var defaultStreamConfig = p2ptypes.StreamConfig{
IncomingMessageBufferSize: 1000000,
OutgoingMessageBufferSize: 1000000,
MaxMessageLenBytes: 100000,
IncomingMessageBufferSize: 500,
OutgoingMessageBufferSize: 500,
MaxMessageLenBytes: 500000, // 500 KB; max capacity = 500 * 500000 = 250 MB
MessageRateLimiter: ragep2p.TokenBucketParams{
Rate: 100.0,
Capacity: 1000,
Capacity: 500,
},
BytesRateLimiter: ragep2p.TokenBucketParams{
Rate: 100000.0,
Capacity: 1000000,
Rate: 5000000.0, // 5 MB/s
Capacity: 10000000, // 10 MB
},
}

Expand Down
8 changes: 4 additions & 4 deletions core/capabilities/remote/message_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func (c *messageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32,
if msg.timestamp >= minTimestamp {
countAboveMinTimestamp++
accPayloads = append(accPayloads, msg.payload)
if countAboveMinTimestamp >= minCount {
ev.wasReady = true
return true, accPayloads
}
}
}
if countAboveMinTimestamp >= minCount {
ev.wasReady = true
return true, accPayloads
}
return false, nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
return
case <-ticker.C:
now := time.Now().UnixMilli()
p.mu.RLock()
p.mu.Lock()
for key, req := range p.registrations {
callerDon := p.workflowDONs[key.callerDonId]
ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-p.config.RegistrationExpiry.Milliseconds(), false)
Expand All @@ -178,7 +178,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
p.messageCache.Delete(key)
}
}
p.mu.RUnlock()
p.mu.Unlock()
}
}
}
Expand Down
24 changes: 15 additions & 9 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
sync "sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -23,11 +22,11 @@ import (
//
// TriggerSubscriber communicates with corresponding TriggerReceivers on remote nodes.
type triggerSubscriber struct {
config *capabilities.RemoteTriggerConfig
config *commoncap.RemoteTriggerConfig
capInfo commoncap.CapabilityInfo
capDonInfo capabilities.DON
capDonInfo commoncap.DON
capDonMembers map[p2ptypes.PeerID]struct{}
localDonInfo capabilities.DON
localDonInfo commoncap.DON
dispatcher types.Dispatcher
aggregator types.Aggregator
messageCache *messageCache[triggerEventKey, p2ptypes.PeerID]
Expand All @@ -53,16 +52,19 @@ var _ types.Receiver = &triggerSubscriber{}
var _ services.Service = &triggerSubscriber{}

// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000
const (
defaultSendChannelBufferSize = 1000
maxBatchedWorkflowIDs = 1000
)

func NewTriggerSubscriber(config *capabilities.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, localDonInfo commoncap.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
}
if config == nil {
lggr.Info("no config provided, using default values")
config = &capabilities.RemoteTriggerConfig{}
config = &commoncap.RemoteTriggerConfig{}
}
config.ApplyDefaults()
capDonMembers := make(map[p2ptypes.PeerID]struct{})
Expand Down Expand Up @@ -184,6 +186,10 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
s.lggr.Errorw("received message with invalid trigger metadata", "capabilityId", s.capInfo.ID, "sender", sender)
return
}
if len(meta.WorkflowIds) > maxBatchedWorkflowIDs {
s.lggr.Errorw("received message with too many workflow IDs - truncating", "capabilityId", s.capInfo.ID, "nWorkflows", len(meta.WorkflowIds), "sender", sender)
meta.WorkflowIds = meta.WorkflowIds[:maxBatchedWorkflowIDs]
}
for _, workflowId := range meta.WorkflowIds {
s.mu.RLock()
registration, found := s.registeredWorkflows[workflowId]
Expand All @@ -197,10 +203,10 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
workflowId: workflowId,
}
nowMs := time.Now().UnixMilli()
s.mu.RLock()
s.mu.Lock()
creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload)
ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true)
s.mu.RUnlock()
s.mu.Unlock()
if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() {
s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "sender", sender)
continue
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/remote/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte,
hashToCount[sha]++
if hashToCount[sha] >= minIdenticalResponses {
found = elem
break
// update in case we find another elem with an even higher count
minIdenticalResponses = hashToCount[sha]
}
}
if found == nil {
Expand Down

0 comments on commit 1fcfd80

Please sign in to comment.