Skip to content

Commit

Permalink
Fix BCFR-1075 Tx Sender go routines leak (#15425)
Browse files Browse the repository at this point in the history
* Do not try to send tx result into collector if SendTransaction was already done

* improve comment

* changeset

* fix changeset
  • Loading branch information
dhaidashenko authored Dec 6, 2024
1 parent 4b738ab commit f094f6c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-doors-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix TransactionSender goroutine leak. #bugfix
37 changes: 27 additions & 10 deletions common/client/transaction_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID types.ID, RPC SendT
// * Otherwise, returns any (effectively random) of the errors.
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT {
var result RESULT
ctx, cancel := txSender.chStop.Ctx(ctx)
defer cancel()
if !txSender.IfStarted(func() {
txResults := make(chan RESULT)
txResultsToReport := make(chan RESULT)
Expand All @@ -103,8 +105,6 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
if isSendOnly {
txSender.wg.Add(1)
go func(ctx context.Context) {
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
defer txSender.wg.Done()
// Send-only nodes' results are ignored as they tend to return false-positive responses.
// Broadcast to them is necessary to speed up the propagation of TX in the network.
Expand All @@ -117,8 +117,9 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
healthyNodesNum++
primaryNodeWg.Add(1)
go func(ctx context.Context) {
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
// Broadcasting the transaction and reporting results for invariant detection are background jobs that must be detached from
// the caller's cancellation.
// Reporting results to the SendTransaction caller must respect the caller's context to avoid a goroutine leak.
defer primaryNodeWg.Done()
r := txSender.broadcastTxAsync(ctx, rpc, tx)
select {
Expand All @@ -128,6 +129,8 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
case txResults <- r:
}

ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
select {
case <-ctx.Done():
txSender.lggr.Debugw("Failed to send tx results to report", "err", ctx.Err())
Expand All @@ -151,8 +154,13 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
return
}

if healthyNodesNum == 0 {
result = txSender.newResult(ErroringNodeError)
return
}

txSender.wg.Add(1)
go txSender.reportSendTxAnomalies(ctx, tx, txResultsToReport)
go txSender.reportSendTxAnomalies(tx, txResultsToReport)

result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults)
}) {
Expand All @@ -163,6 +171,9 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT {
// broadcast is a background job, so always detach from caller's cancellation
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
result := rpc.SendTransaction(ctx, tx)
txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error())
if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil {
Expand All @@ -171,16 +182,25 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(c
return result
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(ctx context.Context, tx TX, txResults <-chan RESULT) {
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) {
defer txSender.wg.Done()
resultsByCode := sendTxResults[RESULT]{}
// txResults will eventually be closed
for txResult := range txResults {
resultsByCode[txResult.Code()] = append(resultsByCode[txResult.Code()], txResult)
}

select {
case <-txSender.chStop:
// It's OK to receive no results if txSender is closing. Return early to prevent false reporting of an invariant violation.
if len(resultsByCode) == 0 {
return
}
default:
}

_, criticalErr := aggregateTxResults[RESULT](resultsByCode)
if criticalErr != nil && ctx.Err() == nil {
if criticalErr != nil {
txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr)
PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc()
}
Expand Down Expand Up @@ -218,9 +238,6 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan RESULT) RESULT {
if healthyNodesNum == 0 {
return txSender.newResult(ErroringNodeError)
}
requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum))
errorsByCode := sendTxResults[RESULT]{}
var softTimeoutChan <-chan time.Time
Expand Down
22 changes: 22 additions & 0 deletions common/client/transaction_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/common/types"
)

Expand Down Expand Up @@ -293,6 +294,27 @@ func TestTransactionSender_SendTransaction(t *testing.T) {
require.NoError(t, result.Error())
require.Equal(t, Successful, result.Code())
})
t.Run("All background jobs stop even if RPC returns result after soft timeout", func(t *testing.T) {
chainID := types.RandomID()
expectedError := errors.New("transaction failed")
fastNode := newNode(t, expectedError, nil)

// hold the reply from the node until SendTransaction returns a result
sendTxContext, sendTxCancel := context.WithCancel(tests.Context(t))
slowNode := newNode(t, errors.New("transaction failed"), func(_ mock.Arguments) {
<-sendTxContext.Done()
})

lggr := logger.Test(t)

_, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, nil)
result := txSender.SendTransaction(sendTxContext, nil)
sendTxCancel()
require.EqualError(t, result.Error(), expectedError.Error())
// TxSender should stop all background goroutines after SendTransaction is done and before the test is done.
// Otherwise, it signals that we have a goroutine leak.
txSender.wg.Wait()
})
}

func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) {
Expand Down

0 comments on commit f094f6c

Please sign in to comment.