Skip to content

Commit

Permalink
Increase transmitter concurrency
Browse files Browse the repository at this point in the history
MERC-6635
  • Loading branch information
samsondav committed Nov 18, 2024
1 parent 4f8e15f commit 9ae6c3f
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ import (
const (
// Mercury server error codes
DuplicateReport = 2

// TransmitConcurrency is the number of concurrent transmits to each
// server. Number of goroutines per server will be roughly 2x this because
// each server has a delete queue and a transmit queue.
//
// This could be reduced by implementing transmit batching, see: https://smartcontract-it.atlassian.net/browse/MERC-6635
TransmitConcurrency = 100
)

var (
Expand Down Expand Up @@ -167,9 +174,12 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
// starting pm after loading from it is fine because it simply spawns some garbage collection/prune goroutines
startClosers = append(startClosers, s.c, s.q, s.pm)

mt.wg.Add(2)
go s.runDeleteQueueLoop(mt.stopCh, mt.wg)
go s.runQueueLoop(mt.stopCh, mt.wg, fmt.Sprintf("%d", mt.donID))
nThreads := int(TransmitConcurrency)
mt.wg.Add(2 * nThreads)
for i := 0; i < nThreads; i++ {
go s.runDeleteQueueLoop(mt.stopCh, mt.wg)
go s.runQueueLoop(mt.stopCh, mt.wg, fmt.Sprintf("%d", mt.donID))

Check failure on line 181 in core/services/llo/mercurytransmitter/transmitter.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Sprintf can be replaced with faster strconv.FormatUint (perfsprint)
}
}
if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil {
return err
Expand Down

0 comments on commit 9ae6c3f

Please sign in to comment.