From 2dd28d2becf000b63433b45f55f109b6f7744c8b Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 18 Sep 2023 09:41:49 +0700 Subject: [PATCH 1/5] save --- gointerfaces/downloader/downloader.pb.go | 4 +- gointerfaces/downloader/downloader_grpc.pb.go | 2 +- gointerfaces/execution/execution.pb.go | 4 +- gointerfaces/execution/execution_grpc.pb.go | 2 +- gointerfaces/remote/ethbackend.pb.go | 4 +- gointerfaces/remote/ethbackend_grpc.pb.go | 2 +- gointerfaces/remote/kv.pb.go | 4 +- gointerfaces/remote/kv_grpc.pb.go | 2 +- gointerfaces/remote/mocks.go | 28 +++++----- gointerfaces/sentinel/sentinel.pb.go | 4 +- gointerfaces/sentinel/sentinel_grpc.pb.go | 2 +- gointerfaces/sentry/sentry.pb.go | 4 +- gointerfaces/sentry/sentry_grpc.pb.go | 2 +- gointerfaces/txpool/mining.pb.go | 4 +- gointerfaces/txpool/mining_grpc.pb.go | 2 +- gointerfaces/txpool/txpool.pb.go | 4 +- gointerfaces/txpool/txpool_grpc.pb.go | 2 +- gointerfaces/types/types.pb.go | 4 +- txpool/fetch.go | 31 ++++------- txpool/mocks_test.go | 54 +++++++++++++++++++ txpool/pool.go | 49 +++++++++++++---- 21 files changed, 142 insertions(+), 72 deletions(-) diff --git a/gointerfaces/downloader/downloader.pb.go b/gointerfaces/downloader/downloader.pb.go index 8cef15ac6..773282e31 100644 --- a/gointerfaces/downloader/downloader.pb.go +++ b/gointerfaces/downloader/downloader.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: downloader/downloader.proto package downloader diff --git a/gointerfaces/downloader/downloader_grpc.pb.go b/gointerfaces/downloader/downloader_grpc.pb.go index 8a6a60a7d..831743bbc 100644 --- a/gointerfaces/downloader/downloader_grpc.pb.go +++ b/gointerfaces/downloader/downloader_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: downloader/downloader.proto package downloader diff --git a/gointerfaces/execution/execution.pb.go b/gointerfaces/execution/execution.pb.go index 60dcef584..5c2effd51 100644 --- a/gointerfaces/execution/execution.pb.go +++ b/gointerfaces/execution/execution.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: execution/execution.proto package execution diff --git a/gointerfaces/execution/execution_grpc.pb.go b/gointerfaces/execution/execution_grpc.pb.go index 9faac8573..b3779a0b1 100644 --- a/gointerfaces/execution/execution_grpc.pb.go +++ b/gointerfaces/execution/execution_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: execution/execution.proto package execution diff --git a/gointerfaces/remote/ethbackend.pb.go b/gointerfaces/remote/ethbackend.pb.go index ac0a099c6..118a3f763 100644 --- a/gointerfaces/remote/ethbackend.pb.go +++ b/gointerfaces/remote/ethbackend.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: remote/ethbackend.proto package remote diff --git a/gointerfaces/remote/ethbackend_grpc.pb.go b/gointerfaces/remote/ethbackend_grpc.pb.go index 8e986e082..4a410a32b 100644 --- a/gointerfaces/remote/ethbackend_grpc.pb.go +++ b/gointerfaces/remote/ethbackend_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: remote/ethbackend.proto package remote diff --git a/gointerfaces/remote/kv.pb.go b/gointerfaces/remote/kv.pb.go index 6dd2f965e..b5ac8e64a 100644 --- a/gointerfaces/remote/kv.pb.go +++ b/gointerfaces/remote/kv.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: remote/kv.proto package remote diff --git a/gointerfaces/remote/kv_grpc.pb.go b/gointerfaces/remote/kv_grpc.pb.go index eb32cbf39..d0305cb0f 100644 --- a/gointerfaces/remote/kv_grpc.pb.go +++ b/gointerfaces/remote/kv_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: remote/kv.proto package remote diff --git a/gointerfaces/remote/mocks.go b/gointerfaces/remote/mocks.go index 24e98be04..8300eb434 100644 --- a/gointerfaces/remote/mocks.go +++ b/gointerfaces/remote/mocks.go @@ -650,10 +650,10 @@ var _ KV_StateChangesClient = &KV_StateChangesClientMock{} // RecvFunc: func() (*StateChangeBatch, error) { // panic("mock out the Recv method") // }, -// RecvMsgFunc: func(m interface{}) error { +// RecvMsgFunc: func(m any) error { // panic("mock out the RecvMsg method") // }, -// SendMsgFunc: func(m interface{}) error { +// SendMsgFunc: func(m any) error { // panic("mock out the SendMsg method") // }, // TrailerFunc: func() metadata.MD { @@ -679,10 +679,10 @@ type KV_StateChangesClientMock struct { RecvFunc func() (*StateChangeBatch, error) // RecvMsgFunc mocks the RecvMsg method. - RecvMsgFunc func(m interface{}) error + RecvMsgFunc func(m any) error // SendMsgFunc mocks the SendMsg method. - SendMsgFunc func(m interface{}) error + SendMsgFunc func(m any) error // TrailerFunc mocks the Trailer method. TrailerFunc func() metadata.MD @@ -704,12 +704,12 @@ type KV_StateChangesClientMock struct { // RecvMsg holds details about calls to the RecvMsg method. RecvMsg []struct { // M is the m argument value. - M interface{} + M any } // SendMsg holds details about calls to the SendMsg method. SendMsg []struct { // M is the m argument value. - M interface{} + M any } // Trailer holds details about calls to the Trailer method. Trailer []struct { @@ -847,9 +847,9 @@ func (mock *KV_StateChangesClientMock) RecvCalls() []struct { } // RecvMsg calls RecvMsgFunc. -func (mock *KV_StateChangesClientMock) RecvMsg(m interface{}) error { +func (mock *KV_StateChangesClientMock) RecvMsg(m any) error { callInfo := struct { - M interface{} + M any }{ M: m, } @@ -870,10 +870,10 @@ func (mock *KV_StateChangesClientMock) RecvMsg(m interface{}) error { // // len(mockedKV_StateChangesClient.RecvMsgCalls()) func (mock *KV_StateChangesClientMock) RecvMsgCalls() []struct { - M interface{} + M any } { var calls []struct { - M interface{} + M any } mock.lockRecvMsg.RLock() calls = mock.calls.RecvMsg @@ -882,9 +882,9 @@ func (mock *KV_StateChangesClientMock) RecvMsgCalls() []struct { } // SendMsg calls SendMsgFunc. -func (mock *KV_StateChangesClientMock) SendMsg(m interface{}) error { +func (mock *KV_StateChangesClientMock) SendMsg(m any) error { callInfo := struct { - M interface{} + M any }{ M: m, } @@ -905,10 +905,10 @@ func (mock *KV_StateChangesClientMock) SendMsg(m interface{}) error { // // len(mockedKV_StateChangesClient.SendMsgCalls()) func (mock *KV_StateChangesClientMock) SendMsgCalls() []struct { - M interface{} + M any } { var calls []struct { - M interface{} + M any } mock.lockSendMsg.RLock() calls = mock.calls.SendMsg diff --git a/gointerfaces/sentinel/sentinel.pb.go b/gointerfaces/sentinel/sentinel.pb.go index 0e8be2e06..608597e7f 100644 --- a/gointerfaces/sentinel/sentinel.pb.go +++ b/gointerfaces/sentinel/sentinel.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: p2psentinel/sentinel.proto package sentinel diff --git a/gointerfaces/sentinel/sentinel_grpc.pb.go b/gointerfaces/sentinel/sentinel_grpc.pb.go index 13052e192..a62786b60 100644 --- a/gointerfaces/sentinel/sentinel_grpc.pb.go +++ b/gointerfaces/sentinel/sentinel_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: p2psentinel/sentinel.proto package sentinel diff --git a/gointerfaces/sentry/sentry.pb.go b/gointerfaces/sentry/sentry.pb.go index 0e43453fd..87710f442 100644 --- a/gointerfaces/sentry/sentry.pb.go +++ b/gointerfaces/sentry/sentry.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: p2psentry/sentry.proto package sentry diff --git a/gointerfaces/sentry/sentry_grpc.pb.go b/gointerfaces/sentry/sentry_grpc.pb.go index 7802cf4fd..1a9d1959b 100644 --- a/gointerfaces/sentry/sentry_grpc.pb.go +++ b/gointerfaces/sentry/sentry_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: p2psentry/sentry.proto package sentry diff --git a/gointerfaces/txpool/mining.pb.go b/gointerfaces/txpool/mining.pb.go index deacde3e6..20b3e0bd7 100644 --- a/gointerfaces/txpool/mining.pb.go +++ b/gointerfaces/txpool/mining.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: txpool/mining.proto package txpool diff --git a/gointerfaces/txpool/mining_grpc.pb.go b/gointerfaces/txpool/mining_grpc.pb.go index c2054b4e1..d0465eb5f 100644 --- a/gointerfaces/txpool/mining_grpc.pb.go +++ b/gointerfaces/txpool/mining_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: txpool/mining.proto package txpool diff --git a/gointerfaces/txpool/txpool.pb.go b/gointerfaces/txpool/txpool.pb.go index 65b061e9a..52b9b02de 100644 --- a/gointerfaces/txpool/txpool.pb.go +++ b/gointerfaces/txpool/txpool.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: txpool/txpool.proto package txpool diff --git a/gointerfaces/txpool/txpool_grpc.pb.go b/gointerfaces/txpool/txpool_grpc.pb.go index a1ae12fc0..d8c6da0d0 100644 --- a/gointerfaces/txpool/txpool_grpc.pb.go +++ b/gointerfaces/txpool/txpool_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: txpool/txpool.proto package txpool diff --git a/gointerfaces/types/types.pb.go b/gointerfaces/types/types.pb.go index 088bbfb73..adae72de7 100644 --- a/gointerfaces/types/types.pb.go +++ b/gointerfaces/types/types.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: types/types.proto package types diff --git a/txpool/fetch.go b/txpool/fetch.go index a9b24a6db..5ffb33fff 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -222,20 +222,15 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) } - var hashbuf [32]byte - var unknownHashes types2.Hashes - for i := 0; i < hashCount; i++ { - _, pos, err = types2.ParseHash(req.Data, pos, hashbuf[:0]) - if err != nil { - return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) - } - known, err := f.pool.IdHashKnown(tx, hashbuf[:]) - if err != nil { + hashes := make([]byte, 32*hashCount) + for i := 0; i < len(hashes); i += 32 { + if _, pos, err = types2.ParseHash(req.Data, pos, hashes[i:]); err != nil { return err } - if !known { - unknownHashes = append(unknownHashes, hashbuf[:]...) - } + } + unknownHashes, err := f.pool.FilterKnownIdHashes(tx, hashes) + if err != nil { + return err } if len(unknownHashes) > 0 { var encodedRequest []byte @@ -256,15 +251,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes88: %w", err) } - var unknownHashes types2.Hashes - for i := 0; i < len(hashes); i += 32 { - known, err := f.pool.IdHashKnown(tx, hashes[i:i+32]) - if err != nil { - return err - } - if !known { - unknownHashes = append(unknownHashes, hashes[i:i+32]...) - } + unknownHashes, err := f.pool.FilterKnownIdHashes(tx, hashes) + if err != nil { + return err } if len(unknownHashes) > 0 { diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go index 78140c64c..555b3fc08 100644 --- a/txpool/mocks_test.go +++ b/txpool/mocks_test.go @@ -31,6 +31,9 @@ var _ Pool = &PoolMock{} // AddRemoteTxsFunc: func(ctx context.Context, newTxs types2.TxSlots) { // panic("mock out the AddRemoteTxs method") // }, +// FilterKnownIdHashesFunc: func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { +// panic("mock out the FilterKnownIdHashes method") +// }, // GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) *metaTx { // panic("mock out the GetKnownBlobTxn method") // }, @@ -65,6 +68,9 @@ type PoolMock struct { // AddRemoteTxsFunc mocks the AddRemoteTxs method. AddRemoteTxsFunc func(ctx context.Context, newTxs types2.TxSlots) + // FilterKnownIdHashesFunc mocks the FilterKnownIdHashes method. + FilterKnownIdHashesFunc func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) + // GetKnownBlobTxnFunc mocks the GetKnownBlobTxn method. GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) *metaTx @@ -106,6 +112,13 @@ type PoolMock struct { // NewTxs is the newTxs argument value. NewTxs types2.TxSlots } + // FilterKnownIdHashes holds details about calls to the FilterKnownIdHashes method. + FilterKnownIdHashes []struct { + // Tx is the tx argument value. + Tx kv.Tx + // Hashes is the hashes argument value. + Hashes types2.Hashes + } // GetKnownBlobTxn holds details about calls to the GetKnownBlobTxn method. GetKnownBlobTxn []struct { // Tx is the tx argument value. @@ -152,6 +165,7 @@ type PoolMock struct { lockAddLocalTxs sync.RWMutex lockAddNewGoodPeer sync.RWMutex lockAddRemoteTxs sync.RWMutex + lockFilterKnownIdHashes sync.RWMutex lockGetKnownBlobTxn sync.RWMutex lockGetRlp sync.RWMutex lockIdHashKnown sync.RWMutex @@ -272,6 +286,46 @@ func (mock *PoolMock) AddRemoteTxsCalls() []struct { return calls } +// FilterKnownIdHashes calls FilterKnownIdHashesFunc. +func (mock *PoolMock) FilterKnownIdHashes(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { + callInfo := struct { + Tx kv.Tx + Hashes types2.Hashes + }{ + Tx: tx, + Hashes: hashes, + } + mock.lockFilterKnownIdHashes.Lock() + mock.calls.FilterKnownIdHashes = append(mock.calls.FilterKnownIdHashes, callInfo) + mock.lockFilterKnownIdHashes.Unlock() + if mock.FilterKnownIdHashesFunc == nil { + var ( + unknownHashesOut types2.Hashes + errOut error + ) + return unknownHashesOut, errOut + } + return mock.FilterKnownIdHashesFunc(tx, hashes) +} + +// FilterKnownIdHashesCalls gets all the calls that were made to FilterKnownIdHashes. +// Check the length with: +// +// len(mockedPool.FilterKnownIdHashesCalls()) +func (mock *PoolMock) FilterKnownIdHashesCalls() []struct { + Tx kv.Tx + Hashes types2.Hashes +} { + var calls []struct { + Tx kv.Tx + Hashes types2.Hashes + } + mock.lockFilterKnownIdHashes.RLock() + calls = mock.calls.FilterKnownIdHashes + mock.lockFilterKnownIdHashes.RUnlock() + return calls +} + // GetKnownBlobTxn calls GetKnownBlobTxnFunc. func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx { callInfo := struct { diff --git a/txpool/pool.go b/txpool/pool.go index fe59e96ff..31e785454 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -86,6 +86,7 @@ type Pool interface { OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error // IdHashKnown check whether transaction with given Id hash is known to the pool IdHashKnown(tx kv.Tx, hash []byte) (bool, error) + FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) Started() bool GetRlp(tx kv.Tx, hash []byte) ([]byte, error) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx @@ -517,34 +518,60 @@ func (p *TxPool) AppendAllAnnouncements(types []byte, sizes []uint32, hashes []b types, sizes, hashes = p.AppendRemoteAnnouncements(types, sizes, hashes) return types, sizes, hashes } -func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { - p.lock.Lock() - defer p.lock.Unlock() - if _, ok := p.discardReasonsLRU.Get(string(hash)); ok { +func (p *TxPool) idHashKnown(tx kv.Tx, hash []byte, hashS string) (bool, error) { + if _, ok := p.unprocessedRemoteByHash[hashS]; ok { return true, nil } - if _, ok := p.unprocessedRemoteByHash[string(hash)]; ok { + if _, ok := p.discardReasonsLRU.Get(hashS); ok { return true, nil } - if _, ok := p.byHash[string(hash)]; ok { + if _, ok := p.byHash[hashS]; ok { return true, nil } - if _, ok := p.minedBlobTxsByHash[string(hash)]; ok { + if _, ok := p.minedBlobTxsByHash[hashS]; ok { return true, nil } return tx.Has(kv.PoolTransaction, hash) } +func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { + hashS := string(hash) + p.lock.Lock() + defer p.lock.Unlock() + return p.idHashKnown(tx, hash, hashS) +} +func (p *TxPool) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) { + p.lock.Lock() + defer p.lock.Unlock() + for i := 0; i < len(hashes); i += 32 { + known, err := p.idHashKnown(tx, hashes[i:i+32], string(hashes[i:i+32])) + if err != nil { + return unknownHashes, err + } + if !known { + unknownHashes = append(unknownHashes, hashes[i:i+32]...) + } + } + return unknownHashes, err +} + +func (p *TxPool) getUnprocessedTxn(hashS string) (*types.TxSlot, bool) { + if i, ok := p.unprocessedRemoteByHash[hashS]; ok { + return p.unprocessedRemoteTxs.Txs[i], true + } + return nil, false +} func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx { + hashS := string(hash) p.lock.Lock() defer p.lock.Unlock() - if mt, ok := p.minedBlobTxsByHash[string(hash)]; ok { + if mt, ok := p.minedBlobTxsByHash[hashS]; ok { return mt } - if i, ok := p.unprocessedRemoteByHash[string(hash)]; ok { - return newMetaTx(p.unprocessedRemoteTxs.Txs[i], false, 0) + if txn, ok := p.getUnprocessedTxn(hashS); ok { + return newMetaTx(txn, false, 0) } - if mt, ok := p.byHash[string(hash)]; ok { + if mt, ok := p.byHash[hashS]; ok { return mt } if has, _ := tx.Has(kv.PoolTransaction, hash); has { From 713fdd95d87cf3467505938844f6c9953bfb56bb Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 18 Sep 2023 10:01:24 +0700 Subject: [PATCH 2/5] save --- txpool/fetch.go | 5 +- txpool/mocks_test.go | 562 ------------------------------------------- txpool/pool.go | 30 ++- 3 files changed, 21 insertions(+), 576 deletions(-) delete mode 100644 txpool/mocks_test.go diff --git a/txpool/fetch.go b/txpool/fetch.go index 5ffb33fff..ecc9797b7 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -471,7 +471,10 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error { _, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil) if unwindTxs.Txs[i].Type == types2.BlobTxType { - knownBlobTxn := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:]) + knownBlobTxn, err := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:]) + if err != nil { + return err + } if knownBlobTxn != nil { unwindTxs.Txs[i] = knownBlobTxn.Tx } diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go deleted file mode 100644 index 555b3fc08..000000000 --- a/txpool/mocks_test.go +++ /dev/null @@ -1,562 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package txpool - -import ( - "context" - "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/txpool/txpoolcfg" - types2 "github.com/ledgerwatch/erigon-lib/types" - "sync" -) - -// Ensure, that PoolMock does implement Pool. -// If this is not the case, regenerate this file with moq. -var _ Pool = &PoolMock{} - -// PoolMock is a mock implementation of Pool. -// -// func TestSomethingThatUsesPool(t *testing.T) { -// -// // make and configure a mocked Pool -// mockedPool := &PoolMock{ -// AddLocalTxsFunc: func(ctx context.Context, newTxs types2.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) { -// panic("mock out the AddLocalTxs method") -// }, -// AddNewGoodPeerFunc: func(peerID types2.PeerID) { -// panic("mock out the AddNewGoodPeer method") -// }, -// AddRemoteTxsFunc: func(ctx context.Context, newTxs types2.TxSlots) { -// panic("mock out the AddRemoteTxs method") -// }, -// FilterKnownIdHashesFunc: func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { -// panic("mock out the FilterKnownIdHashes method") -// }, -// GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) *metaTx { -// panic("mock out the GetKnownBlobTxn method") -// }, -// GetRlpFunc: func(tx kv.Tx, hash []byte) ([]byte, error) { -// panic("mock out the GetRlp method") -// }, -// IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) { -// panic("mock out the IdHashKnown method") -// }, -// OnNewBlockFunc: func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { -// panic("mock out the OnNewBlock method") -// }, -// StartedFunc: func() bool { -// panic("mock out the Started method") -// }, -// ValidateSerializedTxnFunc: func(serializedTxn []byte) error { -// panic("mock out the ValidateSerializedTxn method") -// }, -// } -// -// // use mockedPool in code that requires Pool -// // and then make assertions. -// -// } -type PoolMock struct { - // AddLocalTxsFunc mocks the AddLocalTxs method. - AddLocalTxsFunc func(ctx context.Context, newTxs types2.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) - - // AddNewGoodPeerFunc mocks the AddNewGoodPeer method. - AddNewGoodPeerFunc func(peerID types2.PeerID) - - // AddRemoteTxsFunc mocks the AddRemoteTxs method. - AddRemoteTxsFunc func(ctx context.Context, newTxs types2.TxSlots) - - // FilterKnownIdHashesFunc mocks the FilterKnownIdHashes method. - FilterKnownIdHashesFunc func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) - - // GetKnownBlobTxnFunc mocks the GetKnownBlobTxn method. - GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) *metaTx - - // GetRlpFunc mocks the GetRlp method. - GetRlpFunc func(tx kv.Tx, hash []byte) ([]byte, error) - - // IdHashKnownFunc mocks the IdHashKnown method. - IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error) - - // OnNewBlockFunc mocks the OnNewBlock method. - OnNewBlockFunc func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error - - // StartedFunc mocks the Started method. - StartedFunc func() bool - - // ValidateSerializedTxnFunc mocks the ValidateSerializedTxn method. - ValidateSerializedTxnFunc func(serializedTxn []byte) error - - // calls tracks calls to the methods. - calls struct { - // AddLocalTxs holds details about calls to the AddLocalTxs method. - AddLocalTxs []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // NewTxs is the newTxs argument value. - NewTxs types2.TxSlots - // Tx is the tx argument value. - Tx kv.Tx - } - // AddNewGoodPeer holds details about calls to the AddNewGoodPeer method. - AddNewGoodPeer []struct { - // PeerID is the peerID argument value. - PeerID types2.PeerID - } - // AddRemoteTxs holds details about calls to the AddRemoteTxs method. - AddRemoteTxs []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // NewTxs is the newTxs argument value. - NewTxs types2.TxSlots - } - // FilterKnownIdHashes holds details about calls to the FilterKnownIdHashes method. - FilterKnownIdHashes []struct { - // Tx is the tx argument value. - Tx kv.Tx - // Hashes is the hashes argument value. - Hashes types2.Hashes - } - // GetKnownBlobTxn holds details about calls to the GetKnownBlobTxn method. - GetKnownBlobTxn []struct { - // Tx is the tx argument value. - Tx kv.Tx - // Hash is the hash argument value. - Hash []byte - } - // GetRlp holds details about calls to the GetRlp method. - GetRlp []struct { - // Tx is the tx argument value. - Tx kv.Tx - // Hash is the hash argument value. - Hash []byte - } - // IdHashKnown holds details about calls to the IdHashKnown method. - IdHashKnown []struct { - // Tx is the tx argument value. - Tx kv.Tx - // Hash is the hash argument value. - Hash []byte - } - // OnNewBlock holds details about calls to the OnNewBlock method. - OnNewBlock []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // StateChanges is the stateChanges argument value. - StateChanges *remote.StateChangeBatch - // UnwindTxs is the unwindTxs argument value. - UnwindTxs types2.TxSlots - // MinedTxs is the minedTxs argument value. - MinedTxs types2.TxSlots - // Tx is the tx argument value. - Tx kv.Tx - } - // Started holds details about calls to the Started method. - Started []struct { - } - // ValidateSerializedTxn holds details about calls to the ValidateSerializedTxn method. - ValidateSerializedTxn []struct { - // SerializedTxn is the serializedTxn argument value. - SerializedTxn []byte - } - } - lockAddLocalTxs sync.RWMutex - lockAddNewGoodPeer sync.RWMutex - lockAddRemoteTxs sync.RWMutex - lockFilterKnownIdHashes sync.RWMutex - lockGetKnownBlobTxn sync.RWMutex - lockGetRlp sync.RWMutex - lockIdHashKnown sync.RWMutex - lockOnNewBlock sync.RWMutex - lockStarted sync.RWMutex - lockValidateSerializedTxn sync.RWMutex -} - -// AddLocalTxs calls AddLocalTxsFunc. -func (mock *PoolMock) AddLocalTxs(ctx context.Context, newTxs types2.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) { - callInfo := struct { - Ctx context.Context - NewTxs types2.TxSlots - Tx kv.Tx - }{ - Ctx: ctx, - NewTxs: newTxs, - Tx: tx, - } - mock.lockAddLocalTxs.Lock() - mock.calls.AddLocalTxs = append(mock.calls.AddLocalTxs, callInfo) - mock.lockAddLocalTxs.Unlock() - if mock.AddLocalTxsFunc == nil { - var ( - discardReasonsOut []txpoolcfg.DiscardReason - errOut error - ) - return discardReasonsOut, errOut - } - return mock.AddLocalTxsFunc(ctx, newTxs, tx) -} - -// AddLocalTxsCalls gets all the calls that were made to AddLocalTxs. -// Check the length with: -// -// len(mockedPool.AddLocalTxsCalls()) -func (mock *PoolMock) AddLocalTxsCalls() []struct { - Ctx context.Context - NewTxs types2.TxSlots - Tx kv.Tx -} { - var calls []struct { - Ctx context.Context - NewTxs types2.TxSlots - Tx kv.Tx - } - mock.lockAddLocalTxs.RLock() - calls = mock.calls.AddLocalTxs - mock.lockAddLocalTxs.RUnlock() - return calls -} - -// AddNewGoodPeer calls AddNewGoodPeerFunc. -func (mock *PoolMock) AddNewGoodPeer(peerID types2.PeerID) { - callInfo := struct { - PeerID types2.PeerID - }{ - PeerID: peerID, - } - mock.lockAddNewGoodPeer.Lock() - mock.calls.AddNewGoodPeer = append(mock.calls.AddNewGoodPeer, callInfo) - mock.lockAddNewGoodPeer.Unlock() - if mock.AddNewGoodPeerFunc == nil { - return - } - mock.AddNewGoodPeerFunc(peerID) -} - -// AddNewGoodPeerCalls gets all the calls that were made to AddNewGoodPeer. -// Check the length with: -// -// len(mockedPool.AddNewGoodPeerCalls()) -func (mock *PoolMock) AddNewGoodPeerCalls() []struct { - PeerID types2.PeerID -} { - var calls []struct { - PeerID types2.PeerID - } - mock.lockAddNewGoodPeer.RLock() - calls = mock.calls.AddNewGoodPeer - mock.lockAddNewGoodPeer.RUnlock() - return calls -} - -// AddRemoteTxs calls AddRemoteTxsFunc. -func (mock *PoolMock) AddRemoteTxs(ctx context.Context, newTxs types2.TxSlots) { - callInfo := struct { - Ctx context.Context - NewTxs types2.TxSlots - }{ - Ctx: ctx, - NewTxs: newTxs, - } - mock.lockAddRemoteTxs.Lock() - mock.calls.AddRemoteTxs = append(mock.calls.AddRemoteTxs, callInfo) - mock.lockAddRemoteTxs.Unlock() - if mock.AddRemoteTxsFunc == nil { - return - } - mock.AddRemoteTxsFunc(ctx, newTxs) -} - -// AddRemoteTxsCalls gets all the calls that were made to AddRemoteTxs. -// Check the length with: -// -// len(mockedPool.AddRemoteTxsCalls()) -func (mock *PoolMock) AddRemoteTxsCalls() []struct { - Ctx context.Context - NewTxs types2.TxSlots -} { - var calls []struct { - Ctx context.Context - NewTxs types2.TxSlots - } - mock.lockAddRemoteTxs.RLock() - calls = mock.calls.AddRemoteTxs - mock.lockAddRemoteTxs.RUnlock() - return calls -} - -// FilterKnownIdHashes calls FilterKnownIdHashesFunc. -func (mock *PoolMock) FilterKnownIdHashes(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { - callInfo := struct { - Tx kv.Tx - Hashes types2.Hashes - }{ - Tx: tx, - Hashes: hashes, - } - mock.lockFilterKnownIdHashes.Lock() - mock.calls.FilterKnownIdHashes = append(mock.calls.FilterKnownIdHashes, callInfo) - mock.lockFilterKnownIdHashes.Unlock() - if mock.FilterKnownIdHashesFunc == nil { - var ( - unknownHashesOut types2.Hashes - errOut error - ) - return unknownHashesOut, errOut - } - return mock.FilterKnownIdHashesFunc(tx, hashes) -} - -// FilterKnownIdHashesCalls gets all the calls that were made to FilterKnownIdHashes. -// Check the length with: -// -// len(mockedPool.FilterKnownIdHashesCalls()) -func (mock *PoolMock) FilterKnownIdHashesCalls() []struct { - Tx kv.Tx - Hashes types2.Hashes -} { - var calls []struct { - Tx kv.Tx - Hashes types2.Hashes - } - mock.lockFilterKnownIdHashes.RLock() - calls = mock.calls.FilterKnownIdHashes - mock.lockFilterKnownIdHashes.RUnlock() - return calls -} - -// GetKnownBlobTxn calls GetKnownBlobTxnFunc. -func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx { - callInfo := struct { - Tx kv.Tx - Hash []byte - }{ - Tx: tx, - Hash: hash, - } - mock.lockGetKnownBlobTxn.Lock() - mock.calls.GetKnownBlobTxn = append(mock.calls.GetKnownBlobTxn, callInfo) - mock.lockGetKnownBlobTxn.Unlock() - if mock.GetKnownBlobTxnFunc == nil { - var ( - metaTxMoqParamOut *metaTx - ) - return metaTxMoqParamOut - } - return mock.GetKnownBlobTxnFunc(tx, hash) -} - -// GetKnownBlobTxnCalls gets all the calls that were made to GetKnownBlobTxn. -// Check the length with: -// -// len(mockedPool.GetKnownBlobTxnCalls()) -func (mock *PoolMock) GetKnownBlobTxnCalls() []struct { - Tx kv.Tx - Hash []byte -} { - var calls []struct { - Tx kv.Tx - Hash []byte - } - mock.lockGetKnownBlobTxn.RLock() - calls = mock.calls.GetKnownBlobTxn - mock.lockGetKnownBlobTxn.RUnlock() - return calls -} - -// GetRlp calls GetRlpFunc. -func (mock *PoolMock) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) { - callInfo := struct { - Tx kv.Tx - Hash []byte - }{ - Tx: tx, - Hash: hash, - } - mock.lockGetRlp.Lock() - mock.calls.GetRlp = append(mock.calls.GetRlp, callInfo) - mock.lockGetRlp.Unlock() - if mock.GetRlpFunc == nil { - var ( - bytesOut []byte - errOut error - ) - return bytesOut, errOut - } - return mock.GetRlpFunc(tx, hash) -} - -// GetRlpCalls gets all the calls that were made to GetRlp. -// Check the length with: -// -// len(mockedPool.GetRlpCalls()) -func (mock *PoolMock) GetRlpCalls() []struct { - Tx kv.Tx - Hash []byte -} { - var calls []struct { - Tx kv.Tx - Hash []byte - } - mock.lockGetRlp.RLock() - calls = mock.calls.GetRlp - mock.lockGetRlp.RUnlock() - return calls -} - -// IdHashKnown calls IdHashKnownFunc. -func (mock *PoolMock) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { - callInfo := struct { - Tx kv.Tx - Hash []byte - }{ - Tx: tx, - Hash: hash, - } - mock.lockIdHashKnown.Lock() - mock.calls.IdHashKnown = append(mock.calls.IdHashKnown, callInfo) - mock.lockIdHashKnown.Unlock() - if mock.IdHashKnownFunc == nil { - var ( - bOut bool - errOut error - ) - return bOut, errOut - } - return mock.IdHashKnownFunc(tx, hash) -} - -// IdHashKnownCalls gets all the calls that were made to IdHashKnown. -// Check the length with: -// -// len(mockedPool.IdHashKnownCalls()) -func (mock *PoolMock) IdHashKnownCalls() []struct { - Tx kv.Tx - Hash []byte -} { - var calls []struct { - Tx kv.Tx - Hash []byte - } - mock.lockIdHashKnown.RLock() - calls = mock.calls.IdHashKnown - mock.lockIdHashKnown.RUnlock() - return calls -} - -// OnNewBlock calls OnNewBlockFunc. -func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { - callInfo := struct { - Ctx context.Context - StateChanges *remote.StateChangeBatch - UnwindTxs types2.TxSlots - MinedTxs types2.TxSlots - Tx kv.Tx - }{ - Ctx: ctx, - StateChanges: stateChanges, - UnwindTxs: unwindTxs, - MinedTxs: minedTxs, - Tx: tx, - } - mock.lockOnNewBlock.Lock() - mock.calls.OnNewBlock = append(mock.calls.OnNewBlock, callInfo) - mock.lockOnNewBlock.Unlock() - if mock.OnNewBlockFunc == nil { - var ( - errOut error - ) - return errOut - } - return mock.OnNewBlockFunc(ctx, stateChanges, unwindTxs, minedTxs, tx) -} - -// OnNewBlockCalls gets all the calls that were made to OnNewBlock. -// Check the length with: -// -// len(mockedPool.OnNewBlockCalls()) -func (mock *PoolMock) OnNewBlockCalls() []struct { - Ctx context.Context - StateChanges *remote.StateChangeBatch - UnwindTxs types2.TxSlots - MinedTxs types2.TxSlots - Tx kv.Tx -} { - var calls []struct { - Ctx context.Context - StateChanges *remote.StateChangeBatch - UnwindTxs types2.TxSlots - MinedTxs types2.TxSlots - Tx kv.Tx - } - mock.lockOnNewBlock.RLock() - calls = mock.calls.OnNewBlock - mock.lockOnNewBlock.RUnlock() - return calls -} - -// Started calls StartedFunc. -func (mock *PoolMock) Started() bool { - callInfo := struct { - }{} - mock.lockStarted.Lock() - mock.calls.Started = append(mock.calls.Started, callInfo) - mock.lockStarted.Unlock() - if mock.StartedFunc == nil { - var ( - bOut bool - ) - return bOut - } - return mock.StartedFunc() -} - -// StartedCalls gets all the calls that were made to Started. -// Check the length with: -// -// len(mockedPool.StartedCalls()) -func (mock *PoolMock) StartedCalls() []struct { -} { - var calls []struct { - } - mock.lockStarted.RLock() - calls = mock.calls.Started - mock.lockStarted.RUnlock() - return calls -} - -// ValidateSerializedTxn calls ValidateSerializedTxnFunc. -func (mock *PoolMock) ValidateSerializedTxn(serializedTxn []byte) error { - callInfo := struct { - SerializedTxn []byte - }{ - SerializedTxn: serializedTxn, - } - mock.lockValidateSerializedTxn.Lock() - mock.calls.ValidateSerializedTxn = append(mock.calls.ValidateSerializedTxn, callInfo) - mock.lockValidateSerializedTxn.Unlock() - if mock.ValidateSerializedTxnFunc == nil { - var ( - errOut error - ) - return errOut - } - return mock.ValidateSerializedTxnFunc(serializedTxn) -} - -// ValidateSerializedTxnCalls gets all the calls that were made to ValidateSerializedTxn. -// Check the length with: -// -// len(mockedPool.ValidateSerializedTxnCalls()) -func (mock *PoolMock) ValidateSerializedTxnCalls() []struct { - SerializedTxn []byte -} { - var calls []struct { - SerializedTxn []byte - } - mock.lockValidateSerializedTxn.RLock() - calls = mock.calls.ValidateSerializedTxn - mock.lockValidateSerializedTxn.RUnlock() - return calls -} diff --git a/txpool/pool.go b/txpool/pool.go index 31e785454..3fd1629fe 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -89,7 +89,7 @@ type Pool interface { FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) Started() bool GetRlp(tx kv.Tx, hash []byte) ([]byte, error) - GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx + GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) AddNewGoodPeer(peerID types.PeerID) } @@ -561,28 +561,32 @@ func (p *TxPool) getUnprocessedTxn(hashS string) (*types.TxSlot, bool) { return nil, false } -func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx { +func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) { hashS := string(hash) p.lock.Lock() defer p.lock.Unlock() if mt, ok := p.minedBlobTxsByHash[hashS]; ok { - return mt + return mt, nil } if txn, ok := p.getUnprocessedTxn(hashS); ok { - return newMetaTx(txn, false, 0) + return newMetaTx(txn, false, 0), nil } if mt, ok := p.byHash[hashS]; ok { - return mt + return mt, nil } - if has, _ := tx.Has(kv.PoolTransaction, hash); has { - txn, _ := tx.GetOne(kv.PoolTransaction, hash) - parseCtx := types.NewTxParseContext(p.chainID) - parseCtx.WithSender(false) - txSlot := &types.TxSlot{} - parseCtx.ParseTransaction(txn, 0, txSlot, nil, false, true, nil) - return newMetaTx(txSlot, false, 0) + has, err := tx.Has(kv.PoolTransaction, hash) + if err != nil { + return nil, err } - return nil + if !has { + return nil, nil + } + txn, _ := tx.GetOne(kv.PoolTransaction, hash) + parseCtx := types.NewTxParseContext(p.chainID) + parseCtx.WithSender(false) + txSlot := &types.TxSlot{} + parseCtx.ParseTransaction(txn, 0, txSlot, nil, false, true, nil) + return newMetaTx(txSlot, false, 0), nil } func (p *TxPool) IsLocal(idHash []byte) bool { From 2f2f72e1d90c5f2bbabc3338eb424a1336727ef1 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 18 Sep 2023 10:04:27 +0700 Subject: [PATCH 3/5] save --- txpool/mocks_test.go | 563 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 563 insertions(+) create mode 100644 txpool/mocks_test.go diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go new file mode 100644 index 000000000..22e8e8121 --- /dev/null +++ b/txpool/mocks_test.go @@ -0,0 +1,563 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package txpool + +import ( + "context" + "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/txpool/txpoolcfg" + types2 "github.com/ledgerwatch/erigon-lib/types" + "sync" +) + +// Ensure, that PoolMock does implement Pool. +// If this is not the case, regenerate this file with moq. +var _ Pool = &PoolMock{} + +// PoolMock is a mock implementation of Pool. +// +// func TestSomethingThatUsesPool(t *testing.T) { +// +// // make and configure a mocked Pool +// mockedPool := &PoolMock{ +// AddLocalTxsFunc: func(ctx context.Context, newTxs types2.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) { +// panic("mock out the AddLocalTxs method") +// }, +// AddNewGoodPeerFunc: func(peerID types2.PeerID) { +// panic("mock out the AddNewGoodPeer method") +// }, +// AddRemoteTxsFunc: func(ctx context.Context, newTxs types2.TxSlots) { +// panic("mock out the AddRemoteTxs method") +// }, +// FilterKnownIdHashesFunc: func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { +// panic("mock out the FilterKnownIdHashes method") +// }, +// GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) (*metaTx, error) { +// panic("mock out the GetKnownBlobTxn method") +// }, +// GetRlpFunc: func(tx kv.Tx, hash []byte) ([]byte, error) { +// panic("mock out the GetRlp method") +// }, +// IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) { +// panic("mock out the IdHashKnown method") +// }, +// OnNewBlockFunc: func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { +// panic("mock out the OnNewBlock method") +// }, +// StartedFunc: func() bool { +// panic("mock out the Started method") +// }, +// ValidateSerializedTxnFunc: func(serializedTxn []byte) error { +// panic("mock out the ValidateSerializedTxn method") +// }, +// } +// +// // use mockedPool in code that requires Pool +// // and then make assertions. +// +// } +type PoolMock struct { + // AddLocalTxsFunc mocks the AddLocalTxs method. + AddLocalTxsFunc func(ctx context.Context, newTxs types2.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) + + // AddNewGoodPeerFunc mocks the AddNewGoodPeer method. + AddNewGoodPeerFunc func(peerID types2.PeerID) + + // AddRemoteTxsFunc mocks the AddRemoteTxs method. + AddRemoteTxsFunc func(ctx context.Context, newTxs types2.TxSlots) + + // FilterKnownIdHashesFunc mocks the FilterKnownIdHashes method. + FilterKnownIdHashesFunc func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) + + // GetKnownBlobTxnFunc mocks the GetKnownBlobTxn method. + GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) (*metaTx, error) + + // GetRlpFunc mocks the GetRlp method. + GetRlpFunc func(tx kv.Tx, hash []byte) ([]byte, error) + + // IdHashKnownFunc mocks the IdHashKnown method. + IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error) + + // OnNewBlockFunc mocks the OnNewBlock method. + OnNewBlockFunc func(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error + + // StartedFunc mocks the Started method. + StartedFunc func() bool + + // ValidateSerializedTxnFunc mocks the ValidateSerializedTxn method. + ValidateSerializedTxnFunc func(serializedTxn []byte) error + + // calls tracks calls to the methods. + calls struct { + // AddLocalTxs holds details about calls to the AddLocalTxs method. + AddLocalTxs []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // NewTxs is the newTxs argument value. + NewTxs types2.TxSlots + // Tx is the tx argument value. + Tx kv.Tx + } + // AddNewGoodPeer holds details about calls to the AddNewGoodPeer method. + AddNewGoodPeer []struct { + // PeerID is the peerID argument value. + PeerID types2.PeerID + } + // AddRemoteTxs holds details about calls to the AddRemoteTxs method. + AddRemoteTxs []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // NewTxs is the newTxs argument value. + NewTxs types2.TxSlots + } + // FilterKnownIdHashes holds details about calls to the FilterKnownIdHashes method. + FilterKnownIdHashes []struct { + // Tx is the tx argument value. + Tx kv.Tx + // Hashes is the hashes argument value. + Hashes types2.Hashes + } + // GetKnownBlobTxn holds details about calls to the GetKnownBlobTxn method. + GetKnownBlobTxn []struct { + // Tx is the tx argument value. + Tx kv.Tx + // Hash is the hash argument value. + Hash []byte + } + // GetRlp holds details about calls to the GetRlp method. + GetRlp []struct { + // Tx is the tx argument value. + Tx kv.Tx + // Hash is the hash argument value. + Hash []byte + } + // IdHashKnown holds details about calls to the IdHashKnown method. + IdHashKnown []struct { + // Tx is the tx argument value. + Tx kv.Tx + // Hash is the hash argument value. + Hash []byte + } + // OnNewBlock holds details about calls to the OnNewBlock method. + OnNewBlock []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // StateChanges is the stateChanges argument value. + StateChanges *remote.StateChangeBatch + // UnwindTxs is the unwindTxs argument value. + UnwindTxs types2.TxSlots + // MinedTxs is the minedTxs argument value. + MinedTxs types2.TxSlots + // Tx is the tx argument value. + Tx kv.Tx + } + // Started holds details about calls to the Started method. + Started []struct { + } + // ValidateSerializedTxn holds details about calls to the ValidateSerializedTxn method. + ValidateSerializedTxn []struct { + // SerializedTxn is the serializedTxn argument value. + SerializedTxn []byte + } + } + lockAddLocalTxs sync.RWMutex + lockAddNewGoodPeer sync.RWMutex + lockAddRemoteTxs sync.RWMutex + lockFilterKnownIdHashes sync.RWMutex + lockGetKnownBlobTxn sync.RWMutex + lockGetRlp sync.RWMutex + lockIdHashKnown sync.RWMutex + lockOnNewBlock sync.RWMutex + lockStarted sync.RWMutex + lockValidateSerializedTxn sync.RWMutex +} + +// AddLocalTxs calls AddLocalTxsFunc. +func (mock *PoolMock) AddLocalTxs(ctx context.Context, newTxs types2.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) { + callInfo := struct { + Ctx context.Context + NewTxs types2.TxSlots + Tx kv.Tx + }{ + Ctx: ctx, + NewTxs: newTxs, + Tx: tx, + } + mock.lockAddLocalTxs.Lock() + mock.calls.AddLocalTxs = append(mock.calls.AddLocalTxs, callInfo) + mock.lockAddLocalTxs.Unlock() + if mock.AddLocalTxsFunc == nil { + var ( + discardReasonsOut []txpoolcfg.DiscardReason + errOut error + ) + return discardReasonsOut, errOut + } + return mock.AddLocalTxsFunc(ctx, newTxs, tx) +} + +// AddLocalTxsCalls gets all the calls that were made to AddLocalTxs. +// Check the length with: +// +// len(mockedPool.AddLocalTxsCalls()) +func (mock *PoolMock) AddLocalTxsCalls() []struct { + Ctx context.Context + NewTxs types2.TxSlots + Tx kv.Tx +} { + var calls []struct { + Ctx context.Context + NewTxs types2.TxSlots + Tx kv.Tx + } + mock.lockAddLocalTxs.RLock() + calls = mock.calls.AddLocalTxs + mock.lockAddLocalTxs.RUnlock() + return calls +} + +// AddNewGoodPeer calls AddNewGoodPeerFunc. +func (mock *PoolMock) AddNewGoodPeer(peerID types2.PeerID) { + callInfo := struct { + PeerID types2.PeerID + }{ + PeerID: peerID, + } + mock.lockAddNewGoodPeer.Lock() + mock.calls.AddNewGoodPeer = append(mock.calls.AddNewGoodPeer, callInfo) + mock.lockAddNewGoodPeer.Unlock() + if mock.AddNewGoodPeerFunc == nil { + return + } + mock.AddNewGoodPeerFunc(peerID) +} + +// AddNewGoodPeerCalls gets all the calls that were made to AddNewGoodPeer. +// Check the length with: +// +// len(mockedPool.AddNewGoodPeerCalls()) +func (mock *PoolMock) AddNewGoodPeerCalls() []struct { + PeerID types2.PeerID +} { + var calls []struct { + PeerID types2.PeerID + } + mock.lockAddNewGoodPeer.RLock() + calls = mock.calls.AddNewGoodPeer + mock.lockAddNewGoodPeer.RUnlock() + return calls +} + +// AddRemoteTxs calls AddRemoteTxsFunc. +func (mock *PoolMock) AddRemoteTxs(ctx context.Context, newTxs types2.TxSlots) { + callInfo := struct { + Ctx context.Context + NewTxs types2.TxSlots + }{ + Ctx: ctx, + NewTxs: newTxs, + } + mock.lockAddRemoteTxs.Lock() + mock.calls.AddRemoteTxs = append(mock.calls.AddRemoteTxs, callInfo) + mock.lockAddRemoteTxs.Unlock() + if mock.AddRemoteTxsFunc == nil { + return + } + mock.AddRemoteTxsFunc(ctx, newTxs) +} + +// AddRemoteTxsCalls gets all the calls that were made to AddRemoteTxs. +// Check the length with: +// +// len(mockedPool.AddRemoteTxsCalls()) +func (mock *PoolMock) AddRemoteTxsCalls() []struct { + Ctx context.Context + NewTxs types2.TxSlots +} { + var calls []struct { + Ctx context.Context + NewTxs types2.TxSlots + } + mock.lockAddRemoteTxs.RLock() + calls = mock.calls.AddRemoteTxs + mock.lockAddRemoteTxs.RUnlock() + return calls +} + +// FilterKnownIdHashes calls FilterKnownIdHashesFunc. +func (mock *PoolMock) FilterKnownIdHashes(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { + callInfo := struct { + Tx kv.Tx + Hashes types2.Hashes + }{ + Tx: tx, + Hashes: hashes, + } + mock.lockFilterKnownIdHashes.Lock() + mock.calls.FilterKnownIdHashes = append(mock.calls.FilterKnownIdHashes, callInfo) + mock.lockFilterKnownIdHashes.Unlock() + if mock.FilterKnownIdHashesFunc == nil { + var ( + unknownHashesOut types2.Hashes + errOut error + ) + return unknownHashesOut, errOut + } + return mock.FilterKnownIdHashesFunc(tx, hashes) +} + +// FilterKnownIdHashesCalls gets all the calls that were made to FilterKnownIdHashes. +// Check the length with: +// +// len(mockedPool.FilterKnownIdHashesCalls()) +func (mock *PoolMock) FilterKnownIdHashesCalls() []struct { + Tx kv.Tx + Hashes types2.Hashes +} { + var calls []struct { + Tx kv.Tx + Hashes types2.Hashes + } + mock.lockFilterKnownIdHashes.RLock() + calls = mock.calls.FilterKnownIdHashes + mock.lockFilterKnownIdHashes.RUnlock() + return calls +} + +// GetKnownBlobTxn calls GetKnownBlobTxnFunc. +func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) { + callInfo := struct { + Tx kv.Tx + Hash []byte + }{ + Tx: tx, + Hash: hash, + } + mock.lockGetKnownBlobTxn.Lock() + mock.calls.GetKnownBlobTxn = append(mock.calls.GetKnownBlobTxn, callInfo) + mock.lockGetKnownBlobTxn.Unlock() + if mock.GetKnownBlobTxnFunc == nil { + var ( + metaTxMoqParamOut *metaTx + errOut error + ) + return metaTxMoqParamOut, errOut + } + return mock.GetKnownBlobTxnFunc(tx, hash) +} + +// GetKnownBlobTxnCalls gets all the calls that were made to GetKnownBlobTxn. +// Check the length with: +// +// len(mockedPool.GetKnownBlobTxnCalls()) +func (mock *PoolMock) GetKnownBlobTxnCalls() []struct { + Tx kv.Tx + Hash []byte +} { + var calls []struct { + Tx kv.Tx + Hash []byte + } + mock.lockGetKnownBlobTxn.RLock() + calls = mock.calls.GetKnownBlobTxn + mock.lockGetKnownBlobTxn.RUnlock() + return calls +} + +// GetRlp calls GetRlpFunc. +func (mock *PoolMock) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) { + callInfo := struct { + Tx kv.Tx + Hash []byte + }{ + Tx: tx, + Hash: hash, + } + mock.lockGetRlp.Lock() + mock.calls.GetRlp = append(mock.calls.GetRlp, callInfo) + mock.lockGetRlp.Unlock() + if mock.GetRlpFunc == nil { + var ( + bytesOut []byte + errOut error + ) + return bytesOut, errOut + } + return mock.GetRlpFunc(tx, hash) +} + +// GetRlpCalls gets all the calls that were made to GetRlp. +// Check the length with: +// +// len(mockedPool.GetRlpCalls()) +func (mock *PoolMock) GetRlpCalls() []struct { + Tx kv.Tx + Hash []byte +} { + var calls []struct { + Tx kv.Tx + Hash []byte + } + mock.lockGetRlp.RLock() + calls = mock.calls.GetRlp + mock.lockGetRlp.RUnlock() + return calls +} + +// IdHashKnown calls IdHashKnownFunc. +func (mock *PoolMock) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { + callInfo := struct { + Tx kv.Tx + Hash []byte + }{ + Tx: tx, + Hash: hash, + } + mock.lockIdHashKnown.Lock() + mock.calls.IdHashKnown = append(mock.calls.IdHashKnown, callInfo) + mock.lockIdHashKnown.Unlock() + if mock.IdHashKnownFunc == nil { + var ( + bOut bool + errOut error + ) + return bOut, errOut + } + return mock.IdHashKnownFunc(tx, hash) +} + +// IdHashKnownCalls gets all the calls that were made to IdHashKnown. +// Check the length with: +// +// len(mockedPool.IdHashKnownCalls()) +func (mock *PoolMock) IdHashKnownCalls() []struct { + Tx kv.Tx + Hash []byte +} { + var calls []struct { + Tx kv.Tx + Hash []byte + } + mock.lockIdHashKnown.RLock() + calls = mock.calls.IdHashKnown + mock.lockIdHashKnown.RUnlock() + return calls +} + +// OnNewBlock calls OnNewBlockFunc. +func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs types2.TxSlots, minedTxs types2.TxSlots, tx kv.Tx) error { + callInfo := struct { + Ctx context.Context + StateChanges *remote.StateChangeBatch + UnwindTxs types2.TxSlots + MinedTxs types2.TxSlots + Tx kv.Tx + }{ + Ctx: ctx, + StateChanges: stateChanges, + UnwindTxs: unwindTxs, + MinedTxs: minedTxs, + Tx: tx, + } + mock.lockOnNewBlock.Lock() + mock.calls.OnNewBlock = append(mock.calls.OnNewBlock, callInfo) + mock.lockOnNewBlock.Unlock() + if mock.OnNewBlockFunc == nil { + var ( + errOut error + ) + return errOut + } + return mock.OnNewBlockFunc(ctx, stateChanges, unwindTxs, minedTxs, tx) +} + +// OnNewBlockCalls gets all the calls that were made to OnNewBlock. +// Check the length with: +// +// len(mockedPool.OnNewBlockCalls()) +func (mock *PoolMock) OnNewBlockCalls() []struct { + Ctx context.Context + StateChanges *remote.StateChangeBatch + UnwindTxs types2.TxSlots + MinedTxs types2.TxSlots + Tx kv.Tx +} { + var calls []struct { + Ctx context.Context + StateChanges *remote.StateChangeBatch + UnwindTxs types2.TxSlots + MinedTxs types2.TxSlots + Tx kv.Tx + } + mock.lockOnNewBlock.RLock() + calls = mock.calls.OnNewBlock + mock.lockOnNewBlock.RUnlock() + return calls +} + +// Started calls StartedFunc. +func (mock *PoolMock) Started() bool { + callInfo := struct { + }{} + mock.lockStarted.Lock() + mock.calls.Started = append(mock.calls.Started, callInfo) + mock.lockStarted.Unlock() + if mock.StartedFunc == nil { + var ( + bOut bool + ) + return bOut + } + return mock.StartedFunc() +} + +// StartedCalls gets all the calls that were made to Started. +// Check the length with: +// +// len(mockedPool.StartedCalls()) +func (mock *PoolMock) StartedCalls() []struct { +} { + var calls []struct { + } + mock.lockStarted.RLock() + calls = mock.calls.Started + mock.lockStarted.RUnlock() + return calls +} + +// ValidateSerializedTxn calls ValidateSerializedTxnFunc. +func (mock *PoolMock) ValidateSerializedTxn(serializedTxn []byte) error { + callInfo := struct { + SerializedTxn []byte + }{ + SerializedTxn: serializedTxn, + } + mock.lockValidateSerializedTxn.Lock() + mock.calls.ValidateSerializedTxn = append(mock.calls.ValidateSerializedTxn, callInfo) + mock.lockValidateSerializedTxn.Unlock() + if mock.ValidateSerializedTxnFunc == nil { + var ( + errOut error + ) + return errOut + } + return mock.ValidateSerializedTxnFunc(serializedTxn) +} + +// ValidateSerializedTxnCalls gets all the calls that were made to ValidateSerializedTxn. +// Check the length with: +// +// len(mockedPool.ValidateSerializedTxnCalls()) +func (mock *PoolMock) ValidateSerializedTxnCalls() []struct { + SerializedTxn []byte +} { + var calls []struct { + SerializedTxn []byte + } + mock.lockValidateSerializedTxn.RLock() + calls = mock.calls.ValidateSerializedTxn + mock.lockValidateSerializedTxn.RUnlock() + return calls +} From 60db7053bbaff9e22a3bec549d55de8e28e41886 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 18 Sep 2023 10:08:56 +0700 Subject: [PATCH 4/5] save --- txpool/pool.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 3fd1629fe..520650ff6 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -590,9 +590,10 @@ func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) { } func (p *TxPool) IsLocal(idHash []byte) bool { + hashS := string(idHash) p.lock.Lock() defer p.lock.Unlock() - return p.isLocalLRU.Contains(string(idHash)) + return p.isLocalLRU.Contains(hashS) } func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } func (p *TxPool) Started() bool { return p.started.Load() } @@ -701,11 +702,12 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) { p.lock.Lock() defer p.lock.Unlock() for i, txn := range newTxs.Txs { - _, ok := p.unprocessedRemoteByHash[string(txn.IDHash[:])] + hashS := string(txn.IDHash[:]) + _, ok := p.unprocessedRemoteByHash[hashS] if ok { continue } - p.unprocessedRemoteByHash[string(txn.IDHash[:])] = len(p.unprocessedRemoteTxs.Txs) + p.unprocessedRemoteByHash[hashS] = len(p.unprocessedRemoteTxs.Txs) p.unprocessedRemoteTxs.Append(txn, newTxs.Senders.At(i), false) } } @@ -1275,11 +1277,12 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo // Remove from mined cache in case this is coming from unwind txs // and to ensure not double adding into the memory - if _, ok := p.minedBlobTxsByHash[string(mt.Tx.IDHash[:])]; ok { - p.deleteMinedBlobTxn(string(mt.Tx.IDHash[:])) + hashStr := string(mt.Tx.IDHash[:]) + if _, ok := p.minedBlobTxsByHash[hashStr]; ok { + p.deleteMinedBlobTxn(hashStr) } - p.byHash[string(mt.Tx.IDHash[:])] = mt + p.byHash[hashStr] = mt if replaced := p.all.replaceOrInsert(mt); replaced != nil { if assert.Enable { @@ -1288,7 +1291,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo } if mt.subPool&IsLocal != 0 { - p.isLocalLRU.Add(string(mt.Tx.IDHash[:]), struct{}{}) + p.isLocalLRU.Add(hashStr, struct{}{}) } // All transactions are first added to the queued pool and then immediately promoted from there if required p.queued.Add(mt, p.logger) @@ -1298,10 +1301,11 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo // dropping transaction from all sub-structures and from db // Important: don't call it while iterating by all func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) { - delete(p.byHash, string(mt.Tx.IDHash[:])) + hashStr := string(mt.Tx.IDHash[:]) + delete(p.byHash, hashStr) p.deletedTxs = append(p.deletedTxs, mt) p.all.delete(mt) - p.discardReasonsLRU.Add(string(mt.Tx.IDHash[:]), reason) + p.discardReasonsLRU.Add(hashStr, reason) } // Cache recently mined blobs in anticipation of reorg, delete finalized ones From e77827a4306662f07a3dbf92e8d98daf419b2fd4 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 18 Sep 2023 10:20:38 +0700 Subject: [PATCH 5/5] save --- txpool/pool.go | 55 +++++++++++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 520650ff6..1c39b28bb 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -293,40 +293,40 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, } func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { + if err := minedTxs.Valid(); err != nil { + return err + } + defer newBlockTimer.UpdateDuration(time.Now()) //t := time.Now() - cache := p.cache() + coreDB, cache := p.coreDBWithCache() cache.OnNewBlock(stateChanges) - coreTx, err := p.coreDB().BeginRo(ctx) + coreTx, err := coreDB.BeginRo(ctx) if err != nil { return err } defer coreTx.Rollback() - p.lock.Lock() - defer p.lock.Unlock() - p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight) if !p.started.Load() { - if err := p.fromDB(ctx, tx, coreTx); err != nil { + if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil { return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err) } } - cacheView, err := cache.View(ctx, coreTx) if err != nil { return err } + + p.lock.Lock() + defer p.lock.Unlock() + if assert.Enable { if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil { p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String()) } } - - if err := minedTxs.Valid(); err != nil { - return err - } baseFee := stateChanges.PendingBlockBaseFee pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee) @@ -406,9 +406,9 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return fmt.Errorf("txpool not started yet") } - cache := p.cache() defer processBatchTxsTimer.UpdateDuration(time.Now()) - coreTx, err := p.coreDB().BeginRo(ctx) + coreDB, cache := p.coreDBWithCache() + coreTx, err := coreDB.BeginRo(ctx) if err != nil { return err } @@ -599,9 +599,6 @@ func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers. func (p *TxPool) Started() bool { return p.started.Load() } func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { - p.lock.Lock() - defer p.lock.Unlock() - // First wait for the corresponding block to arrive if p.lastSeenBlock.Load() < onTopOf { return false, 0, nil // Too early @@ -683,11 +680,15 @@ func (p *TxPool) ResetYieldedStatus() { } func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { + p.lock.Lock() + defer p.lock.Unlock() return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip) } func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) { set := mapset.NewThreadUnsafeSet[[32]byte]() + p.lock.Lock() + defer p.lock.Unlock() onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set) return onTime, err } @@ -1004,13 +1005,14 @@ func fillDiscardReasons(reasons []txpoolcfg.DiscardReason, newTxs types.TxSlots, } func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) { - coreTx, err := p.coreDB().BeginRo(ctx) + coreDb, cache := p.coreDBWithCache() + coreTx, err := coreDb.BeginRo(ctx) if err != nil { return nil, err } defer coreTx.Rollback() - cacheView, err := p.cache().View(ctx, coreTx) + cacheView, err := cache.View(ctx, coreTx) if err != nil { return nil, err } @@ -1068,19 +1070,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, } return reasons, nil } - -func (p *TxPool) coreDB() kv.RoDB { +func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) { p.lock.Lock() defer p.lock.Unlock() - return p._chainDB + return p._chainDB, p._stateCache } - -func (p *TxPool) cache() kvcache.Cache { - p.lock.Lock() - defer p.lock.Unlock() - return p._stateCache -} - func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, @@ -1863,6 +1857,11 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { return nil } +func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { + p.lock.Lock() + defer p.lock.Unlock() + return p.fromDB(ctx, tx, coreTx) +} func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if p.lastSeenBlock.Load() == 0 { lastSeenBlock, err := LastSeenBlock(tx)