diff --git a/CHANGELOG.md b/CHANGELOG.md index ba5b6877a..96dae1703 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -117,6 +117,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#2020](https://github.com/NibiruChain/nibiru/pull/2020) - test(evm): e2e tests for debug namespace - [#2022](https://github.com/NibiruChain/nibiru/pull/2022) - feat(evm): debug_traceCall method implemented - [#2023](https://github.com/NibiruChain/nibiru/pull/2023) - fix(evm)!: adjusted generation and parsing of the block bloom events +- [#2030](https://github.com/NibiruChain/nibiru/pull/2030) - refactor(eth/rpc): Delete unused code and improve logging in the eth and debug namespaces - [#2031](https://github.com/NibiruChain/nibiru/pull/2031) - fix(evm): debug calls with custom tracer and tracer options #### Dapp modules: perp, spot, oracle, etc diff --git a/app/evmante/evmante_emit_event.go b/app/evmante/evmante_emit_event.go index 95b85b648..01e178d6f 100644 --- a/app/evmante/evmante_emit_event.go +++ b/app/evmante/evmante_emit_event.go @@ -54,6 +54,9 @@ func (eeed EthEmitEventDecorator) AnteHandle( 10, ), ), // #nosec G701 + // TODO: fix: It's odd that each event is emitted twice. Migrate to typed + // events and change EVM indexer to align. 
+ // sdk.NewAttribute("emitted_from", "EthEmitEventDecorator"), )) } diff --git a/e2e/evm/test/basic_queries.test.ts b/e2e/evm/test/basic_queries.test.ts deleted file mode 100644 index 8caed4032..000000000 --- a/e2e/evm/test/basic_queries.test.ts +++ /dev/null @@ -1,397 +0,0 @@ -import { describe, expect, it, jest } from "@jest/globals" -import { - toBigInt, - parseEther, - keccak256, - AbiCoder, - TransactionRequest, - Block, - TransactionResponse, -} from "ethers" -import { account, provider } from "./setup" -import { - INTRINSIC_TX_GAS, - alice, - deployContractTestERC20, - deployContractSendNibi, - hexify, - sendTestNibi, -} from "./utils" - -describe("Basic Queries", () => { - jest.setTimeout(15e3) - - it("Simple transfer, balance check", async () => { - const amountToSend = toBigInt(5e12) * toBigInt(1e6) // unibi - const senderBalanceBefore = await provider.getBalance(account) - const recipientBalanceBefore = await provider.getBalance(alice) - expect(senderBalanceBefore).toBeGreaterThan(0) - expect(recipientBalanceBefore).toEqual(BigInt(0)) - - const tenPow12 = toBigInt(1e12) - - // Execute EVM transfer - const transaction: TransactionRequest = { - gasLimit: toBigInt(100e3), - to: alice, - value: amountToSend, - } - const txResponse = await account.sendTransaction(transaction) - await txResponse.wait(1, 10e3) - expect(txResponse).toHaveProperty("blockHash") - - const senderBalanceAfter = await provider.getBalance(account) - const recipientBalanceAfter = await provider.getBalance(alice) - - // Assert balances with logging - const gasUsed = 50000n // 50k gas for the transaction - const txCostMicronibi = amountToSend / tenPow12 + gasUsed - const txCostWei = txCostMicronibi * tenPow12 - const expectedSenderWei = senderBalanceBefore - txCostWei - console.debug("DEBUG should send via transfer method %o:", { - senderBalanceBefore, - amountToSend, - expectedSenderWei, - senderBalanceAfter, - }) - expect(senderBalanceAfter).toEqual(expectedSenderWei) - 
expect(recipientBalanceAfter).toEqual(amountToSend) - }) - - it("eth_accounts", async () => { - const accounts = await provider.listAccounts() - expect(accounts).not.toHaveLength(0) - }) - - it("eth_estimateGas", async () => { - const tx = { - from: account.address, - to: alice, - value: parseEther("0.01"), // Sending 0.01 Ether - } - const estimatedGas = await provider.estimateGas(tx) - expect(estimatedGas).toBeGreaterThan(BigInt(0)) - expect(estimatedGas).toEqual(INTRINSIC_TX_GAS) - }) - - it("eth_feeHistory", async () => { - const blockCount = 5 // Number of blocks in the requested history - const newestBlock = "latest" // Can be a block number or 'latest' - const rewardPercentiles = [25, 50, 75] // Example percentiles for priority fees - - const feeHistory = await provider.send("eth_feeHistory", [ - blockCount, - newestBlock, - rewardPercentiles, - ]) - expect(feeHistory).toBeDefined() - expect(feeHistory).toHaveProperty("baseFeePerGas") - expect(feeHistory).toHaveProperty("gasUsedRatio") - expect(feeHistory).toHaveProperty("oldestBlock") - expect(feeHistory).toHaveProperty("reward") - }) - - it("eth_gasPrice", async () => { - const gasPrice = await provider.send("eth_gasPrice", []) - expect(gasPrice).toBeDefined() - expect(gasPrice).toEqual(hexify(1)) - }) - - it("eth_getBalance", async () => { - const balance = await provider.getBalance(account.address) - expect(balance).toBeGreaterThan(0) - }) - - it("eth_getBlockByNumber, eth_getBlockByHash", async () => { - const blockNumber = 1 - const blockByNumber = await provider.send("eth_getBlockByNumber", [ - blockNumber, - false, - ]) - expect(blockByNumber).toBeDefined() - expect(blockByNumber).toHaveProperty("hash") - - const blockByHash = await provider.send("eth_getBlockByHash", [ - blockByNumber.hash, - false, - ]) - expect(blockByHash).toBeDefined() - expect(blockByHash.hash).toEqual(blockByNumber.hash) - expect(blockByHash.number).toEqual(blockByNumber.number) - }) - - 
it("eth_getBlockTransactionCountByHash", async () => { - const blockNumber = 1 - const block = await provider.send("eth_getBlockByNumber", [ - blockNumber, - false, - ]) - const txCount = await provider.send("eth_getBlockTransactionCountByHash", [ - block.hash, - ]) - expect(parseInt(txCount)).toBeGreaterThanOrEqual(0) - }) - - it("eth_getBlockTransactionCountByNumber", async () => { - const blockNumber = 1 - const txCount = await provider.send( - "eth_getBlockTransactionCountByNumber", - [blockNumber], - ) - expect(parseInt(txCount)).toBeGreaterThanOrEqual(0) - }) - - it("eth_getCode", async () => { - const contract = await deployContractSendNibi() - const contractAddr = await contract.getAddress() - const code = await provider.send("eth_getCode", [contractAddr, "latest"]) - expect(code).toBeDefined() - }) - - it("eth_getFilterChanges", async () => { - // Deploy ERC-20 contract - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - const filter = { - fromBlock: "latest", - address: contractAddr, - } - // Create the filter for a contract - const filterId = await provider.send("eth_newFilter", [filter]) - expect(filterId).toBeDefined() - - // Execute some contract TX - const tx = await contract.transfer(alice, parseEther("0.01")) - await tx.wait(1, 5e3) - await new Promise((resolve) => setTimeout(resolve, 3000)) - - // Assert logs - const changes = await provider.send("eth_getFilterChanges", [filterId]) - expect(changes.length).toBeGreaterThan(0) - expect(changes[0]).toHaveProperty("address") - expect(changes[0]).toHaveProperty("data") - expect(changes[0]).toHaveProperty("topics") - - const success = await provider.send("eth_uninstallFilter", [filterId]) - expect(success).toBeTruthy() - }) - - // Skipping as the method is not implemented - it.skip("eth_getFilterLogs", async () => { - // Deploy ERC-20 contract - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - const 
filter = { - fromBlock: "latest", - address: contractAddr, - } - // Execute some contract TX - const tx = await contract.transfer(alice, parseEther("0.01")) - await tx.wait(1, 5e3) - - // Create the filter for a contract - const filterId = await provider.send("eth_newFilter", [filter]) - expect(filterId).toBeDefined() - - // Assert logs - const changes = await provider.send("eth_getFilterLogs", [filterId]) - expect(changes.length).toBeGreaterThan(0) - expect(changes[0]).toHaveProperty("address") - expect(changes[0]).toHaveProperty("data") - expect(changes[0]).toHaveProperty("topics") - }) - - // Skipping as the method is not implemented - it.skip("eth_getLogs", async () => { - // Deploy ERC-20 contract - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - const filter = { - fromBlock: "latest", - address: contractAddr, - } - // Execute some contract TX - const tx = await contract.transfer(alice, parseEther("0.01")) - - // Assert logs - const changes = await provider.send("eth_getLogs", [filter]) - expect(changes.length).toBeGreaterThan(0) - expect(changes[0]).toHaveProperty("address") - expect(changes[0]).toHaveProperty("data") - expect(changes[0]).toHaveProperty("topics") - }) - - it("eth_getProof", async () => { - // Deploy ERC-20 contract - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - - const slot = 1 // Assuming balanceOf is at slot 1 - const storageKey = keccak256( - AbiCoder.defaultAbiCoder().encode( - ["address", "uint256"], - [account.address, slot], - ), - ) - const proof = await provider.send("eth_getProof", [ - contractAddr, - [storageKey], - "latest", - ]) - // Assert proof structure - expect(proof).toHaveProperty("address") - expect(proof).toHaveProperty("balance") - expect(proof).toHaveProperty("codeHash") - expect(proof).toHaveProperty("nonce") - expect(proof).toHaveProperty("storageProof") - - if (proof.storageProof.length > 0) { - 
expect(proof.storageProof[0]).toHaveProperty("key", storageKey) - expect(proof.storageProof[0]).toHaveProperty("value") - expect(proof.storageProof[0]).toHaveProperty("proof") - } - }) - - // Skipping as the method is not implemented - it.skip("eth_getLogs", async () => { - // Deploy ERC-20 contract - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - const filter = { - fromBlock: "latest", - address: contractAddr, - } - // Execute some contract TX - const tx = await contract.transfer(alice, parseEther("0.01")) - await tx.wait(1, 5e3) - - // Assert logs - const logs = await provider.send("eth_getLogs", [filter]) - expect(logs.length).toBeGreaterThan(0) - expect(logs[0]).toHaveProperty("address") - expect(logs[0]).toHaveProperty("data") - expect(logs[0]).toHaveProperty("topics") - }) - - it("eth_getProof", async () => { - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - - const slot = 1 // Assuming balanceOf is at slot 1 - const storageKey = keccak256( - AbiCoder.defaultAbiCoder().encode( - ["address", "uint256"], - [account.address, slot], - ), - ) - const proof = await provider.send("eth_getProof", [ - contractAddr, - [storageKey], - "latest", - ]) - // Assert proof structure - expect(proof).toHaveProperty("address") - expect(proof).toHaveProperty("balance") - expect(proof).toHaveProperty("codeHash") - expect(proof).toHaveProperty("nonce") - expect(proof).toHaveProperty("storageProof") - - if (proof.storageProof.length > 0) { - expect(proof.storageProof[0]).toHaveProperty("key", storageKey) - expect(proof.storageProof[0]).toHaveProperty("value") - expect(proof.storageProof[0]).toHaveProperty("proof") - } - }) - - it("eth_getStorageAt", async () => { - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - - const value = await provider.getStorage(contractAddr, 1) - expect(value).toBeDefined() - }) - - 
it("eth_getTransactionByBlockHashAndIndex, eth_getTransactionByBlockNumberAndIndex", async () => { - // Execute EVM transfer - const txResponse: TransactionResponse = await sendTestNibi() - const block: Block = (await txResponse.getBlock()) as Block - expect(block).toBeTruthy() - - const txByBlockHash = await provider.send( - "eth_getTransactionByBlockHashAndIndex", - [block.hash, "0x0"], - ) - expect(txByBlockHash).toBeDefined() - expect(txByBlockHash).toHaveProperty("from") - expect(txByBlockHash).toHaveProperty("to") - expect(txByBlockHash).toHaveProperty("blockHash") - expect(txByBlockHash).toHaveProperty("blockNumber") - expect(txByBlockHash).toHaveProperty("value") - - const txByBlockNumber = await provider.send( - "eth_getTransactionByBlockNumberAndIndex", - [block.number, "0x0"], - ) - - expect(txByBlockNumber).toBeDefined() - expect(txByBlockNumber["from"]).toEqual(txByBlockHash["from"]) - expect(txByBlockNumber["to"]).toEqual(txByBlockHash["to"]) - expect(txByBlockNumber["value"]).toEqual(txByBlockHash["value"]) - }) - - it("eth_getTransactionByHash", async () => { - const txResponse = await sendTestNibi() - const txByHash = await provider.getTransaction(txResponse.hash) - expect(txByHash).toBeDefined() - expect(txByHash.hash).toEqual(txResponse.hash) - }) - - it("eth_getTransactionCount", async () => { - const txCount = await provider.getTransactionCount(account.address) - expect(txCount).toBeGreaterThanOrEqual(0) - }) - - it("eth_getTransactionReceipt", async () => { - const txResponse = await sendTestNibi() - const txReceipt = await provider.getTransactionReceipt(txResponse.hash) - expect(txReceipt).toBeDefined() - expect(txReceipt.hash).toEqual(txResponse.hash) - }) - - it("eth_getUncleCountByBlockHash", async () => { - const latestBlock = await provider.getBlockNumber() - const block = await provider.getBlock(latestBlock) - const uncleCount = await provider.send("eth_getUncleCountByBlockHash", [ - block.hash, - ]) - 
expect(parseInt(uncleCount)).toBeGreaterThanOrEqual(0) - }) - - it("eth_getUncleCountByBlockNumber", async () => { - const latestBlock = await provider.getBlockNumber() - const uncleCount = await provider.send("eth_getUncleCountByBlockNumber", [ - latestBlock, - ]) - expect(parseInt(uncleCount)).toBeGreaterThanOrEqual(0) - }) - - it("eth_maxPriorityFeePerGas", async () => { - const maxPriorityGas = await provider.send("eth_maxPriorityFeePerGas", []) - expect(parseInt(maxPriorityGas)).toBeGreaterThanOrEqual(0) - }) - - it("eth_newBlockFilter", async () => { - const filterId = await provider.send("eth_newBlockFilter", []) - expect(filterId).toBeDefined() - }) - - it("eth_newPendingTransactionFilter", async () => { - const filterId = await provider.send("eth_newPendingTransactionFilter", []) - expect(filterId).toBeDefined() - }) - - it("eth_syncing", async () => { - const syncing = await provider.send("eth_syncing", []) - expect(syncing).toBeFalsy() - }) -}) diff --git a/e2e/evm/test/debug_queries.test.ts b/e2e/evm/test/debug_queries.test.ts index c01723d90..180ef2ab6 100644 --- a/e2e/evm/test/debug_queries.test.ts +++ b/e2e/evm/test/debug_queries.test.ts @@ -41,7 +41,6 @@ describe("debug queries", () => { expectTrace(traceResult) }) - // TODO: impl in EVM: remove skip it("debug_traceBlockByHash", async () => { const traceResult = await provider.send("debug_traceBlockByHash", [ blockHash, @@ -86,22 +85,34 @@ describe("debug queries", () => { expectTrace([{ result: traceResult }]) }) - // TODO: impl in EVM: remove skip - it.skip("debug_getBadBlocks", async () => { - const traceResult = await provider.send("debug_getBadBlocks", [txHash]) - expect(traceResult).toBeDefined() + // TODO: impl in EVM + it("debug_getBadBlocks", async () => { + try { + const traceResult = await provider.send("debug_getBadBlocks", [txHash]) + expect(traceResult).toBeDefined() + } catch (err) { + expect(err.message).toContain( + "the method debug_getBadBlocks does not exist", + ) + } }) - // 
TODO: impl in EVM: remove skip - it.skip("debug_storageRangeAt", async () => { - const traceResult = await provider.send("debug_storageRangeAt", [ - blockNumber, - txIndex, - contractAddress, - "0x0", - 100, - ]) - expect(traceResult).toBeDefined() + // TODO: impl in EVM + it("debug_storageRangeAt", async () => { + try { + const traceResult = await provider.send("debug_storageRangeAt", [ + blockNumber, + txIndex, + contractAddress, + "0x0", + 100, + ]) + expect(traceResult).toBeDefined() + } catch (err) { + expect(err.message).toContain( + "the method debug_storageRangeAt does not exist", + ) + } }) }) diff --git a/e2e/evm/test/eth_queries.test.ts b/e2e/evm/test/eth_queries.test.ts index ac648e274..3189ede3c 100644 --- a/e2e/evm/test/eth_queries.test.ts +++ b/e2e/evm/test/eth_queries.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, jest } from "@jest/globals" -import { parseEther, keccak256, AbiCoder, TransactionRequest } from "ethers" +import { parseEther, keccak256, AbiCoder } from "ethers" import { account, provider } from "./setup" import { INTRINSIC_TX_GAS, @@ -131,7 +131,6 @@ describe("eth queries", () => { expect(success).toBeTruthy() }) - // Skipping as the method is not implemented it("eth_getFilterLogs", async () => { // Deploy ERC-20 contract const contract = await deployContractTestERC20() @@ -157,12 +156,10 @@ describe("eth queries", () => { expect(changes[0]).toHaveProperty("topics") }) - // Skipping as the method is not implemented it("eth_getLogs", async () => { // Deploy ERC-20 contract const contract = await deployContractTestERC20() const contractAddr = await contract.getAddress() - console.log(contractAddr) const filter = { fromBlock: "0x1", address: contractAddr, @@ -211,36 +208,6 @@ describe("eth queries", () => { } }) - it("eth_getProof", async () => { - const contract = await deployContractTestERC20() - const contractAddr = await contract.getAddress() - - const slot = 1 // Assuming balanceOf is at slot 1 - const storageKey = keccak256( 
- AbiCoder.defaultAbiCoder().encode( - ["address", "uint256"], - [account.address, slot], - ), - ) - const proof = await provider.send("eth_getProof", [ - contractAddr, - [storageKey], - "latest", - ]) - // Assert proof structure - expect(proof).toHaveProperty("address") - expect(proof).toHaveProperty("balance") - expect(proof).toHaveProperty("codeHash") - expect(proof).toHaveProperty("nonce") - expect(proof).toHaveProperty("storageProof") - - if (proof.storageProof.length > 0) { - expect(proof.storageProof[0]).toHaveProperty("key", storageKey) - expect(proof.storageProof[0]).toHaveProperty("value") - expect(proof.storageProof[0]).toHaveProperty("proof") - } - }) - it("eth_getStorageAt", async () => { const contract = await deployContractTestERC20() const contractAddr = await contract.getAddress() diff --git a/eth/indexer/kv_indexer.go b/eth/indexer/kv_indexer.go index 7f3384a4a..30696d472 100644 --- a/eth/indexer/kv_indexer.go +++ b/eth/indexer/kv_indexer.go @@ -57,7 +57,13 @@ func (kv *KVIndexer) IndexBlock(block *tmtypes.Block, txResults []*abci.Response var ethTxIndex int32 for txIndex, tx := range block.Txs { result := txResults[txIndex] - if !rpc.TxSuccessOrExpectedFailure(result) { + isValidEnough, reason := rpc.TxIsValidEnough(result) + if !isValidEnough { + kv.logger.Debug( + "Skipped indexing of tx", + "reason", reason, + "tm_tx_hash", eth.TmTxHashToString(tx.Hash()), + ) continue } diff --git a/eth/rpc/backend/blocks.go b/eth/rpc/backend/blocks.go index a56201c4e..ea3e38f95 100644 --- a/eth/rpc/backend/blocks.go +++ b/eth/rpc/backend/blocks.go @@ -22,6 +22,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "github.com/NibiruChain/nibiru/v2/eth" "github.com/NibiruChain/nibiru/v2/eth/rpc" "github.com/NibiruChain/nibiru/v2/x/evm" ) @@ -267,8 +268,13 @@ func (b *Backend) EthMsgsFromTendermintBlock( // - Include unsuccessful tx that exceeds block gas limit // - Include unsuccessful tx that failed when committing changes to stateDB // - 
Exclude unsuccessful tx with any other error but ExceedBlockGasLimit - if !rpc.TxSuccessOrExpectedFailure(txResults[i]) { - b.logger.Debug("invalid tx result code", "cosmos-hash", hexutil.Encode(tx.Hash())) + isValidEnough, reason := rpc.TxIsValidEnough(txResults[i]) + if !isValidEnough { + b.logger.Debug( + "invalid tx result code", + "tm_tx_hash", eth.TmTxHashToString(tx.Hash()), + "reason", reason, + ) continue } diff --git a/eth/rpc/backend/call_tx.go b/eth/rpc/backend/call_tx.go index d7cff6eb4..12f7ce378 100644 --- a/eth/rpc/backend/call_tx.go +++ b/eth/rpc/backend/call_tx.go @@ -148,7 +148,9 @@ func (b *Backend) SendRawTransaction(data hexutil.Bytes) (common.Hash, error) { } txHash := ethereumTx.AsTransaction().Hash() - b.logger.Debug("eth_sendRawTransaction", "txHash", txHash.Hex()) + b.logger.Debug("eth_sendRawTransaction", + "txHash", txHash.Hex(), + ) syncCtx := b.clientCtx.WithBroadcastMode(flags.BroadcastSync) rsp, err := syncCtx.BroadcastTx(txBytes) @@ -159,6 +161,9 @@ func (b *Backend) SendRawTransaction(data hexutil.Bytes) (common.Hash, error) { b.logger.Error("failed to broadcast tx", "error", err.Error()) return txHash, err } + b.logger.Debug("eth_sendRawTransaction", + "blockHeight", fmt.Sprintf("%d", rsp.Height), + ) return txHash, nil } diff --git a/eth/rpc/backend/filters.go b/eth/rpc/backend/filters.go index 244cda0e5..02fa6c11f 100644 --- a/eth/rpc/backend/filters.go +++ b/eth/rpc/backend/filters.go @@ -30,8 +30,12 @@ func (b *Backend) GetLogsByHeight(height *int64) ([][]*gethcore.Log, error) { return GetLogsFromBlockResults(blockRes) } -// BloomStatus returns the BloomBitsBlocks and the number of processed sections maintained -// by the chain indexer. -func (b *Backend) BloomStatus() (uint64, uint64) { +// BloomStatus returns: +// - bloomBitsBlocks: The number of blocks a single bloom bit section vector +// contains on the server side. +// - bloomSections: The number of processed sections maintained by the indexer. 
+func (b *Backend) BloomStatus() ( + bloomBitBlocks, bloomSections uint64, +) { return 4096, 0 } diff --git a/eth/rpc/backend/tx_info.go b/eth/rpc/backend/tx_info.go index 48bbbf2bf..2f5003d0b 100644 --- a/eth/rpc/backend/tx_info.go +++ b/eth/rpc/backend/tx_info.go @@ -22,11 +22,11 @@ import ( "github.com/NibiruChain/nibiru/v2/x/evm" ) -// GetTransactionByHash returns the Ethereum format transaction identified by Ethereum transaction hash +// GetTransactionByHash returns the Ethereum format transaction identified by +// Ethereum transaction hash. If the transaction is not found or has been +// discarded from a pruning node, this resolves to nil. func (b *Backend) GetTransactionByHash(txHash common.Hash) (*rpc.EthTxJsonRPC, error) { res, err := b.GetTxByEthHash(txHash) - hexTx := txHash.Hex() - if err != nil { return b.getTransactionByHashPending(txHash) } @@ -57,7 +57,7 @@ func (b *Backend) GetTransactionByHash(txHash common.Hash) (*rpc.EthTxJsonRPC, e // Fallback to find tx index by iterating all valid eth transactions msgs := b.EthMsgsFromTendermintBlock(block, blockRes) for i := range msgs { - if msgs[i].Hash == hexTx { + if msgs[i].Hash == eth.EthTxHashToString(txHash) { if i > math.MaxInt32 { return nil, errors.New("tx index overflow") } @@ -316,7 +316,7 @@ func (b *Backend) GetTxByEthHash(hash common.Hash) (*eth.TxResult, error) { return txs.GetTxByHash(hash) }) if err != nil { - return nil, errorsmod.Wrapf(err, "GetTxByEthHash %s", hash.Hex()) + return nil, errorsmod.Wrapf(err, "GetTxByEthHash(%s)", hash.Hex()) } return txResult, nil } @@ -352,8 +352,9 @@ func (b *Backend) queryTendermintTxIndexer(query string, txGetter func(*rpc.Pars return nil, errors.New("ethereum tx not found") } txResult := resTxs.Txs[0] - if !rpc.TxSuccessOrExpectedFailure(&txResult.TxResult) { - return nil, errors.New("invalid ethereum tx") + isValidEnough, reason := rpc.TxIsValidEnough(&txResult.TxResult) + if !isValidEnough { + return nil, errors.Errorf("invalid ethereum tx: %s", 
reason) } var tx sdk.Tx diff --git a/eth/rpc/rpc.go b/eth/rpc/rpc.go index 8bd080c61..ec211a121 100644 --- a/eth/rpc/rpc.go +++ b/eth/rpc/rpc.go @@ -263,8 +263,15 @@ func TxStateDBCommitError(res *abci.ResponseDeliverTx) bool { return strings.Contains(res.Log, ErrStateDBCommit) } -// TxSuccessOrExpectedFailure returns true if the transaction was successful +// TxIsValidEnough returns true if the transaction was successful // or if it failed with an ExceedBlockGasLimit error or TxStateDBCommitError error -func TxSuccessOrExpectedFailure(res *abci.ResponseDeliverTx) bool { - return res.Code == 0 || TxExceedBlockGasLimit(res) || TxStateDBCommitError(res) +func TxIsValidEnough(res *abci.ResponseDeliverTx) (condition bool, reason string) { + if res.Code == 0 { + return true, "tx succeeded" + } else if TxExceedBlockGasLimit(res) { + return true, "tx exceeded block gas limit" + } else if TxStateDBCommitError(res) { + return true, "tx state db commit error" + } + return false, "unexpected failure" } diff --git a/eth/rpc/rpcapi/apis.go b/eth/rpc/rpcapi/apis.go index f3f33ae76..6607f92d2 100644 --- a/eth/rpc/rpcapi/apis.go +++ b/eth/rpc/rpcapi/apis.go @@ -10,7 +10,6 @@ import ( "github.com/NibiruChain/nibiru/v2/eth" "github.com/NibiruChain/nibiru/v2/eth/rpc/backend" "github.com/NibiruChain/nibiru/v2/eth/rpc/rpcapi/debugapi" - "github.com/NibiruChain/nibiru/v2/eth/rpc/rpcapi/filtersapi" rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" ) @@ -61,7 +60,7 @@ func init() { { Namespace: NamespaceEth, Version: apiVersion, - Service: filtersapi.NewImplFiltersAPI(ctx.Logger, clientCtx, tmWSClient, evmBackend), + Service: NewImplFiltersAPI(ctx.Logger, clientCtx, tmWSClient, evmBackend), Public: true, }, } diff --git a/eth/rpc/rpcapi/eth_api_test.go b/eth/rpc/rpcapi/eth_api_test.go index 39e995e87..74f5a8af5 100644 --- a/eth/rpc/rpcapi/eth_api_test.go +++ b/eth/rpc/rpcapi/eth_api_test.go @@ -3,6 +3,7 @@ package rpcapi_test import ( "context" "crypto/ecdsa" + "encoding/json" 
"fmt" "math/big" "strings" @@ -36,11 +37,11 @@ import ( ) var ( - _ suite.TearDownAllSuite = (*TestSuite)(nil) - _ suite.SetupAllSuite = (*TestSuite)(nil) + _ suite.TearDownAllSuite = (*NodeSuite)(nil) + _ suite.SetupAllSuite = (*NodeSuite)(nil) ) -type TestSuite struct { +type NodeSuite struct { suite.Suite cfg testnetwork.Config network *testnetwork.Network @@ -57,12 +58,13 @@ type TestSuite struct { } func TestSuite_RunAll(t *testing.T) { - suite.Run(t, new(TestSuite)) + suite.Run(t, new(Suite)) + suite.Run(t, new(NodeSuite)) } // SetupSuite runs before every test in the suite. Implements the // "suite.SetupAllSuite" interface. -func (s *TestSuite) SetupSuite() { +func (s *NodeSuite) SetupSuite() { testutil.BeforeIntegrationSuite(s.T()) testapp.EnsureNibiruPrefix() @@ -89,14 +91,14 @@ func (s *TestSuite) SetupSuite() { } // Test_ChainID EVM method: eth_chainId -func (s *TestSuite) Test_ChainID() { +func (s *NodeSuite) Test_ChainID() { ethChainID, err := s.ethClient.ChainID(context.Background()) s.NoError(err) s.Equal(appconst.ETH_CHAIN_ID_DEFAULT, ethChainID.Int64()) } // Test_BlockNumber EVM method: eth_blockNumber -func (s *TestSuite) Test_BlockNumber() { +func (s *NodeSuite) Test_BlockNumber() { networkBlockNumber, err := s.network.LatestHeight() s.NoError(err) @@ -106,7 +108,7 @@ func (s *TestSuite) Test_BlockNumber() { } // Test_BlockByNumber EVM method: eth_getBlockByNumber -func (s *TestSuite) Test_BlockByNumber() { +func (s *NodeSuite) Test_BlockByNumber() { networkBlockNumber, err := s.network.LatestHeight() s.NoError(err) @@ -116,7 +118,7 @@ func (s *TestSuite) Test_BlockByNumber() { } // Test_BalanceAt EVM method: eth_getBalance -func (s *TestSuite) Test_BalanceAt() { +func (s *NodeSuite) Test_BalanceAt() { testAccEthAddr := gethcommon.BytesToAddress(testnetwork.NewAccount(s.network, "new-user")) // New user balance should be 0 @@ -133,7 +135,7 @@ func (s *TestSuite) Test_BalanceAt() { } // Test_StorageAt EVM method: eth_getStorageAt -func (s 
*TestSuite) Test_StorageAt() { +func (s *NodeSuite) Test_StorageAt() { storage, err := s.ethClient.StorageAt( context.Background(), s.fundedAccEthAddr, gethcommon.Hash{}, nil, ) @@ -143,7 +145,7 @@ func (s *TestSuite) Test_StorageAt() { } // Test_PendingStorageAt EVM method: eth_getStorageAt | pending -func (s *TestSuite) Test_PendingStorageAt() { +func (s *NodeSuite) Test_PendingStorageAt() { storage, err := s.ethClient.PendingStorageAt( context.Background(), s.fundedAccEthAddr, gethcommon.Hash{}, ) @@ -154,7 +156,7 @@ func (s *TestSuite) Test_PendingStorageAt() { } // Test_CodeAt EVM method: eth_getCode -func (s *TestSuite) Test_CodeAt() { +func (s *NodeSuite) Test_CodeAt() { code, err := s.ethClient.CodeAt(context.Background(), s.fundedAccEthAddr, nil) s.NoError(err) @@ -163,7 +165,7 @@ func (s *TestSuite) Test_CodeAt() { } // Test_PendingCodeAt EVM method: eth_getCode -func (s *TestSuite) Test_PendingCodeAt() { +func (s *NodeSuite) Test_PendingCodeAt() { code, err := s.ethClient.PendingCodeAt(context.Background(), s.fundedAccEthAddr) s.NoError(err) @@ -172,7 +174,7 @@ func (s *TestSuite) Test_PendingCodeAt() { } // Test_EstimateGas EVM method: eth_estimateGas -func (s *TestSuite) Test_EstimateGas() { +func (s *NodeSuite) Test_EstimateGas() { testAccEthAddr := gethcommon.BytesToAddress(testnetwork.NewAccount(s.network, "new-user")) gasLimit := uint64(21000) msg := geth.CallMsg{ @@ -196,14 +198,14 @@ func (s *TestSuite) Test_EstimateGas() { } // Test_SuggestGasPrice EVM method: eth_gasPrice -func (s *TestSuite) Test_SuggestGasPrice() { +func (s *NodeSuite) Test_SuggestGasPrice() { // TODO: the backend method is stubbed to 0 _, err := s.ethClient.SuggestGasPrice(context.Background()) s.NoError(err) } // Test_SimpleTransferTransaction EVM method: eth_sendRawTransaction -func (s *TestSuite) Test_SimpleTransferTransaction() { +func (s *NodeSuite) Test_SimpleTransferTransaction() { chainID, err := s.ethClient.ChainID(context.Background()) s.NoError(err) nonce, err := 
s.ethClient.PendingNonceAt(context.Background(), s.fundedAccEthAddr) @@ -231,13 +233,15 @@ func (s *TestSuite) Test_SimpleTransferTransaction() { grpcConn, err := gosdk.GetGRPCConnection(grpcUrl, true, 5) s.NoError(err) - querier := bank.NewQueryClient(grpcConn) - resp, err := querier.Balance(context.Background(), &bank.QueryBalanceRequest{ - Address: s.fundedAccNibiAddr.String(), - Denom: eth.EthBaseDenom, - }) - s.Require().NoError(err) - s.Equal("105"+strings.Repeat("0", 6), resp.Balance.Amount.String()) + { + querier := bank.NewQueryClient(grpcConn) + resp, err := querier.Balance(context.Background(), &bank.QueryBalanceRequest{ + Address: s.fundedAccNibiAddr.String(), + Denom: eth.EthBaseDenom, + }) + s.Require().NoError(err) + s.Equal("105"+strings.Repeat("0", 6), resp.Balance.Amount.String()) + } s.T().Logf("Sending %d wei to %s", weiToSend, recipientAddr.Hex()) signer := gethcore.LatestSignerForChainID(chainID) @@ -257,7 +261,36 @@ func (s *TestSuite) Test_SimpleTransferTransaction() { s.Require().NoError(err) s.NoError(s.network.WaitForNextBlock()) - senderAmountAfterWei, err := s.ethClient.BalanceAt(context.Background(), s.fundedAccEthAddr, nil) + s.NoError(s.network.WaitForNextBlock()) + s.NoError(s.network.WaitForNextBlock()) + + txReceipt, err := s.ethClient.TransactionReceipt(blankCtx, tx.Hash()) + s.NoError(err) + + s.T().Log("Assert event expectations - successful eth tx") + { + blockHeightOfTx := int64(txReceipt.BlockNumber.Uint64()) + blockOfTx, err := s.val.RPCClient.BlockResults(blankCtx, &blockHeightOfTx) + s.NoError(err) + ethTxEvents := []sdk.Event{} + events := blockOfTx.TxsResults[0].Events + for _, event := range events { + if event.Type == "ethereum_tx" { + ethTxEvents = append(ethTxEvents, + sdk.Event{Type: event.Type, Attributes: event.Attributes}, + ) + } + } + + eventsJson, _ := json.Marshal(events) + s.Require().Equal(len(ethTxEvents), 2, "events: ", eventsJson) + hash0, _ := ethTxEvents[0].GetAttribute(evm.AttributeKeyEthereumTxHash) 
+ hash1, _ := ethTxEvents[1].GetAttribute(evm.AttributeKeyEthereumTxHash) + s.Require().Equal(hash0, hash1) + } + + s.T().Log("Assert balances") + senderBalanceAfterWei, err := s.ethClient.BalanceAt(context.Background(), s.fundedAccEthAddr, nil) s.NoError(err) costOfTx := new(big.Int).Add( @@ -265,7 +298,7 @@ func (s *TestSuite) Test_SimpleTransferTransaction() { new(big.Int).Mul((new(big.Int).SetUint64(params.TxGas)), gasPrice), ) wantSenderBalWei := new(big.Int).Sub(senderBalanceBeforeWei, costOfTx) - s.Equal(wantSenderBalWei.String(), senderAmountAfterWei.String(), "surpising sender balance") + s.Equal(wantSenderBalWei.String(), senderBalanceAfterWei.String(), "surpising sender balance") recipientBalanceAfter, err := s.ethClient.BalanceAt(context.Background(), recipientAddr, nil) s.NoError(err) @@ -275,7 +308,7 @@ func (s *TestSuite) Test_SimpleTransferTransaction() { var blankCtx = context.Background() // Test_SmartContract includes contract deployment, query, execution -func (s *TestSuite) Test_SmartContract() { +func (s *NodeSuite) Test_SmartContract() { chainID, err := s.ethClient.ChainID(context.Background()) s.NoError(err) nonce, err := s.ethClient.NonceAt(context.Background(), s.fundedAccEthAddr, nil) @@ -376,7 +409,6 @@ func (s *TestSuite) Test_SmartContract() { txBz, err := tx.MarshalBinary() s.NoError(err) txHash, err := s.ethAPI.SendRawTransaction(txBz) - // err = s.ethClient.SendTransaction(blankCtx, tx) s.Require().NoError(err) _ = s.network.WaitForNextBlock() @@ -384,12 +416,18 @@ func (s *TestSuite) Test_SmartContract() { s.Require().NoError(err) s.NotNil(txReceipt) - _, err = s.ethAPI.GetTransactionLogs(txHash) + txHashFromReceipt := txReceipt.TxHash + s.Equal(txHash, txHashFromReceipt) + + // TODO: Test eth_getTransactionByHash using a JSON-RPC request at the + // endpoint directly. 
+ tx, _, err = s.ethClient.TransactionByHash(blankCtx, txHash) s.NoError(err) + s.NotNil(tx) } } -func (s *TestSuite) TearDownSuite() { +func (s *NodeSuite) TearDownSuite() { s.T().Log("tearing down integration test suite") s.network.Cleanup() } diff --git a/eth/rpc/rpcapi/filtersapi/api.go b/eth/rpc/rpcapi/eth_filters_api.go similarity index 86% rename from eth/rpc/rpcapi/filtersapi/api.go rename to eth/rpc/rpcapi/eth_filters_api.go index 7d0985433..9a72a5042 100644 --- a/eth/rpc/rpcapi/filtersapi/api.go +++ b/eth/rpc/rpcapi/eth_filters_api.go @@ -1,5 +1,5 @@ // Copyright (c) 2023-2024 Nibi, Inc. -package filtersapi +package rpcapi import ( "context" @@ -11,6 +11,7 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/NibiruChain/nibiru/v2/eth/rpc" + rpcbackend "github.com/NibiruChain/nibiru/v2/eth/rpc/backend" "github.com/cometbft/cometbft/libs/log" @@ -26,33 +27,16 @@ import ( "github.com/NibiruChain/nibiru/v2/x/evm" ) -// IFilterAPI -type IFilterAPI interface { - NewPendingTransactionFilter() gethrpc.ID - NewBlockFilter() gethrpc.ID - NewFilter(criteria filters.FilterCriteria) (gethrpc.ID, error) - GetFilterChanges(id gethrpc.ID) (interface{}, error) - GetFilterLogs(ctx context.Context, id gethrpc.ID) ([]*gethcore.Log, error) - UninstallFilter(id gethrpc.ID) bool - GetLogs(ctx context.Context, crit filters.FilterCriteria) ([]*gethcore.Log, error) -} - -// IFilterEthBackend defines the methods requided by the PublicFilterAPI backend -type IFilterEthBackend interface { - GetBlockByNumber(blockNum rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) - HeaderByNumber(blockNum rpc.BlockNumber) (*gethcore.Header, error) - HeaderByHash(blockHash common.Hash) (*gethcore.Header, error) - TendermintBlockByHash(hash common.Hash) (*coretypes.ResultBlock, error) - TendermintBlockResultByNumber(height *int64) (*coretypes.ResultBlockResults, error) - GetLogs(blockHash common.Hash) ([][]*gethcore.Log, error) - GetLogsByHeight(*int64) ([][]*gethcore.Log, error) 
- BlockBloom(blockRes *coretypes.ResultBlockResults) (gethcore.Bloom, error) - - BloomStatus() (uint64, uint64) - - RPCFilterCap() int32 - RPCLogsCap() int32 - RPCBlockRangeCap() int32 +// FiltersAPI offers support to create and manage filters. This will allow +// external clients to retrieve various information related to the Ethereum +// protocol such as blocks, transactions and logs. +type FiltersAPI struct { + logger log.Logger + clientCtx client.Context + backend *rpcbackend.Backend + events *EventSubscriber + filtersMu sync.Mutex + filters map[gethrpc.ID]*filter } // consider a filter inactive if it has not been polled for within deadlineForInactivity @@ -69,27 +53,20 @@ type filter struct { s *Subscription // associated subscription in event system } -// FiltersAPI offers support to create and manage filters. This will allow -// external clients to retrieve various information related to the Ethereum -// protocol such as blocks, transactions and logs. -type FiltersAPI struct { - logger log.Logger - clientCtx client.Context - backend IFilterEthBackend - events *EventSystem - filtersMu sync.Mutex - filters map[gethrpc.ID]*filter -} - -// NewImplFiltersAPI returns a new PublicFilterAPI instance. -func NewImplFiltersAPI(logger log.Logger, clientCtx client.Context, tmWSClient *rpcclient.WSClient, backend IFilterEthBackend) *FiltersAPI { +// NewImplFiltersAPI returns a new FiltersAPI instance. 
+func NewImplFiltersAPI( + logger log.Logger, + clientCtx client.Context, + tmWSClient *rpcclient.WSClient, + backend *rpcbackend.Backend, +) *FiltersAPI { logger = logger.With("api", "filter") api := &FiltersAPI{ logger: logger, clientCtx: clientCtx, backend: backend, filters: make(map[gethrpc.ID]*filter), - events: NewEventSystem(logger, tmWSClient), + events: NewEventSubscriber(logger, tmWSClient), } go api.timeoutLoop() @@ -189,7 +166,7 @@ func (api *FiltersAPI) NewPendingTransactionFilter() gethrpc.ID { api.filtersMu.Unlock() } } - }(pendingTxSub.eventCh, pendingTxSub.Err()) + }(pendingTxSub.EventCh, pendingTxSub.Error()) return pendingTxSub.ID() } @@ -198,6 +175,7 @@ func (api *FiltersAPI) NewPendingTransactionFilter() gethrpc.ID { // transaction enters the transaction pool and was signed from one of the // transactions this nodes manages. func (api *FiltersAPI) NewPendingTransactions(ctx context.Context) (*gethrpc.Subscription, error) { + api.logger.Debug("eth_newPendingTransactions") notifier, supported := gethrpc.NotifierFromContext(ctx) if !supported { return &gethrpc.Subscription{}, gethrpc.ErrNotificationsUnsupported @@ -254,7 +232,7 @@ func (api *FiltersAPI) NewPendingTransactions(ctx context.Context) (*gethrpc.Sub return } } - }(pendingTxSub.eventCh) + }(pendingTxSub.EventCh) return rpcSub, err } @@ -311,14 +289,15 @@ func (api *FiltersAPI) NewBlockFilter() gethrpc.ID { return } } - }(headerSub.eventCh, headerSub.Err()) + }(headerSub.EventCh, headerSub.Error()) return headerSub.ID() } -// NewHeads send a notification each time a new (header) block is appended to the -// chain. +// NewHeads send a notification each time a new block (and thus, block header) is +// added to the chain. 
func (api *FiltersAPI) NewHeads(ctx context.Context) (*gethrpc.Subscription, error) { + api.logger.Debug("eth_newHeads") notifier, supported := gethrpc.NotifierFromContext(ctx) if !supported { return &gethrpc.Subscription{}, gethrpc.ErrNotificationsUnsupported @@ -332,9 +311,17 @@ func (api *FiltersAPI) NewHeads(ctx context.Context) (*gethrpc.Subscription, err return &gethrpc.Subscription{}, err } + // Start go routine to continue executing without blocking while the + // goroutine handles incoming events. + // The routine receives a channel that receives block header events as an + // argument. go func(headersCh <-chan coretypes.ResultEvent) { + // This defer statement ensures the subscription is canceled + // when the goroutine exits. defer cancelSubs() + // Listen for block header events and handle them based on the + // type of event received. for { select { case ev, ok := <-headersCh: @@ -350,8 +337,12 @@ func (api *FiltersAPI) NewHeads(ctx context.Context) (*gethrpc.Subscription, err } var baseFee *big.Int = nil - // TODO: fetch bloom from events - header := rpc.EthHeaderFromTendermint(data.Header, gethcore.Bloom{}, baseFee) + bloom, err := ParseBloomFromEvents(data.ResultEndBlock.Events) + if err != nil { + api.logger.Error("failed to parse bloom from end block events") + return + } + header := rpc.EthHeaderFromTendermint(data.Header, bloom, baseFee) _ = notifier.Notify(rpcSub.ID, header) // #nosec G703 case <-rpcSub.Err(): headersSub.Unsubscribe(api.events) @@ -361,14 +352,18 @@ func (api *FiltersAPI) NewHeads(ctx context.Context) (*gethrpc.Subscription, err return } } - }(headersSub.eventCh) + }(headersSub.EventCh) return rpcSub, err } // Logs creates a subscription that fires for all new log that match the given // filter criteria. -func (api *FiltersAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (*gethrpc.Subscription, error) { +// Implements "eth_logs". 
+func (api *FiltersAPI) Logs( + ctx context.Context, crit filters.FilterCriteria, +) (*gethrpc.Subscription, error) { + api.logger.Debug("eth_logs") notifier, supported := gethrpc.NotifierFromContext(ctx) if !supported { return &gethrpc.Subscription{}, gethrpc.ErrNotificationsUnsupported @@ -427,7 +422,7 @@ func (api *FiltersAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (* return } } - }(logsSub.eventCh) + }(logsSub.EventCh) return rpcSub, err } @@ -446,6 +441,7 @@ func (api *FiltersAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (* // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter func (api *FiltersAPI) NewFilter(criteria filters.FilterCriteria) (gethrpc.ID, error) { + api.logger.Debug("eth_newFilter") api.filtersMu.Lock() defer api.filtersMu.Unlock() @@ -504,26 +500,28 @@ func (api *FiltersAPI) NewFilter(criteria filters.FilterCriteria) (gethrpc.ID, e f.logs = append(f.logs, logs...) } api.filtersMu.Unlock() - case <-logsSub.Err(): + case <-logsSub.Error(): api.filtersMu.Lock() delete(api.filters, filterID) api.filtersMu.Unlock() return } } - }(logsSub.eventCh) + }(logsSub.EventCh) return filterID, err } // GetLogs returns logs matching the given argument that are stored within the state. -// +// This function implements the "eth_getLogs" JSON-RPC service method. 
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs -func (api *FiltersAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([]*gethcore.Log, error) { +func (api *FiltersAPI) GetLogs( + ctx context.Context, crit filters.FilterCriteria, +) ([]*gethcore.Log, error) { var filter *Filter if crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.logger, api.backend, crit) + filter = NewBlockFilter(api.logger, *api.backend, crit) } else { // Convert the RPC block numbers into internal representations begin := gethrpc.LatestBlockNumber.Int64() @@ -535,9 +533,15 @@ func (api *FiltersAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) end = crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.logger, api.backend, begin, end, crit.Addresses, crit.Topics) + filter = NewRangeFilter(api.logger, *api.backend, begin, end, crit.Addresses, crit.Topics) } + api.logger.Debug("eth_getLogs", + "from_block", filter.criteria.FromBlock.String(), + "to_block", filter.criteria.ToBlock.String(), + "time", time.Now().UTC(), + ) + // Run the filter and return all the logs logs, err := filter.Logs(ctx, int(api.backend.RPCLogsCap()), int64(api.backend.RPCBlockRangeCap())) if err != nil { @@ -568,8 +572,10 @@ func (api *FiltersAPI) UninstallFilter(id gethrpc.ID) bool { // GetFilterLogs returns the logs for the filter with the given id. // If the filter could not be found an empty array of logs is returned. // +// This function implements the "eth_getFilterLogs" JSON-RPC service method. 
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs func (api *FiltersAPI) GetFilterLogs(ctx context.Context, id gethrpc.ID) ([]*gethcore.Log, error) { + api.logger.Debug("eth_getFilterLogs") api.filtersMu.Lock() f, found := api.filters[id] api.filtersMu.Unlock() @@ -585,7 +591,7 @@ func (api *FiltersAPI) GetFilterLogs(ctx context.Context, id gethrpc.ID) ([]*get var filter *Filter if f.crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.logger, api.backend, f.crit) + filter = NewBlockFilter(api.logger, *api.backend, f.crit) } else { // Convert the RPC block numbers into internal representations begin := gethrpc.LatestBlockNumber.Int64() @@ -597,7 +603,7 @@ func (api *FiltersAPI) GetFilterLogs(ctx context.Context, id gethrpc.ID) ([]*get end = f.crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.logger, api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + filter = NewRangeFilter(api.logger, *api.backend, begin, end, f.crit.Addresses, f.crit.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx, int(api.backend.RPCLogsCap()), int64(api.backend.RPCBlockRangeCap())) @@ -607,14 +613,16 @@ func (api *FiltersAPI) GetFilterLogs(ctx context.Context, id gethrpc.ID) ([]*get return returnLogs(logs), nil } -// GetFilterChanges returns the logs for the filter with the given id since -// last time it was called. This can be used for polling. +// GetFilterChanges returns the logs for the filter with the given id since last +// time it was called. This can be used for polling. // // For pending transaction and block filters the result is []common.Hash. // (pending)Log filters return []Log. // +// This function implements the "eth_getFilterChanges" JSON-RPC service method. 
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges func (api *FiltersAPI) GetFilterChanges(id gethrpc.ID) (interface{}, error) { + api.logger.Debug("eth_getFilterChanges") api.filtersMu.Lock() defer api.filtersMu.Unlock() diff --git a/eth/rpc/rpcapi/event_subscriber.go b/eth/rpc/rpcapi/event_subscriber.go new file mode 100644 index 000000000..7f038dbdc --- /dev/null +++ b/eth/rpc/rpcapi/event_subscriber.go @@ -0,0 +1,327 @@ +// Copyright (c) 2023-2024 Nibi, Inc. +package rpcapi + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/pkg/errors" + + tmjson "github.com/cometbft/cometbft/libs/json" + "github.com/cometbft/cometbft/libs/log" + tmquery "github.com/cometbft/cometbft/libs/pubsub/query" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" + tmtypes "github.com/cometbft/cometbft/types" + + "github.com/ethereum/go-ethereum/common" + gethcore "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + gethrpc "github.com/ethereum/go-ethereum/rpc" + + sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/NibiruChain/nibiru/v2/eth/rpc/pubsub" + "github.com/NibiruChain/nibiru/v2/x/evm" +) + +var ( + txEventsQuery = tmtypes.QueryForEvent(tmtypes.EventTx).String() + evmEventsQuery = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", + tmtypes.EventTypeKey, + tmtypes.EventTx, + sdk.EventTypeMessage, + sdk.AttributeKeyModule, evm.ModuleName)).String() + headerEventsQuery = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String() +) + +// EventSubscriber creates subscriptions, processes events and broadcasts them to the +// subscription which match the subscription criteria using the Tendermint's RPC +// client. 
+type EventSubscriber struct { + Logger log.Logger + Ctx context.Context + TmWSClient *rpcclient.WSClient + + // light client mode + LightMode bool + + Index FilterIndex + TopicChans map[string]chan<- coretypes.ResultEvent + IndexMux *sync.RWMutex + + // Channels + Install chan *Subscription // install filter for event notification + Uninstall chan *Subscription // remove filter for event notification + EventBus pubsub.EventBus +} + +// NewEventSubscriber creates a new manager that listens for events on the given mux, +// parses and filters them. It uses the all map to retrieve filter changes. The +// work loop holds its own index that is used to forward events to filters. +// +// The returned manager has a loop that needs to be stopped with the Stop function +// or by stopping the given mux. +func NewEventSubscriber( + logger log.Logger, + tmWSClient *rpcclient.WSClient, +) *EventSubscriber { + index := make(FilterIndex) + for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { + index[i] = make(map[gethrpc.ID]*Subscription) + } + + es := &EventSubscriber{ + Logger: logger, + Ctx: context.Background(), + TmWSClient: tmWSClient, + LightMode: false, + Index: index, + TopicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)), + IndexMux: new(sync.RWMutex), + Install: make(chan *Subscription), + Uninstall: make(chan *Subscription), + EventBus: pubsub.NewEventBus(), + } + + go es.EventLoop() + go es.consumeEvents() + return es +} + +// WithContext sets a new context to the EventSubscriber. This is required to set a timeout context when +// a new filter is instantiated. +func (es *EventSubscriber) WithContext(ctx context.Context) { + es.Ctx = ctx +} + +// subscribe performs a new event subscription to a given Tendermint event. +// The subscription creates a unidirectional receive event channel to receive the ResultEvent. 
+func (es *EventSubscriber) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) { + var ( + err error + cancelFn context.CancelFunc + ) + + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + existingSubs := es.EventBus.Topics() + for _, topic := range existingSubs { + if topic == sub.Event { + eventCh, unsubFn, err := es.EventBus.Subscribe(sub.Event) + if err != nil { + err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.Event) + return nil, nil, err + } + + sub.EventCh = eventCh + return sub, unsubFn, nil + } + } + + switch sub.Typ { + case filters.LogsSubscription: + err = es.TmWSClient.Subscribe(ctx, sub.Event) + case filters.BlocksSubscription: + err = es.TmWSClient.Subscribe(ctx, sub.Event) + case filters.PendingTransactionsSubscription: + err = es.TmWSClient.Subscribe(ctx, sub.Event) + default: + err = fmt.Errorf("invalid filter subscription type %d", sub.Typ) + } + + if err != nil { + sub.ErrCh <- err + return nil, nil, err + } + + // wrap events in a go routine to prevent blocking + es.Install <- sub + <-sub.Installed + + eventCh, unsubFn, err := es.EventBus.Subscribe(sub.Event) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.Event) + } + + sub.EventCh = eventCh + return sub, unsubFn, nil +} + +// SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. Default value for the from and to +// block is "latest". If the fromBlock > toBlock an error is returned. 
+func (es *EventSubscriber) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { + var from, to gethrpc.BlockNumber + if crit.FromBlock == nil { + from = gethrpc.LatestBlockNumber + } else { + from = gethrpc.BlockNumber(crit.FromBlock.Int64()) + } + if crit.ToBlock == nil { + to = gethrpc.LatestBlockNumber + } else { + to = gethrpc.BlockNumber(crit.ToBlock.Int64()) + } + + switch { + // only interested in new mined logs, mined logs within a specific block range, or + // logs from a specific block number to new mined blocks + case (from == gethrpc.LatestBlockNumber && to == gethrpc.LatestBlockNumber), + (from >= 0 && to >= 0 && to >= from), + (from >= 0 && to == gethrpc.LatestBlockNumber): + + // Create a subscription that will write all logs matching the + // given criteria to the given logs channel. + sub := &Subscription{ + Id: gethrpc.NewID(), + Typ: filters.LogsSubscription, + Event: evmEventsQuery, + logsCrit: crit, + Created: time.Now().UTC(), + Logs: make(chan []*gethcore.Log), + Installed: make(chan struct{}, 1), + ErrCh: make(chan error, 1), + } + return es.subscribe(sub) + + default: + return nil, nil, fmt.Errorf("invalid from and to block combination: from > to (%d > %d)", from, to) + } +} + +// SubscribeNewHeads subscribes to new block headers events. +func (es EventSubscriber) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) { + sub := &Subscription{ + Id: gethrpc.NewID(), + Typ: filters.BlocksSubscription, + Event: headerEventsQuery, + Created: time.Now().UTC(), + Headers: make(chan *gethcore.Header), + Installed: make(chan struct{}, 1), + ErrCh: make(chan error, 1), + } + return es.subscribe(sub) +} + +// SubscribePendingTxs subscribes to new pending transactions events from the mempool. 
+func (es EventSubscriber) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) { + sub := &Subscription{ + Id: gethrpc.NewID(), + Typ: filters.PendingTransactionsSubscription, + Event: txEventsQuery, + Created: time.Now().UTC(), + Hashes: make(chan []common.Hash), + Installed: make(chan struct{}, 1), + ErrCh: make(chan error, 1), + } + return es.subscribe(sub) +} + +type FilterIndex map[filters.Type]map[gethrpc.ID]*Subscription + +// EventLoop (un)installs filters and processes mux events. +func (es *EventSubscriber) EventLoop() { + for { + select { + case f := <-es.Install: + es.IndexMux.Lock() + es.Index[f.Typ][f.Id] = f + ch := make(chan coretypes.ResultEvent) + if err := es.EventBus.AddTopic(f.Event, ch); err != nil { + es.Logger.Error("failed to add event topic to event bus", "topic", f.Event, "error", err.Error()) + } else { + es.TopicChans[f.Event] = ch + } + es.IndexMux.Unlock() + close(f.Installed) + case f := <-es.Uninstall: + es.IndexMux.Lock() + delete(es.Index[f.Typ], f.Id) + + var channelInUse bool + // #nosec G705 + for _, sub := range es.Index[f.Typ] { + if sub.Event == f.Event { + channelInUse = true + break + } + } + + // remove topic only when channel is not used by other subscriptions + if !channelInUse { + if err := es.TmWSClient.Unsubscribe(es.Ctx, f.Event); err != nil { + es.Logger.Error("failed to unsubscribe from query", "query", f.Event, "error", err.Error()) + } + + ch, ok := es.TopicChans[f.Event] + if ok { + es.EventBus.RemoveTopic(f.Event) + close(ch) + delete(es.TopicChans, f.Event) + } + } + + es.IndexMux.Unlock() + close(f.ErrCh) + } + } +} + +func (es *EventSubscriber) consumeEvents() { + for { + for rpcResp := range es.TmWSClient.ResponsesCh { + var ev coretypes.ResultEvent + + if rpcResp.Error != nil { + time.Sleep(5 * time.Second) + continue + } else if err := tmjson.Unmarshal(rpcResp.Result, &ev); err != nil { + es.Logger.Error("failed to JSON unmarshal ResponsesCh result event", "error", err.Error()) + 
continue + } + + if len(ev.Query) == 0 { + // skip empty responses + continue + } + + es.IndexMux.RLock() + ch, ok := es.TopicChans[ev.Query] + es.IndexMux.RUnlock() + if !ok { + es.Logger.Debug("channel for subscription not found", "topic", ev.Query) + es.Logger.Debug("list of available channels", "channels", es.EventBus.Topics()) + continue + } + + // gracefully handle lagging subscribers + t := time.NewTimer(time.Second) + select { + case <-t.C: + es.Logger.Debug("dropped event during lagging subscription", "topic", ev.Query) + case ch <- ev: + } + } + + time.Sleep(time.Second) + } +} + +func MakeSubscription(id, event string) *Subscription { + return &Subscription{ + Id: gethrpc.ID(id), + Typ: filters.LogsSubscription, + Event: event, + Created: time.Now(), + Logs: make(chan []*gethcore.Log), + Hashes: make(chan []common.Hash), + Headers: make(chan *gethcore.Header), + Installed: make(chan struct{}), + EventCh: make(chan coretypes.ResultEvent), + ErrCh: make(chan error), + } +} diff --git a/eth/rpc/rpcapi/event_subscriber_test.go b/eth/rpc/rpcapi/event_subscriber_test.go new file mode 100644 index 000000000..dc14b092e --- /dev/null +++ b/eth/rpc/rpcapi/event_subscriber_test.go @@ -0,0 +1,127 @@ +package rpcapi_test + +import ( + "context" + "os" + "sync" + "testing" + + abci "github.com/cometbft/cometbft/abci/types" + gogoproto "github.com/cosmos/gogoproto/proto" + "github.com/stretchr/testify/suite" + + "github.com/cometbft/cometbft/libs/log" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + gethcore "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/NibiruChain/nibiru/v2/eth" + "github.com/NibiruChain/nibiru/v2/eth/rpc/pubsub" + "github.com/NibiruChain/nibiru/v2/eth/rpc/rpcapi" + "github.com/NibiruChain/nibiru/v2/x/common/testutil" + "github.com/NibiruChain/nibiru/v2/x/evm" + "github.com/NibiruChain/nibiru/v2/x/evm/evmtest" +) + +type Suite struct { + 
suite.Suite +} + +func TestEventSubscriber(t *testing.T) { + index := make(rpcapi.FilterIndex) + for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { + index[i] = make(map[rpc.ID]*rpcapi.Subscription) + } + es := &rpcapi.EventSubscriber{ + Logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)), + Ctx: context.Background(), + LightMode: false, + Index: index, + TopicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)), + IndexMux: new(sync.RWMutex), + Install: make(chan *rpcapi.Subscription), + Uninstall: make(chan *rpcapi.Subscription), + EventBus: pubsub.NewEventBus(), + } + go es.EventLoop() + + event := "event" + sub := rpcapi.MakeSubscription("1", event) + es.Install <- sub + <-sub.Installed + ch, ok := es.TopicChans[sub.Event] + if !ok { + t.Error("expect topic channel exist") + } + + sub = rpcapi.MakeSubscription("2", event) + es.Install <- sub + <-sub.Installed + newCh, ok := es.TopicChans[sub.Event] + if !ok { + t.Error("expect topic channel exist") + } + + if newCh != ch { + t.Error("expect topic channel unchanged") + } +} + +func (s *Suite) TestParseBloomFromEvents() { + for _, tc := range []struct { + name string + endBlockEvents func() (gethcore.Bloom, []abci.Event) + wantErr string + }{ + { + name: "happy: empty events", + endBlockEvents: func() (gethcore.Bloom, []abci.Event) { + return *new(gethcore.Bloom), []abci.Event{} + }, + wantErr: "", + }, + { + name: "happy: events with bloom included", + endBlockEvents: func() (gethcore.Bloom, []abci.Event) { + deps := evmtest.NewTestDeps() + + // populate valid bloom + bloom := gethcore.Bloom{} + dummyBz := []byte("dummybloom") + copy(bloom[:], dummyBz) + + err := deps.Ctx.EventManager().EmitTypedEvents( + &evm.EventTransfer{}, + &evm.EventBlockBloom{ + Bloom: eth.BloomToHex(bloom), + }, + ) + s.NoError(err, "emitting bloom event failed") + + abciEvents := deps.Ctx.EventManager().ABCIEvents() + + bloomEvent := new(evm.EventBlockBloom) + bloomEventType := 
gogoproto.MessageName(bloomEvent) + + err = testutil.AssertEventPresent(deps.Ctx.EventManager().Events(), bloomEventType) + s.Require().NoError(err) + + return bloom, abciEvents + }, + wantErr: "", + }, + } { + s.Run(tc.name, func() { + wantBloom, events := tc.endBlockEvents() + bloom, err := rpcapi.ParseBloomFromEvents(events) + + if tc.wantErr != "" { + s.Require().ErrorContains(err, tc.wantErr) + return + } + + s.Require().Equal(wantBloom, bloom) + }) + } +} diff --git a/eth/rpc/rpcapi/filtersapi/utils.go b/eth/rpc/rpcapi/filter_utils.go similarity index 73% rename from eth/rpc/rpcapi/filtersapi/utils.go rename to eth/rpc/rpcapi/filter_utils.go index 47d27de2e..53e7c609d 100644 --- a/eth/rpc/rpcapi/filtersapi/utils.go +++ b/eth/rpc/rpcapi/filter_utils.go @@ -1,11 +1,20 @@ // Copyright (c) 2023-2024 Nibi, Inc. -package filtersapi +package rpcapi import ( "math/big" + "cosmossdk.io/errors" + abci "github.com/cometbft/cometbft/abci/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/ethereum/go-ethereum/common" gethcore "github.com/ethereum/go-ethereum/core/types" + + gogoproto "github.com/cosmos/gogoproto/proto" + + "github.com/NibiruChain/nibiru/v2/eth" + "github.com/NibiruChain/nibiru/v2/x/evm" ) // FilterLogs creates a slice of logs matching the given criteria. 
@@ -105,3 +114,27 @@ func returnLogs(logs []*gethcore.Log) []*gethcore.Log { } return logs } + +// ParseBloomFromEvents parses the EVM block bloom filter from the given ABCI events, returning an empty bloom if no bloom event is present. +func ParseBloomFromEvents(events []abci.Event) (bloom gethcore.Bloom, err error) { + bloomEvent := new(evm.EventBlockBloom) + bloomEventType := gogoproto.MessageName(bloomEvent) + for _, event := range events { + if event.Type != bloomEventType { + continue + } + typedProtoEvent, err := sdk.ParseTypedEvent(event) + if err != nil { + return bloom, errors.Wrapf( + err, "failed to parse event of type %s", bloomEventType) + } + bloomEvent, ok := (typedProtoEvent).(*evm.EventBlockBloom) + if !ok { + return bloom, errors.Wrapf( + err, "failed to parse event of type %s", bloomEventType) + } + + return eth.BloomFromHex(bloomEvent.Bloom) + } + return bloom, err +} diff --git a/eth/rpc/rpcapi/filtersapi/filters.go b/eth/rpc/rpcapi/filters.go similarity index 93% rename from eth/rpc/rpcapi/filtersapi/filters.go rename to eth/rpc/rpcapi/filters.go index e49d61e57..850e7fdfd 100644 --- a/eth/rpc/rpcapi/filtersapi/filters.go +++ b/eth/rpc/rpcapi/filters.go @@ -1,5 +1,5 @@ // Copyright (c) 2023-2024 Nibi, Inc. -package filtersapi +package rpcapi import ( "context" @@ -8,7 +8,7 @@ import ( "math/big" "github.com/NibiruChain/nibiru/v2/eth/rpc" - "github.com/NibiruChain/nibiru/v2/eth/rpc/backend" + rpcbackend "github.com/NibiruChain/nibiru/v2/eth/rpc/backend" "github.com/cometbft/cometbft/libs/log" tmrpctypes "github.com/cometbft/cometbft/rpc/core/types" @@ -30,7 +30,7 @@ type BloomIV struct { // Filter can be used to retrieve and filter logs. type Filter struct { logger log.Logger - backend IFilterEthBackend + backend rpcbackend.Backend criteria filters.FilterCriteria bloomFilters [][]BloomIV // Filter the system is matching for @@ -38,14 +38,14 @@ type Filter struct { // NewBlockFilter creates a new filter which directly inspects the contents of // a block to figure out whether it is interesting or not. 
-func NewBlockFilter(logger log.Logger, backend IFilterEthBackend, criteria filters.FilterCriteria) *Filter { +func NewBlockFilter(logger log.Logger, backend rpcbackend.Backend, criteria filters.FilterCriteria) *Filter { // Create a generic filter and convert it into a block filter return newFilter(logger, backend, criteria, nil) } // NewRangeFilter creates a new filter which uses a bloom filter on blocks to // figure out whether a particular block is interesting or not. -func NewRangeFilter(logger log.Logger, backend IFilterEthBackend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { +func NewRangeFilter(logger log.Logger, backend rpcbackend.Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. @@ -78,7 +78,12 @@ func NewRangeFilter(logger log.Logger, backend IFilterEthBackend, begin, end int } // newFilter returns a new Filter -func newFilter(logger log.Logger, backend IFilterEthBackend, criteria filters.FilterCriteria, bloomFilters [][]BloomIV) *Filter { +func newFilter( + logger log.Logger, + backend rpcbackend.Backend, + criteria filters.FilterCriteria, + bloomFilters [][]BloomIV, +) *Filter { return &Filter{ logger: logger, backend: backend, @@ -187,7 +192,7 @@ func (f *Filter) blockLogs(blockRes *tmrpctypes.ResultBlockResults, bloom gethco return []*gethcore.Log{}, nil } - logsList, err := backend.GetLogsFromBlockResults(blockRes) + logsList, err := rpcbackend.GetLogsFromBlockResults(blockRes) if err != nil { return []*gethcore.Log{}, errors.Wrapf(err, "failed to fetch logs block number %d", blockRes.Height) } diff --git a/eth/rpc/rpcapi/filtersapi/filter_system.go b/eth/rpc/rpcapi/filtersapi/filter_system.go deleted file mode 100644 index d54e1afb7..000000000 --- 
a/eth/rpc/rpcapi/filtersapi/filter_system.go +++ /dev/null @@ -1,311 +0,0 @@ -// Copyright (c) 2023-2024 Nibi, Inc. -package filtersapi - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/pkg/errors" - - tmjson "github.com/cometbft/cometbft/libs/json" - "github.com/cometbft/cometbft/libs/log" - tmquery "github.com/cometbft/cometbft/libs/pubsub/query" - coretypes "github.com/cometbft/cometbft/rpc/core/types" - rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" - tmtypes "github.com/cometbft/cometbft/types" - - "github.com/ethereum/go-ethereum/common" - gethcore "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/filters" - gethrpc "github.com/ethereum/go-ethereum/rpc" - - sdk "github.com/cosmos/cosmos-sdk/types" - - "github.com/NibiruChain/nibiru/v2/eth/rpc/pubsub" - "github.com/NibiruChain/nibiru/v2/x/evm" -) - -var ( - txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String() - evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", - tmtypes.EventTypeKey, - tmtypes.EventTx, - sdk.EventTypeMessage, - sdk.AttributeKeyModule, evm.ModuleName)).String() - headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String() -) - -// EventSystem creates subscriptions, processes events and broadcasts them to the -// subscription which match the subscription criteria using the Tendermint's RPC client. -type EventSystem struct { - logger log.Logger - ctx context.Context - tmWSClient *rpcclient.WSClient - - // light client mode - lightMode bool - - index filterIndex - topicChans map[string]chan<- coretypes.ResultEvent - indexMux *sync.RWMutex - - // Channels - install chan *Subscription // install filter for event notification - uninstall chan *Subscription // remove filter for event notification - eventBus pubsub.EventBus -} - -// NewEventSystem creates a new manager that listens for event on the given mux, -// parses and filters them. It uses the all map to retrieve filter changes. 
The -// work loop holds its own index that is used to forward events to filters. -// -// The returned manager has a loop that needs to be stopped with the Stop function -// or by stopping the given mux. -func NewEventSystem(logger log.Logger, tmWSClient *rpcclient.WSClient) *EventSystem { - index := make(filterIndex) - for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { - index[i] = make(map[gethrpc.ID]*Subscription) - } - - es := &EventSystem{ - logger: logger, - ctx: context.Background(), - tmWSClient: tmWSClient, - lightMode: false, - index: index, - topicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)), - indexMux: new(sync.RWMutex), - install: make(chan *Subscription), - uninstall: make(chan *Subscription), - eventBus: pubsub.NewEventBus(), - } - - go es.eventLoop() - go es.consumeEvents() - return es -} - -// WithContext sets a new context to the EventSystem. This is required to set a timeout context when -// a new filter is intantiated. -func (es *EventSystem) WithContext(ctx context.Context) { - es.ctx = ctx -} - -// subscribe performs a new event subscription to a given Tendermint event. -// The subscription creates a unidirectional receive event channel to receive the ResultEvent. 
-func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) { - var ( - err error - cancelFn context.CancelFunc - ) - - ctx, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() - - existingSubs := es.eventBus.Topics() - for _, topic := range existingSubs { - if topic == sub.event { - eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) - if err != nil { - err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event) - return nil, nil, err - } - - sub.eventCh = eventCh - return sub, unsubFn, nil - } - } - - switch sub.typ { - case filters.LogsSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - case filters.BlocksSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - case filters.PendingTransactionsSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - default: - err = fmt.Errorf("invalid filter subscription type %d", sub.typ) - } - - if err != nil { - sub.err <- err - return nil, nil, err - } - - // wrap events in a go routine to prevent blocking - es.install <- sub - <-sub.installed - - eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event) - } - - sub.eventCh = eventCh - return sub, unsubFn, nil -} - -// SubscribeLogs creates a subscription that will write all logs matching the -// given criteria to the given logs channel. Default value for the from and to -// block is "latest". If the fromBlock > toBlock an error is returned. 
-func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { - var from, to gethrpc.BlockNumber - if crit.FromBlock == nil { - from = gethrpc.LatestBlockNumber - } else { - from = gethrpc.BlockNumber(crit.FromBlock.Int64()) - } - if crit.ToBlock == nil { - to = gethrpc.LatestBlockNumber - } else { - to = gethrpc.BlockNumber(crit.ToBlock.Int64()) - } - - switch { - // only interested in new mined logs, mined logs within a specific block range, or - // logs from a specific block number to new mined blocks - case (from == gethrpc.LatestBlockNumber && to == gethrpc.LatestBlockNumber), - (from >= 0 && to >= 0 && to >= from), - (from >= 0 && to == gethrpc.LatestBlockNumber): - return es.subscribeLogs(crit) - - default: - return nil, nil, fmt.Errorf("invalid from and to block combination: from > to (%d > %d)", from, to) - } -} - -// subscribeLogs creates a subscription that will write all logs matching the -// given criteria to the given logs channel. -func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { - sub := &Subscription{ - id: gethrpc.NewID(), - typ: filters.LogsSubscription, - event: evmEvents, - logsCrit: crit, - created: time.Now().UTC(), - logs: make(chan []*gethcore.Log), - installed: make(chan struct{}, 1), - err: make(chan error, 1), - } - return es.subscribe(sub) -} - -// SubscribeNewHeads subscribes to new block headers events. -func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) { - sub := &Subscription{ - id: gethrpc.NewID(), - typ: filters.BlocksSubscription, - event: headerEvents, - created: time.Now().UTC(), - headers: make(chan *gethcore.Header), - installed: make(chan struct{}, 1), - err: make(chan error, 1), - } - return es.subscribe(sub) -} - -// SubscribePendingTxs subscribes to new pending transactions events from the mempool. 
-func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) { - sub := &Subscription{ - id: gethrpc.NewID(), - typ: filters.PendingTransactionsSubscription, - event: txEvents, - created: time.Now().UTC(), - hashes: make(chan []common.Hash), - installed: make(chan struct{}, 1), - err: make(chan error, 1), - } - return es.subscribe(sub) -} - -type filterIndex map[filters.Type]map[gethrpc.ID]*Subscription - -// eventLoop (un)installs filters and processes mux events. -func (es *EventSystem) eventLoop() { - for { - select { - case f := <-es.install: - es.indexMux.Lock() - es.index[f.typ][f.id] = f - ch := make(chan coretypes.ResultEvent) - if err := es.eventBus.AddTopic(f.event, ch); err != nil { - es.logger.Error("failed to add event topic to event bus", "topic", f.event, "error", err.Error()) - } else { - es.topicChans[f.event] = ch - } - es.indexMux.Unlock() - close(f.installed) - case f := <-es.uninstall: - es.indexMux.Lock() - delete(es.index[f.typ], f.id) - - var channelInUse bool - // #nosec G705 - for _, sub := range es.index[f.typ] { - if sub.event == f.event { - channelInUse = true - break - } - } - - // remove topic only when channel is not used by other subscriptions - if !channelInUse { - if err := es.tmWSClient.Unsubscribe(es.ctx, f.event); err != nil { - es.logger.Error("failed to unsubscribe from query", "query", f.event, "error", err.Error()) - } - - ch, ok := es.topicChans[f.event] - if ok { - es.eventBus.RemoveTopic(f.event) - close(ch) - delete(es.topicChans, f.event) - } - } - - es.indexMux.Unlock() - close(f.err) - } - } -} - -func (es *EventSystem) consumeEvents() { - for { - for rpcResp := range es.tmWSClient.ResponsesCh { - var ev coretypes.ResultEvent - - if rpcResp.Error != nil { - time.Sleep(5 * time.Second) - continue - } else if err := tmjson.Unmarshal(rpcResp.Result, &ev); err != nil { - es.logger.Error("failed to JSON unmarshal ResponsesCh result event", "error", err.Error()) - continue - } - - if 
len(ev.Query) == 0 { - // skip empty responses - continue - } - - es.indexMux.RLock() - ch, ok := es.topicChans[ev.Query] - es.indexMux.RUnlock() - if !ok { - es.logger.Debug("channel for subscription not found", "topic", ev.Query) - es.logger.Debug("list of available channels", "channels", es.eventBus.Topics()) - continue - } - - // gracefully handle lagging subscribers - t := time.NewTimer(time.Second) - select { - case <-t.C: - es.logger.Debug("dropped event during lagging subscription", "topic", ev.Query) - case ch <- ev: - } - } - - time.Sleep(time.Second) - } -} diff --git a/eth/rpc/rpcapi/filtersapi/filter_system_test.go b/eth/rpc/rpcapi/filtersapi/filter_system_test.go deleted file mode 100644 index cd887c897..000000000 --- a/eth/rpc/rpcapi/filtersapi/filter_system_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package filtersapi - -import ( - "context" - "os" - "sync" - "testing" - "time" - - "github.com/cometbft/cometbft/libs/log" - coretypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/ethereum/go-ethereum/common" - gethcore "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/filters" - "github.com/ethereum/go-ethereum/rpc" - - "github.com/NibiruChain/nibiru/v2/eth/rpc/pubsub" -) - -func makeSubscription(id, event string) *Subscription { - return &Subscription{ - id: rpc.ID(id), - typ: filters.LogsSubscription, - event: event, - created: time.Now(), - logs: make(chan []*gethcore.Log), - hashes: make(chan []common.Hash), - headers: make(chan *gethcore.Header), - installed: make(chan struct{}), - eventCh: make(chan coretypes.ResultEvent), - err: make(chan error), - } -} - -func TestFilterSystem(t *testing.T) { - index := make(filterIndex) - for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { - index[i] = make(map[rpc.ID]*Subscription) - } - es := &EventSystem{ - logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)), - ctx: context.Background(), - lightMode: false, - index: index, - 
topicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)), - indexMux: new(sync.RWMutex), - install: make(chan *Subscription), - uninstall: make(chan *Subscription), - eventBus: pubsub.NewEventBus(), - } - go es.eventLoop() - - event := "event" - sub := makeSubscription("1", event) - es.install <- sub - <-sub.installed - ch, ok := es.topicChans[sub.event] - if !ok { - t.Error("expect topic channel exist") - } - - sub = makeSubscription("2", event) - es.install <- sub - <-sub.installed - newCh, ok := es.topicChans[sub.event] - if !ok { - t.Error("expect topic channel exist") - } - - if newCh != ch { - t.Error("expect topic channel unchanged") - } -} diff --git a/eth/rpc/rpcapi/net_api_test.go b/eth/rpc/rpcapi/net_api_test.go index 2803a7e15..6b54f5094 100644 --- a/eth/rpc/rpcapi/net_api_test.go +++ b/eth/rpc/rpcapi/net_api_test.go @@ -4,7 +4,7 @@ import ( "github.com/NibiruChain/nibiru/v2/app/appconst" ) -func (s *TestSuite) TestNetNamespace() { +func (s *NodeSuite) TestNetNamespace() { api := s.val.EthRpc_NET s.Require().True(api.Listening()) s.EqualValues( diff --git a/eth/rpc/rpcapi/filtersapi/subscription.go b/eth/rpc/rpcapi/subscription.go similarity index 59% rename from eth/rpc/rpcapi/filtersapi/subscription.go rename to eth/rpc/rpcapi/subscription.go index fc2ac0f91..3912216ff 100644 --- a/eth/rpc/rpcapi/filtersapi/subscription.go +++ b/eth/rpc/rpcapi/subscription.go @@ -1,5 +1,5 @@ // Copyright (c) 2023-2024 Nibi, Inc. 
-package filtersapi +package rpcapi import ( "time" @@ -13,27 +13,28 @@ import ( // Subscription defines a wrapper for the private subscription type Subscription struct { - id rpc.ID - typ filters.Type - event string - created time.Time + Id rpc.ID + Typ filters.Type + Event string + Created time.Time logsCrit filters.FilterCriteria - logs chan []*gethcore.Log - hashes chan []common.Hash - headers chan *gethcore.Header - installed chan struct{} // closed when the filter is installed - eventCh <-chan coretypes.ResultEvent - err chan error + Logs chan []*gethcore.Log + Hashes chan []common.Hash + Headers chan *gethcore.Header + Installed chan struct{} // closed when the filter is installed + // Consensus result event channel + EventCh <-chan coretypes.ResultEvent + ErrCh chan error } // ID returns the underlying subscription RPC identifier. func (s Subscription) ID() rpc.ID { - return s.id + return s.Id } // Unsubscribe from the current subscription to Tendermint Websocket. It sends an error to the // subscription error channel if unsubscribe fails. -func (s *Subscription) Unsubscribe(es *EventSystem) { +func (s *Subscription) Unsubscribe(es *EventSubscriber) { go func() { uninstallLoop: for { @@ -42,22 +43,17 @@ func (s *Subscription) Unsubscribe(es *EventSystem) { // filter event channel while the subscription loop is waiting for // this method to return (and thus not reading these events). 
select { - case es.uninstall <- s: + case es.Uninstall <- s: break uninstallLoop - case <-s.logs: - case <-s.hashes: - case <-s.headers: + case <-s.Logs: + case <-s.Hashes: + case <-s.Headers: } } }() } -// Err returns the error channel -func (s *Subscription) Err() <-chan error { - return s.err -} - -// Event returns the tendermint result event channel -func (s *Subscription) Event() <-chan coretypes.ResultEvent { - return s.eventCh +// Error returns the error channel +func (s *Subscription) Error() <-chan error { + return s.ErrCh } diff --git a/eth/rpc/rpcapi/websockets.go b/eth/rpc/rpcapi/websockets.go index e9447acd8..a1af04dd4 100644 --- a/eth/rpc/rpcapi/websockets.go +++ b/eth/rpc/rpcapi/websockets.go @@ -33,7 +33,6 @@ import ( "github.com/NibiruChain/nibiru/v2/app/server/config" "github.com/NibiruChain/nibiru/v2/eth/rpc" "github.com/NibiruChain/nibiru/v2/eth/rpc/pubsub" - rpcfilters "github.com/NibiruChain/nibiru/v2/eth/rpc/rpcapi/filtersapi" "github.com/NibiruChain/nibiru/v2/x/evm" ) @@ -348,7 +347,7 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro // pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec type pubSubAPI struct { - events *rpcfilters.EventSystem + events *EventSubscriber logger log.Logger clientCtx client.Context } @@ -357,7 +356,7 @@ type pubSubAPI struct { func newPubSubAPI(clientCtx client.Context, logger log.Logger, tmWSClient *rpcclient.WSClient) *pubSubAPI { logger = logger.With("module", "websocket-client") return &pubSubAPI{ - events: rpcfilters.NewEventSystem(logger, tmWSClient), + events: NewEventSubscriber(logger, tmWSClient), logger: logger, clientCtx: clientCtx, } @@ -397,8 +396,8 @@ func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID gethrpc.ID) (pubsu baseFee := big.NewInt(params.InitialBaseFee) go func() { - headersCh := sub.Event() - errCh := sub.Err() + headersCh := sub.EventCh + errCh := sub.Error() for { select { case event, ok := <-headersCh: @@ -570,8 +569,8 @@ 
func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID gethrpc.ID, extra inte } go func() { - ch := sub.Event() - errCh := sub.Err() + ch := sub.EventCh + errCh := sub.Error() for { select { case event, ok := <-ch: @@ -591,7 +590,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID gethrpc.ID, extra inte return } - logs := rpcfilters.FilterLogs(evm.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + logs := FilterLogs(evm.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) if len(logs) == 0 { continue } @@ -634,8 +633,8 @@ func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID gethrpc } go func() { - txsCh := sub.Event() - errCh := sub.Err() + txsCh := sub.EventCh + errCh := sub.Error() for { select { case ev := <-txsCh: diff --git a/eth/stringify.go b/eth/stringify.go new file mode 100644 index 000000000..2a2775213 --- /dev/null +++ b/eth/stringify.go @@ -0,0 +1,35 @@ +package eth + +import ( + "encoding/hex" + fmt "fmt" + + gethcommon "github.com/ethereum/go-ethereum/common" + gethcore "github.com/ethereum/go-ethereum/core/types" +) + +// TmTxHashToString returns the consensus transaction hash as a string. +// Transactions are hex-encoded and capitalized. +// Reference: Tx.String function from comet-bft/types/tx.go +func TmTxHashToString(tmTxHash []byte) string { + return fmt.Sprintf("%X", tmTxHash) +} + +// EthTxHashToString returns the EVM transaction hash as a string. +func EthTxHashToString(hash gethcommon.Hash) string { + return hash.Hex() +} + +// BloomToHex returns the bloom filter as a string. +func BloomToHex(bloom gethcore.Bloom) string { + return BytesToHex(bloom.Bytes()) +} + +// BloomFromHex converts a hex-encoded bloom filter to a gethcore.Bloom. 
+func BloomFromHex(bloomHex string) (gethcore.Bloom, error) { + bloomBz, err := hex.DecodeString(bloomHex) + if err != nil { + return gethcore.Bloom{}, fmt.Errorf("could not construct bloom: %w", err) + } + return gethcore.BytesToBloom(bloomBz), nil +} diff --git a/eth/stringify_test.go b/eth/stringify_test.go new file mode 100644 index 000000000..4e826126b --- /dev/null +++ b/eth/stringify_test.go @@ -0,0 +1,23 @@ +package eth_test + +import ( + gethcommon "github.com/ethereum/go-ethereum/common" + gethcore "github.com/ethereum/go-ethereum/core/types" + + "github.com/NibiruChain/nibiru/v2/eth" +) + +func (s *Suite) TestStringify() { + testCases := []gethcore.Bloom{ + gethcore.BytesToBloom([]byte("alphanumeric123")), + gethcore.BytesToBloom([]byte{}), + gethcore.BytesToBloom(gethcommon.Big0.Bytes()), + gethcore.BytesToBloom(gethcommon.Big1.Bytes()), + } + for tcIdx, bloom := range testCases { + gotStr := eth.BloomToHex(bloom) + gotBloom, err := eth.BloomFromHex(gotStr) + s.NoError(err) + s.Equalf(bloom, gotBloom, "test case: %d", tcIdx) + } +} diff --git a/x/common/address.go b/x/common/address.go index 67bf644d5..61ca416d3 100644 --- a/x/common/address.go +++ b/x/common/address.go @@ -19,6 +19,7 @@ func StringsToAddrs(strs ...string) []sdk.AccAddress { addr := sdk.MustAccAddressFromBech32(str) addrs = append(addrs, addr) } + return addrs } diff --git a/x/evm/cli/query.go b/x/evm/cli/query.go index 349ee7bb2..545b683d2 100644 --- a/x/evm/cli/query.go +++ b/x/evm/cli/query.go @@ -93,8 +93,6 @@ func CmdQueryAccount() *cobra.Command { } isBech32, err := req.Validate() - fmt.Printf("TODO: UD-DEBUG: req.String(): %v\n", req.String()) - fmt.Printf("TODO: UD-DEBUG: err: %v\n", err) if err != nil { return err } diff --git a/x/evm/evmtest/tx.go b/x/evm/evmtest/tx.go index d75efc860..107a15593 100644 --- a/x/evm/evmtest/tx.go +++ b/x/evm/evmtest/tx.go @@ -184,7 +184,7 @@ func DeployContract( // DeployAndExecuteERC20Transfer deploys contract, executes transfer and returns tx 
hash func DeployAndExecuteERC20Transfer( deps *TestDeps, t *testing.T, -) (*evm.MsgEthereumTx, []*evm.MsgEthereumTx) { +) (erc20Transfer *evm.MsgEthereumTx, predecessors []*evm.MsgEthereumTx) { // TX 1: Deploy ERC-20 contract deployResp, err := DeployContract(deps, embeds.SmartContract_TestERC20) require.NoError(t, err) @@ -194,7 +194,7 @@ func DeployAndExecuteERC20Transfer( // Contract address is deterministic contractAddress := crypto.CreateAddress(deps.Sender.EthAddr, nonce) deps.App.Commit() - predecessors := []*evm.MsgEthereumTx{ + predecessors = []*evm.MsgEthereumTx{ deployResp.EthTxMsg, } @@ -210,14 +210,14 @@ func DeployAndExecuteERC20Transfer( Nonce: (*hexutil.Uint64)(&nonce), Data: (*hexutil.Bytes)(&input), } - ethTxMsg, err := GenerateAndSignEthTxMsg(txArgs, deps) + erc20Transfer, err = GenerateAndSignEthTxMsg(txArgs, deps) require.NoError(t, err) - resp, err := deps.App.EvmKeeper.EthereumTx(sdk.WrapSDKContext(deps.Ctx), ethTxMsg) + resp, err := deps.App.EvmKeeper.EthereumTx(sdk.WrapSDKContext(deps.Ctx), erc20Transfer) require.NoError(t, err) require.Empty(t, resp.VmError) - return ethTxMsg, predecessors + return erc20Transfer, predecessors } // GenerateAndSignEthTxMsg estimates gas, sets gas limit and sings the tx diff --git a/x/evm/keeper/hooks.go b/x/evm/keeper/hooks.go index 177708d64..f42c5ae1e 100644 --- a/x/evm/keeper/hooks.go +++ b/x/evm/keeper/hooks.go @@ -20,7 +20,7 @@ func (k *Keeper) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.Valida ctx = ctx.WithGasMeter(sdk.NewInfiniteGasMeter()) bloom := gethcoretypes.BytesToBloom(k.EvmState.GetBlockBloomTransient(ctx).Bytes()) _ = ctx.EventManager().EmitTypedEvent(&evm.EventBlockBloom{ - Bloom: eth.BytesToHex(bloom.Bytes()), + Bloom: eth.BloomToHex(bloom), }) // The bloom logic doesn't update the validator set. 
return []abci.ValidatorUpdate{} diff --git a/x/evm/keeper/msg_server.go b/x/evm/keeper/msg_server.go index f2cb75e95..10fdfb131 100644 --- a/x/evm/keeper/msg_server.go +++ b/x/evm/keeper/msg_server.go @@ -50,6 +50,9 @@ func (k *Keeper) EthereumTx( sdk.NewAttribute(evm.AttributeKeyTxIndex, strconv.FormatUint(k.EvmState.BlockTxIndex.GetOr(ctx, 0), 10)), // add event for eth tx gas used, we can't get it from cosmos tx result when it contains multiple eth tx msgs. sdk.NewAttribute(evm.AttributeKeyTxGasUsed, strconv.FormatUint(resp.GasUsed, 10)), + // TODO: fix: It's odd that each event is emitted twice. Migrate to typed + // events and change EVM indexer to align. + // sdk.NewAttribute("emitted_from", "EthereumTx"), } if len(ctx.TxBytes()) > 0 {