Skip to content

Commit

Permalink
rpc,eth: output more info about failed txs(#2041)
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 authored Dec 7, 2023
1 parent fa5d0cf commit e3ef62f
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 25 deletions.
12 changes: 6 additions & 6 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ type TxFetcher struct {
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails

// Callbacks
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*txpool.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func(string, []*txpool.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer

step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
Expand All @@ -181,14 +181,14 @@ type TxFetcher struct {

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func(string, []*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error,
hasTx func(common.Hash) bool, addTxs func(string, []*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
Expand Down Expand Up @@ -300,7 +300,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
for j, tx := range batch {
wrapped[j] = &txpool.Transaction{Tx: tx}
}
for j, err := range f.addTxs(wrapped) {
for j, err := range f.addTxs(peer, wrapped) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
Expand Down
30 changes: 15 additions & 15 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestTransactionFetcherCleanup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -539,7 +539,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -644,7 +644,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -865,7 +865,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%2 == 0 {
Expand Down Expand Up @@ -938,7 +938,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
Expand All @@ -964,7 +964,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -1017,7 +1017,7 @@ func TestTransactionFetcherDrop(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -1083,7 +1083,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -1128,7 +1128,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand All @@ -1155,7 +1155,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -1184,7 +1184,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down Expand Up @@ -1217,7 +1217,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error {
Expand Down
21 changes: 18 additions & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/core/monitor"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/fetcher"
Expand Down Expand Up @@ -65,7 +66,8 @@ const (
)

var (
syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
accountBlacklistPeerCounter = metrics.NewRegisteredCounter("eth/count/blacklist", nil)
)

// txPool defines the methods needed from a transaction pool implementation to
Expand Down Expand Up @@ -342,8 +344,21 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return p.RequestTxs(hashes)
}
addTxs := func(txs []*txpool.Transaction) []error {
return h.txpool.Add(txs, false, false)
addTxs := func(peer string, txs []*txpool.Transaction) []error {
errors := h.txpool.Add(txs, false, false)
for _, err := range errors {
if err == legacypool.ErrInBlackList {
accountBlacklistPeerCounter.Inc(1)
p := h.peers.peer(peer)
if p != nil {
remoteAddr := p.remoteAddr()
if remoteAddr != nil {
log.Warn("blacklist account detected from other peer", "remoteAddr", remoteAddr, "ID", p.ID())
}
}
}
}
return errors
}
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx)
h.chainSync = newChainSyncer(h)
Expand Down
9 changes: 9 additions & 0 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package eth

import (
"net"

"github.com/ethereum/go-ethereum/eth/protocols/bsc"
"github.com/ethereum/go-ethereum/eth/protocols/trust"

Expand Down Expand Up @@ -45,6 +47,13 @@ func (p *ethPeer) info() *ethPeerInfo {
}
}

func (p *ethPeer) remoteAddr() net.Addr {
if p.Peer != nil && p.Peer.Peer != nil {
return p.Peer.Peer.RemoteAddr()
}
return nil
}

// snapPeerInfo represents a short summary of the `snap` sub-protocol metadata known
// about a connected peer.
type snapPeerInfo struct {
Expand Down
3 changes: 3 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ func (p *Peer) RemoteAddr() net.Addr {
}
log.Warn("RemoteAddr", "invalid testRemoteAddr", p.testRemoteAddr)
}
if p.rw == nil {
return nil
}
return p.rw.fd.RemoteAddr()
}

Expand Down
10 changes: 10 additions & 0 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ import (
"time"

"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/metrics"

"github.com/ethereum/go-ethereum/log"
)

var (
accountBlacklistRpcCounter = metrics.NewRegisteredCounter("rpc/count/blacklist", nil)
)

// handler handles JSON-RPC messages. There is one handler per connection. Note that
// handler is not safe for concurrent use. Message handling never blocks indefinitely
// because RPCs are processed on background goroutines launched by handler.
Expand Down Expand Up @@ -476,6 +481,11 @@ func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *json
xForward := reqCtx.Value("X-Forwarded-For")
h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message, "X-Forwarded-For", xForward)

monitoredError := "sender or to in black list" // using legacypool.ErrInBlackList.Error() will cause `import cycle`
if strings.Contains(resp.Error.Message, monitoredError) {
accountBlacklistRpcCounter.Inc(1)
log.Warn("blacklist account detected from direct rpc", "remoteAddr", h.conn.remoteAddr())
}
ctx = append(ctx, "err", resp.Error.Message)
if resp.Error.Data != nil {
ctx = append(ctx, "errdata", resp.Error.Data)
Expand Down
2 changes: 1 addition & 1 deletion tests/fuzzers/txfetcher/txfetcher_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Fuzz(input []byte) int {

f := fetcher.NewTxFetcherForTests(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
Expand Down

0 comments on commit e3ef62f

Please sign in to comment.