From bf02b2054b71187503b2ca228076581564ca4060 Mon Sep 17 00:00:00 2001 From: Ricardo Olarte Date: Tue, 22 Aug 2023 13:52:30 -0500 Subject: [PATCH] Refactor/event watcher (#647) * feat: add EVM event-watcher support with mongodb * refactor: DB, Watcher, SNS Classes --- event-watcher/.env.sample | 21 +- event-watcher/scripts/backfill.ts | 6 +- event-watcher/scripts/backfillArbitrum.ts | 6 +- event-watcher/scripts/backfillNear.ts | 8 +- event-watcher/src/config/index.ts | 59 ++++ event-watcher/src/consts.ts | 41 ++- event-watcher/src/databases/BaseDB.ts | 47 +++ event-watcher/src/databases/Database.ts | 33 --- event-watcher/src/databases/JsonDB.ts | 54 ++++ event-watcher/src/databases/JsonDatabase.ts | 63 ---- event-watcher/src/databases/MongoDB.ts | 91 ++---- .../src/databases/__tests__/utils.test.ts | 6 +- event-watcher/src/databases/types.ts | 84 ++---- event-watcher/src/databases/utils.ts | 79 +---- event-watcher/src/index.ts | 99 +++---- .../src/services/SNS/AwsSNS/index.ts | 19 +- event-watcher/src/services/SNS/BaseSNS.ts | 8 + event-watcher/src/services/SNS/types.ts | 5 +- event-watcher/src/services/SNS/utils.ts | 18 ++ event-watcher/src/utils/environment.ts | 6 +- event-watcher/src/watchers/AlgorandWatcher.ts | 76 ++--- event-watcher/src/watchers/AptosWatcher.ts | 68 +++-- event-watcher/src/watchers/ArbitrumWatcher.ts | 11 +- event-watcher/src/watchers/BSCWatcher.ts | 3 +- .../watchers/{Watcher.ts => BaseWatcher.ts} | 89 +++--- event-watcher/src/watchers/CosmwasmWatcher.ts | 212 +++++++------- event-watcher/src/watchers/EVMWatcher.ts | 50 +--- .../src/watchers/InjectiveExplorerWatcher.ts | 258 ++++++++-------- event-watcher/src/watchers/MoonbeamWatcher.ts | 5 +- event-watcher/src/watchers/NearWatcher.ts | 150 +++++----- event-watcher/src/watchers/PolygonWatcher.ts | 5 +- event-watcher/src/watchers/SolanaWatcher.ts | 276 +++++++++--------- event-watcher/src/watchers/SuiWatcher.ts | 172 +++++------ .../src/watchers/TerraExplorerWatcher.ts | 246 ++++++++-------- event-watcher/src/watchers/types.ts | 39 +++ event-watcher/src/watchers/utils.ts | 14 +- event-watcher/tsconfig.json | 1 + 37 files changed, 1205 insertions(+), 1223 deletions(-) create mode 100644 event-watcher/src/config/index.ts create mode 100644 event-watcher/src/databases/BaseDB.ts delete mode 100644 event-watcher/src/databases/Database.ts create mode 100644 event-watcher/src/databases/JsonDB.ts delete mode 100644 event-watcher/src/databases/JsonDatabase.ts create mode 100644 event-watcher/src/services/SNS/BaseSNS.ts create mode 100644 event-watcher/src/services/SNS/utils.ts rename event-watcher/src/watchers/{Watcher.ts => BaseWatcher.ts} (52%) create mode 100644 event-watcher/src/watchers/types.ts diff --git a/event-watcher/.env.sample b/event-watcher/.env.sample index 1c773804a..c4c12de6b 100644 --- a/event-watcher/.env.sample +++ b/event-watcher/.env.sample @@ -1,15 +1,28 @@ -LOG_DIR=. +#Log +LOG_DIR= LOG_LEVEL=info -ETH_RPC= +#RPC +ETH_RPC=https://eth.llamarpc.com + +#Database source DB_SOURCE= JSON_DB_FILE=db.json +JSON_LAST_BLOCK_FILE=lastBlockByChain.json + +#Server +PORT=3005 +#MongoDB MONGODB_URI=mongodb://localhost:27017 MONGODB_DATABASE=wormhole +#SNS source +SNS_SOURCE=aws + +#AWS AWS_SNS_REGION= -AWS_TOPIC_ARN= +AWS_SNS_TOPIC_ARN= AWS_SNS_SUBJECT=EventWatcher AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= \ No newline at end of file +AWS_SECRET_ACCESS_KEY= diff --git a/event-watcher/scripts/backfill.ts b/event-watcher/scripts/backfill.ts index 729f3a57a..031cf3afb 100644 --- a/event-watcher/scripts/backfill.ts +++ b/event-watcher/scripts/backfill.ts @@ -3,13 +3,13 @@ dotenv.config(); import { ChainId, coalesceChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import { chunkArray, sleep } from '../src/common'; import { BigtableDatabase } from '../src/databases/BigtableDatabase'; -import { JsonDatabase } from '../src/databases/JsonDatabase'; import { VaasByBlock } from '../src/databases/types'; +import JsonDB from '../src/databases/JsonDB'; // This script backfills the bigtable db from a json db (async () => { - const localDb = new JsonDatabase(); + const localDb = new JsonDB(); const remoteDb = new BigtableDatabase(); const dbEntries = Object.entries(localDb.db); @@ -25,7 +25,7 @@ import { VaasByBlock } from '../src/databases/types'; }, {}); await remoteDb.storeVaasByBlock( coalesceChainName(Number(chain) as ChainId), - chunkedVaasByBlock + chunkedVaasByBlock, ); await sleep(500); } diff --git a/event-watcher/scripts/backfillArbitrum.ts b/event-watcher/scripts/backfillArbitrum.ts index 15aaf1b37..bdbb85c41 100644 --- a/event-watcher/scripts/backfillArbitrum.ts +++ b/event-watcher/scripts/backfillArbitrum.ts @@ -3,7 +3,7 @@ dotenv.config(); import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import axios from 'axios'; import ora from 'ora'; -import { initDb } from '../src/databases/utils'; +import { getDB } from '../src/databases/utils'; import { AXIOS_CONFIG_JSON } from '../src/consts'; import { ArbitrumWatcher } from '../src/watchers/ArbitrumWatcher'; import { LOG_MESSAGE_PUBLISHED_TOPIC } from '../src/watchers/EVMWatcher'; @@ -11,14 +11,14 @@ import { LOG_MESSAGE_PUBLISHED_TOPIC } from '../src/watchers/EVMWatcher'; // This script exists because the Arbitrum RPC node only supports a 10 block range which is super slow (async () => { - const db = initDb(); + const db = getDB(); const chain: ChainName = 'arbitrum'; const endpoint = `https://api.arbiscan.io/api?module=logs&action=getLogs&address=${CONTRACTS.MAINNET.arbitrum.core}&topic0=${LOG_MESSAGE_PUBLISHED_TOPIC}&apikey=YourApiKeyToken`; // fetch all message publish logs for core bridge contract from explorer let log = ora('Fetching logs from Arbiscan...').start(); const blockNumbers = (await axios.get(endpoint, AXIOS_CONFIG_JSON)).data.result.map((x: any) => - parseInt(x.blockNumber, 16) + parseInt(x.blockNumber, 16), ); log.succeed(`Fetched ${blockNumbers.length} logs from Arbiscan`); // use the watcher to fetch corresponding blocks diff --git a/event-watcher/scripts/backfillNear.ts b/event-watcher/scripts/backfillNear.ts index 8d602275c..0dc50e982 100644 --- a/event-watcher/scripts/backfillNear.ts +++ b/event-watcher/scripts/backfillNear.ts @@ -4,7 +4,7 @@ import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/cons import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../src/common'; import { BlockResult } from 'near-api-js/lib/providers/provider'; import ora from 'ora'; -import { initDb } from '../src/databases/utils'; +import { getDB } from '../src/databases/utils'; import { getNearProvider, getTransactionsByAccountId, NEAR_ARCHIVE_RPC } from '../src/utils/near'; import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher'; @@ -19,11 +19,11 @@ import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher'; const BATCH_SIZE = 1000; (async () => { - const db = initDb(); + const db = getDB(); const chain: ChainName = 'near'; const provider = await getNearProvider(NEAR_ARCHIVE_RPC); const fromBlock = Number( - (await db.getLastBlockByChain(chain)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain] ?? 0 + (await db.getLastBlockByChain(chain)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain] ?? 0, ); // fetch all transactions for core bridge contract from explorer @@ -32,7 +32,7 @@ const BATCH_SIZE = 1000; const transactions = await getTransactionsByAccountId( CONTRACTS.MAINNET.near.core, BATCH_SIZE, - toBlock.header.timestamp.toString().padEnd(19, '9') // pad to nanoseconds + toBlock.header.timestamp.toString().padEnd(19, '9'), // pad to nanoseconds ); log.succeed(`Fetched ${transactions.length} transactions from NEAR Explorer`); diff --git a/event-watcher/src/config/index.ts b/event-watcher/src/config/index.ts new file mode 100644 index 000000000..5d34495c4 --- /dev/null +++ b/event-watcher/src/config/index.ts @@ -0,0 +1,59 @@ +import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; + +export const env = { + NODE_ENV: process.env.NODE_ENV, + + LOG_DIR: process.env.LOG_DIR, + LOG_LEVEL: process.env.LOG_LEVEL || 'info', + + ETH_RPC: process.env.ETH_RPC, + + DB_SOURCE: process.env.NODE_ENV === 'test' ? 'local' : process.env.DB_SOURCE || 'local', + JSON_DB_FILE: process.env.JSON_DB_FILE || './db.json', + JSON_LAST_BLOCK_FILE: process.env.JSON_LAST_BLOCK_FILE || './lastBlockByChain.json', + + PORT: process.env.PORT, + + MONGODB_URI: process.env.MONGODB_URI, + MONGODB_DATABASE: process.env.MONGODB_DATABASE, + + SNS_SOURCE: process.env.SNS_SOURCE, + AWS_SNS_REGION: process.env.AWS_SNS_REGION, + AWS_SNS_TOPIC_ARN: process.env.AWS_SNS_TOPIC_ARN, + AWS_SNS_SUBJECT: process.env.AWS_SNS_SUBJECT, + AWS_ACCESS_KEY_ID: process.env.AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY: process.env.AWS_SECRET_ACCESS_KEY, +} as const; + +// EVM Chains not supported +// aurora, gnosis, neon, sepolia + +export const evmChains: EVMChainName[] = [ + 'acala', + 'arbitrum', + 'avalanche', + 'base', + 'bsc', + 'celo', + 'ethereum', + 'fantom', + 'karura', + 'klaytn', + 'moonbeam', + 'oasis', + 'optimism', + 'polygon', +]; + +export const supportedChains: ChainName[] = [ + ...evmChains, + 'algorand', + 'aptos', + 'injective', + 'near', + 'solana', + 'sui', + 'terra', + 'terra2', + 'xpla', +]; diff --git a/event-watcher/src/consts.ts b/event-watcher/src/consts.ts index 658547c98..1d8c8f99f 100644 --- a/event-watcher/src/consts.ts +++ b/event-watcher/src/consts.ts @@ -1,5 +1,6 @@ import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import { AxiosRequestConfig } from 'axios'; +import { env } from './config'; export const TIMEOUT = 0.5 * 1000; @@ -25,37 +26,36 @@ export const TIMEOUT = 0.5 * 1000; // This node didn't work: 'https://arb1.arbitrum.io/rpc', export const RPCS_BY_CHAIN: { [key in ChainName]?: string } = { - ethereum: process.env.ETH_RPC || 'https://svc.blockdaemon.com/ethereum/mainnet/native', - bsc: 'https://bsc-dataseed2.defibit.io', - polygon: 'https://rpc.ankr.com/polygon', - avalanche: 'https://rpc.ankr.com/avalanche', - oasis: 'https://emerald.oasis.dev', + acala: 'https://eth-rpc-acala.aca-api.network', algorand: 'https://mainnet-api.algonode.cloud', + aptos: 'https://fullnode.mainnet.aptoslabs.com/', + arbitrum: 'https://arb1.arbitrum.io/rpc', + avalanche: 'https://rpc.ankr.com/avalanche', + base: 'https://developer-access-mainnet.base.org', + bsc: 'https://bsc-dataseed2.defibit.io', + celo: 'https://forno.celo.org', + ethereum: env.ETH_RPC || 'https://svc.blockdaemon.com/ethereum/mainnet/native', fantom: 'https://rpc.ankr.com/fantom', + injective: 'https://api.injective.network', karura: 'https://eth-rpc-karura.aca-api.network', - acala: 'https://eth-rpc-acala.aca-api.network', klaytn: 'https://klaytn-mainnet-rpc.allthatnode.com:8551', - celo: 'https://forno.celo.org', moonbeam: 'https://rpc.ankr.com/moonbeam', - arbitrum: 'https://arb1.arbitrum.io/rpc', - optimism: 'https://rpc.ankr.com/optimism', - aptos: 'https://fullnode.mainnet.aptoslabs.com/', near: 'https://rpc.mainnet.near.org', - xpla: 'https://dimension-lcd.xpla.dev', - terra2: 'https://phoenix-lcd.terra.dev', - // terra: 'https://columbus-fcd.terra.dev', - terra: 'https://terra-classic-fcd.publicnode.com', - injective: 'https://api.injective.network', - solana: process.env.SOLANA_RPC ?? 'https://api.mainnet-beta.solana.com', + oasis: 'https://emerald.oasis.dev', + optimism: 'https://rpc.ankr.com/optimism', + polygon: 'https://rpc.ankr.com/polygon', + solana: 'https://api.mainnet-beta.solana.com', sui: 'https://rpc.mainnet.sui.io', - base: 'https://developer-access-mainnet.base.org', + terra: 'https://terra-classic-fcd.publicnode.com', // 'https://columbus-fcd.terra.dev', + terra2: 'https://phoenix-lcd.terra.dev', + xpla: 'https://dimension-lcd.xpla.dev', }; // Separating for now so if we max out infura we can keep Polygon going export const POLYGON_ROOT_CHAIN_RPC = 'https://rpc.ankr.com/eth'; export const POLYGON_ROOT_CHAIN_ADDRESS = '0x86E4Dc95c7FBdBf52e33D563BbDB00823894C287'; // Optimism watcher relies on finalized calls which don't work right on Ankr -export const OPTIMISM_CTC_CHAIN_RPC = process.env.ETH_RPC; +export const OPTIMISM_CTC_CHAIN_RPC = env.ETH_RPC; export const OPTIMISM_CTC_CHAIN_ADDRESS = '0x5E4e65926BA27467555EB562121fac00D24E9dD2'; export const ALGORAND_INFO = { @@ -68,11 +68,6 @@ export const ALGORAND_INFO = { token: '', }; -export const DB_SOURCE = - process.env.NODE_ENV === 'test' ? 'local' : process.env.DB_SOURCE || 'local'; -export const JSON_DB_FILE = process.env.JSON_DB_FILE || './db.json'; -export const DB_LAST_BLOCK_FILE = process.env.DB_LAST_BLOCK_FILE || './lastBlockByChain.json'; - // without this, axios request will error `Z_BUF_ERROR`: https://github.com/axios/axios/issues/5346 export const AXIOS_CONFIG_JSON: AxiosRequestConfig = { headers: { diff --git a/event-watcher/src/databases/BaseDB.ts b/event-watcher/src/databases/BaseDB.ts new file mode 100644 index 000000000..86ae9ee49 --- /dev/null +++ b/event-watcher/src/databases/BaseDB.ts @@ -0,0 +1,47 @@ +import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; +import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../common/consts'; +import { getLogger, WormholeLogger } from '../utils/logger'; +import { DBImplementation, LastBlockByChain, VaaLog } from './types'; +abstract class BaseDB implements DBImplementation { + public logger: WormholeLogger; + public lastBlockByChain: LastBlockByChain = {}; + + constructor() { + this.logger = getLogger('db'); + this.lastBlockByChain = {}; + } + + public async start(): Promise { + await this.connect(); + await this.getLastBlocksProcessed(); + console.log('----------DB CONFIGURED------------'); + } + + public async getResumeBlockByChain(chain: ChainName): Promise { + const lastBlock = this.getLastBlockByChain(chain); + const initialBlock = INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain]; + + if (lastBlock) return Number(lastBlock) + 1; + if (initialBlock) return Number(initialBlock); + return null; + } + + public getLastBlockByChain(chain: ChainName): string | null { + const chainId = coalesceChainId(chain); + const blockInfo = this.lastBlockByChain?.[chainId]; + + if (blockInfo) { + const tokens = String(blockInfo)?.split('/'); + return chain === 'aptos' ? tokens.at(-1)! : tokens[0]; + } + + return null; + } + + abstract connect(): Promise; + abstract getLastBlocksProcessed(): Promise; + abstract storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise; + abstract storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise; +} + +export default BaseDB; diff --git a/event-watcher/src/databases/Database.ts b/event-watcher/src/databases/Database.ts deleted file mode 100644 index 09e9ebfee..000000000 --- a/event-watcher/src/databases/Database.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; -import { getLogger, WormholeLogger } from '../utils/logger'; -import { VaaLog, VaasByBlock } from './types'; - -export class Database { - logger: WormholeLogger; - constructor() { - this.logger = getLogger('db'); - } - static filterEmptyBlocks(vaasByBlock: VaasByBlock): VaasByBlock { - const filteredVaasByBlock: VaasByBlock = {}; - for (const [block, vaas] of Object.entries(vaasByBlock)) { - if (vaas.length > 0) filteredVaasByBlock[block] = [...vaas]; - } - return filteredVaasByBlock; - } - - async getLastBlockByChain(chain: ChainName): Promise { - throw new Error('Not Implemented'); - } - - async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise { - throw new Error('Not Implemented'); - } - - async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise { - throw new Error('Not Implemented'); - } - - async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise { - throw new Error('Not Implemented'); - } -} diff --git a/event-watcher/src/databases/JsonDB.ts b/event-watcher/src/databases/JsonDB.ts new file mode 100644 index 000000000..f78bb3a20 --- /dev/null +++ b/event-watcher/src/databases/JsonDB.ts @@ -0,0 +1,54 @@ +import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; +import { readFileSync, writeFileSync } from 'fs'; +import { env } from '../config'; +import BaseDB from './BaseDB'; +import { VaaLog } from './types'; + +const ENCODING = 'utf8'; + +export default class JsonDB extends BaseDB { + db: {} | null = null; + dbFile: string; + dbLastBlockFile: string; + + constructor() { + super(); + this.db = {}; + this.lastBlockByChain = {}; + this.dbFile = env.JSON_DB_FILE; + this.dbLastBlockFile = env.JSON_LAST_BLOCK_FILE; + } + + async connect(): Promise { + try { + const rawDb = readFileSync(this.dbFile, ENCODING); + this.db = rawDb ? JSON.parse(rawDb) : {}; + console.log('---CONNECTED TO JsonDB---'); + } catch (e) { + this.logger.warn(`${this.dbFile} does not exists, creating new file`); + this.db = {}; + } + } + + async getLastBlocksProcessed(): Promise { + try { + const rawLastBlockByChain = readFileSync(this.dbLastBlockFile, ENCODING); + this.lastBlockByChain = rawLastBlockByChain ? JSON.parse(rawLastBlockByChain) : {}; + } catch (e) { + this.logger.warn(`${this.dbLastBlockFile} does not exists, creating new file`); + this.lastBlockByChain = {}; + } + } + + override async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise { + this.db = [{ ...this.db, ...vaaLogs }]; + writeFileSync(this.dbFile, JSON.stringify(this.db, null, 2), ENCODING); + } + + override async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise { + const chainId = coalesceChainId(chain); + this.lastBlockByChain[chainId] = String(lastBlock); + + writeFileSync(this.dbLastBlockFile, JSON.stringify(this.lastBlockByChain, null, 2), ENCODING); + } +} diff --git a/event-watcher/src/databases/JsonDatabase.ts b/event-watcher/src/databases/JsonDatabase.ts deleted file mode 100644 index f3712dad4..000000000 --- a/event-watcher/src/databases/JsonDatabase.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; -import { readFileSync, writeFileSync } from 'fs'; -import { DB_LAST_BLOCK_FILE, JSON_DB_FILE } from '../consts'; -import { Database } from './Database'; -import { DB, LastBlockByChain, VaasByBlock } from './types'; - -const ENCODING = 'utf8'; -export class JsonDatabase extends Database { - db: DB; - lastBlockByChain: LastBlockByChain; - dbFile: string; - dbLastBlockFile: string; - constructor() { - super(); - this.db = {}; - this.lastBlockByChain = {}; - if (!process.env.JSON_DB_FILE) { - this.logger.info(`no db file set, using default path=${JSON_DB_FILE}`); - } - if (!process.env.DB_LAST_BLOCK_FILE) { - this.logger.info(`no db file set, using default path=${DB_LAST_BLOCK_FILE}`); - } - this.dbFile = JSON_DB_FILE; - this.dbLastBlockFile = DB_LAST_BLOCK_FILE; - - try { - const rawDb = readFileSync(this.dbFile, ENCODING); - this.db = JSON.parse(rawDb); - const rawLast = readFileSync(this.dbLastBlockFile, ENCODING); - this.lastBlockByChain = JSON.parse(rawLast); - } catch (e) { - this.logger.warn('Failed to load DB, initiating a fresh one.'); - this.db = {}; - } - } - - async getLastBlockByChain(chain: ChainName): Promise { - const chainId = coalesceChainId(chain); - const blockInfo = this.lastBlockByChain[chainId]; - if (blockInfo) { - const tokens = blockInfo.split('/'); - return chain === 'aptos' ? tokens.at(-1)! : tokens[0]; - } - return null; - } - async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise { - const chainId = coalesceChainId(chain); - const filteredVaasByBlock = Database.filterEmptyBlocks(vaasByBlock); - if (Object.keys(filteredVaasByBlock).length) { - this.db[chainId] = { ...(this.db[chainId] || {}), ...filteredVaasByBlock }; - writeFileSync(this.dbFile, JSON.stringify(this.db), ENCODING); - } - - // this will always overwrite the "last" block, so take caution if manually backfilling gaps - const blockKeys = Object.keys(vaasByBlock).sort( - (bk1, bk2) => Number(bk1.split('/')[0]) - Number(bk2.split('/')[0]) - ); - if (blockKeys.length) { - this.lastBlockByChain[chainId] = blockKeys[blockKeys.length - 1]; - writeFileSync(this.dbLastBlockFile, JSON.stringify(this.lastBlockByChain), ENCODING); - } - } -} diff --git a/event-watcher/src/databases/MongoDB.ts b/event-watcher/src/databases/MongoDB.ts index d2826189c..77c462e28 100644 --- a/event-watcher/src/databases/MongoDB.ts +++ b/event-watcher/src/databases/MongoDB.ts @@ -1,85 +1,50 @@ import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; -import { Database } from './Database'; -import { LastBlockByChain, VaaLog, VaasByBlock } from './types'; +import BaseDB from './BaseDB'; +import { VaaLog } from './types'; import * as mongoDB from 'mongodb'; +import { env } from '../config'; const WORMHOLE_TX_COLLECTION: string = 'wormholeTx'; const WORMHOLE_LAST_BLOCK_COLLECTION: string = 'lastBlockByChain'; -export class MongoDatabase extends Database { + +export default class MongoDB extends BaseDB { private client: mongoDB.MongoClient | null = null; private db: mongoDB.Db | null = null; - private wormholeTx: mongoDB.Collection | null = null; - private lastTxBlockByChain: mongoDB.Collection | null = null; - private lastBlockByChain: LastBlockByChain | null = null; + private wormholeTxCollection: mongoDB.Collection | null = null; + private lastTxBlockByChainCollection: mongoDB.Collection | null = null; constructor() { super(); + } - this.lastBlockByChain = null; - + async connect(): Promise { try { - this.client = new mongoDB.MongoClient(process.env.MONGODB_URI as string); - this.connectDB(); - this.db = this.client.db(process.env.MONGODB_DATABASE ?? 'wormhole'); - this.wormholeTx = this.db.collection(WORMHOLE_TX_COLLECTION); - this.lastTxBlockByChain = this.db.collection(WORMHOLE_LAST_BLOCK_COLLECTION); + this.client = new mongoDB.MongoClient(env.MONGODB_URI as string); + this.db = this.client.db(env.MONGODB_DATABASE ?? 'wormhole'); + this.wormholeTxCollection = this.db.collection(WORMHOLE_TX_COLLECTION); + this.lastTxBlockByChainCollection = this.db.collection(WORMHOLE_LAST_BLOCK_COLLECTION); + await this.client?.connect(); + + console.log('---CONNECTED TO MongoDB---'); } catch (e) { throw new Error(`(MongoDB) Error: ${e}`); } } - async connectDB() { - await this.client?.connect(); - console.log(`Successfully connected to database: ${this.db?.databaseName} `); - } - - async getLastBlockByChainFromDB() { - const latestBlocks = await this.lastTxBlockByChain?.findOne({}); + async getLastBlocksProcessed(): Promise { + const latestBlocks = await this.lastTxBlockByChainCollection?.findOne({}); const json = JSON.parse(JSON.stringify(latestBlocks)); - this.lastBlockByChain = json; - } - - async getLastBlockByChain(chain: ChainName): Promise { - if (!this.lastBlockByChain) await this.getLastBlockByChainFromDB(); - - const chainId = coalesceChainId(chain); - const blockInfo: string | undefined = this.lastBlockByChain?.[chainId]; - - if (blockInfo) { - const tokens = String(blockInfo)?.split('/'); - return chain === 'aptos' ? tokens.at(-1)! : tokens[0]; - } - - return null; + this.lastBlockByChain = json || {}; } - async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise { - // const chainId = coalesceChainId(chain); - // const filteredVaasByBlock = Database.filterEmptyBlocks(vaasByBlock); - // if (Object.keys(filteredVaasByBlock).length) { - // } - // this will always overwrite the "last" block, so take caution if manually backfilling gaps - // const blockKeys = Object.keys(vaasByBlock).sort( - // (bk1, bk2) => Number(bk1.split('/')[0]) - Number(bk2.split('/')[0]), - // ); - // if (blockKeys.length) { - // this.lastBlockByChain[chainId] = blockKeys[blockKeys.length - 1]; - // await this.wormholeTx.insertOne({ - // chainId: chainId, - // block: this.lastBlockByChain[chainId], - // data: vaasByBlock, - // }); - // } + override async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise { + await this.wormholeTxCollection?.insertMany(vaaLogs); } - async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise { - await this.wormholeTx?.insertMany(vaaLogs); - } - - async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise { + override async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise { const chainId = coalesceChainId(chain); - await this.lastTxBlockByChain?.findOneAndUpdate( + await this.lastTxBlockByChainCollection?.findOneAndUpdate( {}, { $set: { @@ -92,14 +57,4 @@ export class MongoDatabase extends Database { }, ); } - - async storeVaa(chain: ChainName, txHash: string, vaa_id: string, payload: string): Promise { - const chainId = coalesceChainId(chain); - this.wormholeTx?.insertOne({ - chainId: chainId, - txHash: txHash, - vaa_id: vaa_id, - payload: payload, - }); - } } diff --git a/event-watcher/src/databases/__tests__/utils.test.ts b/event-watcher/src/databases/__tests__/utils.test.ts index 38fbae54f..280151e9a 100644 --- a/event-watcher/src/databases/__tests__/utils.test.ts +++ b/event-watcher/src/databases/__tests__/utils.test.ts @@ -2,10 +2,10 @@ import { CHAIN_ID_SOLANA } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import { expect, test } from '@jest/globals'; import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../../common'; import { JsonDatabase } from '../JsonDatabase'; -import { getResumeBlockByChain, initDb, makeBlockKey } from '../utils'; +import { getResumeBlockByChain, getDB, makeBlockKey } from '../utils'; test('getResumeBlockByChain', async () => { - const db = initDb() as JsonDatabase; + const db = getDB() as JsonDatabase; const fauxBlock = '98765'; const blockKey = makeBlockKey(fauxBlock, new Date().toISOString()); db.lastBlockByChain = { [CHAIN_ID_SOLANA]: blockKey }; @@ -15,7 +15,7 @@ test('getResumeBlockByChain', async () => { // if a chain is not in the database, the initial deployment block should be returned expect(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.moonbeam).toBeDefined(); expect(await getResumeBlockByChain('moonbeam')).toEqual( - Number(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.moonbeam) + Number(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.moonbeam), ); // if neither, null should be returned expect(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.unset).toBeUndefined(); diff --git a/event-watcher/src/databases/types.ts b/event-watcher/src/databases/types.ts index da43a84e9..232032d16 100644 --- a/event-watcher/src/databases/types.ts +++ b/event-watcher/src/databases/types.ts @@ -1,7 +1,20 @@ -import { ChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; -import { Row } from '@google-cloud/bigtable'; -export type VaaLog = { - id: string; +import { ChainId, ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; +import JsonDB from './JsonDB'; +import MongoDB from './MongoDB'; + +export type DBOptionTypes = MongoDB | JsonDB; +export interface DBImplementation { + start(): Promise; + connect(): Promise; + getResumeBlockByChain(chain: ChainName): Promise; + getLastBlocksProcessed(): Promise; + getLastBlockByChain(chain: ChainName): string | null; + storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise; + storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise; +} + +export interface VaaLog { + vaaId: string; chainId: number; chainName: string; emitter: string; @@ -10,62 +23,9 @@ export type VaaLog = { sender: string; payload: any; blockNumber: number; -}; -export type VaasByBlock = { [blockInfo: string]: string[] }; -export type DB = { [chain in ChainId]?: VaasByBlock }; -export type LastBlockByChain = { [chain in ChainId]?: string }; -export type JSONArray = string; -export type BigtableMessagesRow = { - key: string; - data: { - // column family - info: { - // columns - timestamp?: { value: string; timestamp: string }; - txHash?: { value: string; timestamp: string }; - hasSignedVaa?: { value: number; timestamp: string }; - }; - }; -}; -export interface BigtableSignedVAAsRow { - key: string; - data: { - // column family - info: { - // columns - bytes: { value: Buffer; timestamp: string }; - }; - }; -} -export interface BigtableVAAsByTxHashRow { - key: string; - data: { - // column family - info: { - // columns - vaaKeys: { value: JSONArray; timestamp: string }; - }; - }; -} -export interface BigtableMessagesResultRow extends Row { - key: string; - data: { - // column family - info: { - // columns - timestamp?: [{ value: string; timestamp: string }]; - txHash?: [{ value: string; timestamp: string }]; - hasSignedVaa?: [{ value: number; timestamp: string }]; - }; - }; -} -export interface BigtableSignedVAAsResultRow extends Row { - key: string; - data: { - // column family - info: { - // columns - bytes: [{ value: Buffer; timestamp: string }]; - }; - }; + indexedAt?: string | number; + updatedAt?: string | number; + createdAt?: string | number; } + +export type LastBlockByChain = { [chain in ChainId]?: string }; diff --git a/event-watcher/src/databases/utils.ts b/event-watcher/src/databases/utils.ts index 089571a9b..d602aac5a 100644 --- a/event-watcher/src/databases/utils.ts +++ b/event-watcher/src/databases/utils.ts @@ -1,15 +1,19 @@ import { ChainId, ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; -import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, MAX_UINT_64, padUint16, padUint64 } from '../common'; -import { DB_SOURCE } from '../consts'; -import { Database } from './Database'; -import { JsonDatabase } from './JsonDatabase'; -import { VaaLog, VaasByBlock } from './types'; -import { MongoDatabase } from './MongoDB'; +import { MAX_UINT_64, padUint16, padUint64 } from '../common'; +import JsonDB from './JsonDB'; +import MongoDB from './MongoDB'; +import { env } from '../config'; +import { DBOptionTypes } from './types'; // Bigtable Message ID format // chain/MAX_UINT64-block/emitter/sequence // 00002/00000000000013140651/0000000000000000000000008ea8874192c8c715e620845f833f48f39b24e222/00000000000000000000 +export const getDB = (): DBOptionTypes => { + if (env.DB_SOURCE === 'mongo') return new MongoDB(); + return new JsonDB(); +}; + export function makeMessageId( chainId: number, block: string, @@ -45,66 +49,3 @@ export const makeVaaKey = ( emitter: string, seq: string, ): string => `${transactionHash}:${coalesceChainId(chain)}/${emitter}/${seq}`; - -// make a bigtable row key for the `vaasByTxHash` table -export const makeVAAsByTxHashRowKey = (txHash: string, chain: number): string => - `${txHash}/${padUint16(chain.toString())}`; - -// make a bigtable row key for the `signedVAAs` table -export const makeSignedVAAsRowKey = (chain: number, emitter: string, sequence: string): string => - `${padUint16(chain.toString())}/${emitter}/${padUint64(sequence)}`; - -let database: Database = new Database(); -export const initDb = (): Database => { - if (DB_SOURCE === 'mongo') { - database = new MongoDatabase(); - } else { - database = new JsonDatabase(); - } - return database; -}; - -export const getResumeBlockByChain = async (chain: ChainName): Promise => { - const lastBlock = await database.getLastBlockByChain(chain); - const initialBlock = INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain]; - - return lastBlock !== null - ? Number(lastBlock) + 1 - : initialBlock !== undefined - ? Number(initialBlock) - : null; -}; - -export const storeVaasByBlock = async ( - chain: ChainName, - vaasByBlock: VaasByBlock, -): Promise => { - return database.storeVaasByBlock(chain, vaasByBlock); -}; - -export const storeVaaLogs = (chain: ChainName, vaaLogs: VaaLog[]): Promise => { - return database.storeVaaLogs(chain, vaaLogs); -}; - -export const storeLatestProcessBlock = (chain: ChainName, lastBlock: number): Promise => { - return database.storeLatestProcessBlock(chain, lastBlock); -}; - -export function printRow(rowkey: string, rowData: { [x: string]: any }) { - console.log(`Reading data for ${rowkey}:`); - - for (const columnFamily of Object.keys(rowData)) { - const columnFamilyData = rowData[columnFamily]; - console.log(`Column Family ${columnFamily}`); - - for (const columnQualifier of Object.keys(columnFamilyData)) { - const col = columnFamilyData[columnQualifier]; - - for (const cell of col) { - const labels = cell.labels.length ? ` [${cell.labels.join(',')}]` : ''; - console.log(`\t${columnQualifier}: ${cell.value} @${cell.timestamp}${labels}`); - } - } - } - console.log(); -} diff --git a/event-watcher/src/index.ts b/event-watcher/src/index.ts index f1b014a01..3296fcc94 100644 --- a/event-watcher/src/index.ts +++ b/event-watcher/src/index.ts @@ -1,74 +1,53 @@ -import * as dotenv from 'dotenv'; +import dotenv from 'dotenv'; dotenv.config(); -import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; -import { initDb } from './databases/utils'; +import { getDB } from './databases/utils'; +import { getSNS } from './services/SNS/utils'; import { makeFinalizedWatcher } from './watchers/utils'; import { InfrastructureController } from './infrastructure/infrastructure.controller'; import { createServer } from './builder/server'; +import { env, evmChains } from './config'; +import { DBOptionTypes } from './databases/types'; +import { SNSOptionTypes } from './services/SNS/types'; +class EventWatcher { + private infrastructureController = new InfrastructureController(); + + constructor(private db: DBOptionTypes, private sns: SNSOptionTypes) { + this.setup(); + } -// EVM Chains not supported -// aurora, gnosis, neon, sepolia - -const evmChains: EVMChainName[] = [ - 'acala', - 'arbitrum', - 'avalanche', - 'base', - 'bsc', - 'celo', - 'ethereum', - 'fantom', - 'karura', - 'klaytn', - 'moonbeam', - 'oasis', - 'optimism', - 'polygon', -]; + async setup() { + await this.startServer(); + } -const supportedChains: ChainName[] = [ - ...evmChains, - 'algorand', - 'aptos', - 'injective', - 'near', - 'solana', - 'sui', - 'terra', - 'terra2', - 'xpla', -]; + async startServer() { + const port = Number(env.PORT) || 3005; + const server = await createServer(port); -const db = initDb(); -const infrastructureController = new InfrastructureController(); + server.get('/ready', { logLevel: 'silent' }, this.infrastructureController.ready); + server.get('/health', { logLevel: 'silent' }, this.infrastructureController.health); -const startServer = async () => { - const port = Number(process.env.PORT) || 3005; - const server = await createServer(port); + server.listen({ port, host: '0.0.0.0' }, (err: any, address: any) => { + if (err) process.exit(1); + console.log(`Server listening at ${address}`); + }); + } - server.get('/ready', { logLevel: 'silent' }, infrastructureController.ready); - server.get('/health', { logLevel: 'silent' }, infrastructureController.health); + async run() { + await this.db.start(); - server.listen({ port, host: '0.0.0.0' }, (err: any, address: any) => { - if (err) { - process.exit(1); + // for (const chain of supportedChains) { + for (const chain of evmChains) { + const watcher = makeFinalizedWatcher(chain); + watcher.setDB(this.db); + watcher.setServices(this.sns); + watcher.watch(); } - console.log(`Server listening at ${address}`); - }); -}; - -startServer(); - -const start = async () => { - // We wait to the database to fetch the `latestBlocks` (avoid multi requests) - // Im trying not to change too much the codebase. - await db.getLastBlockByChain('unset'); - - // for (const chain of supportedChains) { - for (const chain of evmChains) { - makeFinalizedWatcher(chain).watch(); } -}; +} -start(); +// Init and run the event watcher +const db: DBOptionTypes = getDB(); +const sns: SNSOptionTypes = getSNS(); +const eventWatcher = new EventWatcher(db, sns); +eventWatcher.run(); diff --git a/event-watcher/src/services/SNS/AwsSNS/index.ts b/event-watcher/src/services/SNS/AwsSNS/index.ts index 178c29607..747c5008a 100644 --- a/event-watcher/src/services/SNS/AwsSNS/index.ts +++ b/event-watcher/src/services/SNS/AwsSNS/index.ts @@ -7,14 +7,17 @@ import { PublishBatchCommandInput, PublishBatchRequestEntry, } from '@aws-sdk/client-sns'; -import { SNSConfig, SNSImplementation, SNSInput, SNSPublishMessageOutput } from '../types'; +import { AwsSNSConfig, SNSInput, SNSPublishMessageOutput } from '../types'; +import BaseSNS from '../BaseSNS'; -class AwsSNS implements SNSImplementation { - private client: SNSClient | null = null; - private subject: string | null = null; - private topicArn: string | null = null; +class AwsSNS extends BaseSNS { + private client: SNSClient; + private subject: string; + private topicArn: string; + + constructor(private config: AwsSNSConfig) { + super(); - constructor(private config: SNSConfig) { const { region, credentials, subject, topicArn } = this.config; this.subject = subject; @@ -25,7 +28,7 @@ class AwsSNS implements SNSImplementation { }); } - async publishMessage({ subject, message }: SNSInput): Promise { + override async publishMessage({ subject, message }: SNSInput): Promise { const input: PublishCommandInput = { TopicArn: this.topicArn!, Subject: subject ?? this.subject!, @@ -48,7 +51,7 @@ class AwsSNS implements SNSImplementation { }; } - async publishMessages(messages: SNSInput[]): Promise { + override async publishMessages(messages: SNSInput[]): Promise { const CHUNK_SIZE = 10; const batches: PublishBatchCommandInput[] = []; const inputs: PublishBatchRequestEntry[] = messages.map(({ subject, message }) => ({ diff --git a/event-watcher/src/services/SNS/BaseSNS.ts b/event-watcher/src/services/SNS/BaseSNS.ts new file mode 100644 index 000000000..62e9a5ffd --- /dev/null +++ b/event-watcher/src/services/SNS/BaseSNS.ts @@ -0,0 +1,8 @@ +import { SNSImplementation, SNSInput, SNSPublishMessageOutput } from './types'; + +abstract class BaseSNS implements SNSImplementation { + abstract publishMessage(message: SNSInput): Promise; + abstract publishMessages(messages: SNSInput[]): Promise; +} + +export default BaseSNS; diff --git a/event-watcher/src/services/SNS/types.ts b/event-watcher/src/services/SNS/types.ts index 7a0900275..9a7cdf130 100644 --- a/event-watcher/src/services/SNS/types.ts +++ b/event-watcher/src/services/SNS/types.ts @@ -1,9 +1,12 @@ +import AwsSNS from './AwsSNS'; + +export type SNSOptionTypes = AwsSNS | null; export interface SNSImplementation { publishMessage(message: SNSInput): Promise; publishMessages(messages: SNSInput[]): Promise; } -export interface SNSConfig { +export interface AwsSNSConfig { region: string; topicArn: string; subject: string; diff --git a/event-watcher/src/services/SNS/utils.ts b/event-watcher/src/services/SNS/utils.ts new file mode 100644 index 000000000..a12f618f3 --- /dev/null +++ b/event-watcher/src/services/SNS/utils.ts @@ -0,0 +1,18 @@ +import { env } from '../../config'; +import AwsSNS from './AwsSNS'; +import { AwsSNSConfig, SNSOptionTypes } from './types'; + +const AwsConfig: AwsSNSConfig = { + region: env.AWS_SNS_REGION as string, + subject: env.AWS_SNS_SUBJECT as string, + topicArn: env.AWS_SNS_TOPIC_ARN as string, + credentials: { + accessKeyId: env.AWS_ACCESS_KEY_ID as string, + secretAccessKey: env.AWS_SECRET_ACCESS_KEY as string, + }, +}; + +export const getSNS = (): SNSOptionTypes => { + if (env.SNS_SOURCE === 'aws') return new AwsSNS(AwsConfig); + return null; +}; diff --git a/event-watcher/src/utils/environment.ts b/event-watcher/src/utils/environment.ts index 4ffd3a25c..5876b1fa9 100644 --- a/event-watcher/src/utils/environment.ts +++ b/event-watcher/src/utils/environment.ts @@ -1,3 +1,5 @@ +import { env } from '../config'; + let loggingEnv: LoggingEnvironment | undefined = undefined; export type LoggingEnvironment = { @@ -10,8 +12,8 @@ export const getEnvironment = () => { return loggingEnv; } else { loggingEnv = { - logLevel: process.env.LOG_LEVEL || 'info', - logDir: process.env.LOG_DIR, + logLevel: env.LOG_LEVEL, + logDir: env.LOG_DIR, }; return loggingEnv; } diff --git a/event-watcher/src/watchers/AlgorandWatcher.ts b/event-watcher/src/watchers/AlgorandWatcher.ts index dcf26ce3c..e792028f0 100644 --- a/event-watcher/src/watchers/AlgorandWatcher.ts +++ b/event-watcher/src/watchers/AlgorandWatcher.ts @@ -1,17 +1,17 @@ import algosdk from 'algosdk'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; import { ALGORAND_INFO } from '../consts'; -import { VaasByBlock } from '../databases/types'; import { makeBlockKey, makeVaaKey } from '../databases/utils'; +import { VaaLog } from '../databases/types'; type Message = { blockKey: string; vaaKey: string; }; -export class AlgorandWatcher extends Watcher { +export class AlgorandWatcher extends BaseWatcher { // Arbitrarily large since the code here is capable of pulling all logs from all via indexer pagination - maximumBatchSize: number = 100000; + override maximumBatchSize: number = 100_000; algodClient: algosdk.Algodv2; indexerClient: algosdk.Indexer; @@ -26,16 +26,16 @@ export class AlgorandWatcher extends Watcher { this.algodClient = new algosdk.Algodv2( ALGORAND_INFO.algodToken, ALGORAND_INFO.algodServer, - ALGORAND_INFO.algodPort + ALGORAND_INFO.algodPort, ); this.indexerClient = new algosdk.Indexer( ALGORAND_INFO.token, ALGORAND_INFO.server, - ALGORAND_INFO.port + ALGORAND_INFO.port, ); } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { this.logger.info(`fetching final block for ${this.chain}`); let status = await this.algodClient.status().do(); @@ -78,13 +78,13 @@ export class AlgorandWatcher extends Watcher { messages.push({ blockKey: makeBlockKey( transaction['confirmed-round'].toString(), - new Date(transaction['round-time'] * 1000).toISOString() + new Date(transaction['round-time'] * 1000).toISOString(), ), vaaKey: makeVaaKey( parentId || transaction.id, this.chain, Buffer.from(algosdk.decodeAddress(transaction.sender).publicKey).toString('hex'), - BigInt(`0x${Buffer.from(transaction.logs[0], 'base64').toString('hex')}`).toString() + BigInt(`0x${Buffer.from(transaction.logs[0], 'base64').toString('hex')}`).toString(), ), }); } @@ -96,32 +96,36 @@ export class AlgorandWatcher extends Watcher { return messages; } - async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { - const txIds = await this.getApplicationLogTransactionIds(fromBlock, toBlock); - const transactions = []; - for (const txId of txIds) { - const response = await this.indexerClient.searchForTransactions().txid(txId).do(); - if (response?.transactions?.[0]) { - transactions.push(response.transactions[0]); - } - } - let messages: Message[] = []; - for (const transaction of transactions) { - messages = [...messages, ...this.processTransaction(transaction)]; - } - const vaasByBlock = messages.reduce((vaasByBlock, message) => { - if (!vaasByBlock[message.blockKey]) { - vaasByBlock[message.blockKey] = []; - } - vaasByBlock[message.blockKey].push(message.vaaKey); - return vaasByBlock; - }, {} as VaasByBlock); - const toBlockInfo = await this.indexerClient.lookupBlock(toBlock).do(); - const toBlockTimestamp = new Date(toBlockInfo.timestamp * 1000).toISOString(); - const toBlockKey = makeBlockKey(toBlock.toString(), toBlockTimestamp); - if (!vaasByBlock[toBlockKey]) { - vaasByBlock[toBlockKey] = []; - } - return vaasByBlock; + // async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + // const txIds = await this.getApplicationLogTransactionIds(fromBlock, toBlock); + // const transactions = []; + // for (const txId of txIds) { + // const response = await this.indexerClient.searchForTransactions().txid(txId).do(); + // if (response?.transactions?.[0]) { + // transactions.push(response.transactions[0]); + // } + // } + // let messages: Message[] = []; + // for (const transaction of transactions) { + // messages = [...messages, ...this.processTransaction(transaction)]; + // } + // const vaasByBlock = messages.reduce((vaasByBlock, message) => { + // if (!vaasByBlock[message.blockKey]) { + // vaasByBlock[message.blockKey] = []; + // } + // vaasByBlock[message.blockKey].push(message.vaaKey); + // return vaasByBlock; + // }, {} as VaasByBlock); + // const toBlockInfo = await this.indexerClient.lookupBlock(toBlock).do(); + // const toBlockTimestamp = new Date(toBlockInfo.timestamp * 1000).toISOString(); + // const toBlockKey = makeBlockKey(toBlock.toString(), toBlockTimestamp); + // if (!vaasByBlock[toBlockKey]) { + // vaasByBlock[toBlockKey] = []; + // } + // return vaasByBlock; + // } + + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } } diff --git a/event-watcher/src/watchers/AptosWatcher.ts b/event-watcher/src/watchers/AptosWatcher.ts index a7ab24bf8..63cf2084d 100644 --- a/event-watcher/src/watchers/AptosWatcher.ts +++ b/event-watcher/src/watchers/AptosWatcher.ts @@ -3,10 +3,10 @@ import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../common'; import { AptosClient } from 'aptos'; import { z } from 'zod'; import { RPCS_BY_CHAIN } from '../consts'; -import { VaasByBlock } from '../databases/types'; import { makeVaaKey } from '../databases/utils'; import { AptosEvent } from '../types/aptos'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; +import { VaaLog } from '../databases/types'; const APTOS_CORE_BRIDGE_ADDRESS = CONTRACTS.MAINNET.aptos.core; const APTOS_EVENT_HANDLE = `${APTOS_CORE_BRIDGE_ADDRESS}::state::WormholeMessageHandle`; @@ -16,54 +16,58 @@ const APTOS_FIELD_NAME = 'event'; * NOTE: The Aptos watcher differs from other watchers in that it uses the event sequence number to * fetch Wormhole messages and therefore also stores sequence numbers instead of block numbers. */ -export class AptosWatcher extends Watcher { +export class AptosWatcher extends BaseWatcher { client: AptosClient; - maximumBatchSize: number = 25; + override maximumBatchSize: number = 25; constructor() { super('aptos'); this.client = new AptosClient(RPCS_BY_CHAIN[this.chain]!); } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { return Number( ( await this.client.getEventsByEventHandle( APTOS_CORE_BRIDGE_ADDRESS, APTOS_EVENT_HANDLE, APTOS_FIELD_NAME, - { limit: 1 } + { limit: 1 }, ) - )[0].sequence_number + )[0].sequence_number, ); } - async getMessagesForBlocks(fromSequence: number, toSequence: number): Promise { - const limit = toSequence - fromSequence + 1; - const events: AptosEvent[] = (await this.client.getEventsByEventHandle( - APTOS_CORE_BRIDGE_ADDRESS, - APTOS_EVENT_HANDLE, - APTOS_FIELD_NAME, - { start: fromSequence, limit } - )) as AptosEvent[]; - const vaasByBlock: VaasByBlock = {}; - await Promise.all( - events.map(async ({ data, sequence_number, version }) => { - const [block, transaction] = await Promise.all([ - this.client.getBlockByVersion(Number(version)), - this.client.getTransactionByVersion(Number(version)), - ]); - const timestamp = new Date(Number(block.block_timestamp) / 1000).toISOString(); - const blockKey = [block.block_height, timestamp, sequence_number].join('/'); // use custom block key for now so we can include sequence number - const emitter = data.sender.padStart(64, '0'); - const vaaKey = makeVaaKey(transaction.hash, this.chain, emitter, data.sequence); - vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] ?? []), vaaKey]; - }) - ); - return vaasByBlock; + // async getMessagesForBlocks(fromSequence: number, toSequence: number): Promise { + // const limit = toSequence - fromSequence + 1; + // const events: AptosEvent[] = (await this.client.getEventsByEventHandle( + // APTOS_CORE_BRIDGE_ADDRESS, + // APTOS_EVENT_HANDLE, + // APTOS_FIELD_NAME, + // { start: fromSequence, limit } + // )) as AptosEvent[]; + // const vaasByBlock: VaasByBlock = {}; + // await Promise.all( + // events.map(async ({ data, sequence_number, version }) => { + // const [block, transaction] = await Promise.all([ + // this.client.getBlockByVersion(Number(version)), + // this.client.getTransactionByVersion(Number(version)), + // ]); + // const timestamp = new Date(Number(block.block_timestamp) / 1000).toISOString(); + // const blockKey = [block.block_height, timestamp, sequence_number].join('/'); // use custom block key for now so we can include sequence number + // const emitter = data.sender.padStart(64, '0'); + // const vaaKey = makeVaaKey(transaction.hash, this.chain, emitter, data.sequence); + // vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] ?? []), vaaKey]; + // }) + // ); + // return vaasByBlock; + // } + + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } - isValidBlockKey(key: string) { + override isValidBlockKey(key: string) { try { const [block, timestamp, sequence] = key.split('/'); const initialSequence = z @@ -80,7 +84,7 @@ export class AptosWatcher extends Watcher { } } - isValidVaaKey(key: string) { + override isValidVaaKey(key: string) { try { const [txHash, vaaKey] = key.split(':'); const [_, emitter, sequence] = vaaKey.split('/'); diff --git a/event-watcher/src/watchers/ArbitrumWatcher.ts b/event-watcher/src/watchers/ArbitrumWatcher.ts index 3e08aec3e..7c87b724d 100644 --- a/event-watcher/src/watchers/ArbitrumWatcher.ts +++ b/event-watcher/src/watchers/ArbitrumWatcher.ts @@ -23,7 +23,7 @@ export class ArbitrumWatcher extends EVMWatcher { this.maximumBatchSize = 25; } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { if (!this.rpc) { throw new Error(`${this.chain} RPC is not defined!`); } @@ -40,18 +40,21 @@ export class ArbitrumWatcher extends EVMWatcher { params: ['latest', false], }, ], - AXIOS_CONFIG_JSON + AXIOS_CONFIG_JSON, ) )?.data?.[0]?.result; if (!l1Result || !l1Result.l1BlockNumber || !l1Result.number) { throw new Error( - `Unable to parse result of ArbitrumWatcher::eth_getBlockByNumber for latest on ${this.rpc}` + `Unable to parse result of ArbitrumWatcher::eth_getBlockByNumber for latest on ${this.rpc}`, ); } const associatedL1: number = parseInt(l1Result.l1BlockNumber, 16); const l2BlkNum: number = parseInt(l1Result.number, 16); this.logger.debug( - 'getFinalizedBlockNumber() checking map L1Block: ' + associatedL1 + ' => L2Block: ' + l2BlkNum + 'getFinalizedBlockNumber() checking map L1Block: ' + + associatedL1 + + ' => L2Block: ' + + l2BlkNum, ); // Only update the map, if the L2 block number is newer diff --git a/event-watcher/src/watchers/BSCWatcher.ts b/event-watcher/src/watchers/BSCWatcher.ts index fb46730d7..06cd3890a 100644 --- a/event-watcher/src/watchers/BSCWatcher.ts +++ b/event-watcher/src/watchers/BSCWatcher.ts @@ -4,7 +4,8 @@ export class BSCWatcher extends EVMWatcher { constructor() { super('bsc'); } - async getFinalizedBlockNumber(): Promise { + + override async getFinalizedBlockNumber(): Promise { const latestBlock = await super.getFinalizedBlockNumber(); return Math.max(latestBlock - 15, 0); } diff --git a/event-watcher/src/watchers/Watcher.ts b/event-watcher/src/watchers/BaseWatcher.ts similarity index 52% rename from event-watcher/src/watchers/Watcher.ts rename to event-watcher/src/watchers/BaseWatcher.ts index 582c826cd..7085a8e63 100644 --- a/event-watcher/src/watchers/Watcher.ts +++ b/event-watcher/src/watchers/BaseWatcher.ts @@ -2,52 +2,37 @@ import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, sleep } from '../common'; import { z } from 'zod'; import { TIMEOUT } from '../consts'; -import { VaaLog, VaasByBlock } from '../databases/types'; -import { - getResumeBlockByChain, - storeVaaLogs, - storeVaasByBlock, - storeLatestProcessBlock, -} from '../databases/utils'; +import { DBOptionTypes, VaaLog } from '../databases/types'; import { getLogger, WormholeLogger } from '../utils/logger'; -import AwsSNS from '../services/SNS/AwsSNS'; -import { SNSConfig, SNSInput } from '../services/SNS/types'; +import { SNSInput, SNSOptionTypes } from '../services/SNS/types'; +import { WatcherImplementation } from './types'; -const config: SNSConfig = { - region: process.env.AWS_SNS_REGION as string, - subject: process.env.AWS_SNS_SUBJECT as string, - topicArn: process.env.AWS_TOPIC_ARN as string, - credentials: { - accessKeyId: process.env.AWS_ACCESS_KEY_ID as string, - secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string, - }, -}; - -export class Watcher { - chain: ChainName; - logger: WormholeLogger; +abstract class BaseWatcher implements WatcherImplementation { + public logger: WormholeLogger; maximumBatchSize: number = 100; - SNSClient: AwsSNS; + sns?: SNSOptionTypes; + db?: DBOptionTypes; - constructor(chain: ChainName) { - this.chain = chain; + constructor(public chain: ChainName) { this.logger = getLogger(chain); - this.SNSClient = new AwsSNS(config); } - async getFinalizedBlockNumber(): Promise { - throw new Error('Not Implemented'); + setDB(db: DBOptionTypes) { + this.db = db; } - async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { - throw new Error('Not Implemented'); + setServices(sns: SNSOptionTypes) { + this.sns = sns; } - async getVaaLogs(fromBlock: number, toBlock: number): Promise { - throw new Error('Not Implemented'); + abstract getFinalizedBlockNumber(): Promise; + abstract getVaaLogs(fromBlock: number, toBlock: number): Promise; + + isValidVaaKey(key: string): boolean { + throw new Error('Method not implemented.'); } - isValidBlockKey(key: string) { + isValidBlockKey(key: string): boolean { try { const [block, timestamp] = key.split('/'); const initialBlock = z @@ -63,13 +48,11 @@ export class Watcher { } } - isValidVaaKey(key: string): boolean { - throw new Error('Not Implemented'); - } - async watch(): Promise { let toBlock: number | null = null; - let fromBlock: number | null = await getResumeBlockByChain(this.chain); + let fromBlock: number | null = this.db + ? await this.db?.getResumeBlockByChain(this.chain) + : null; let retry = 0; while (true) { @@ -77,23 +60,27 @@ export class Watcher { if (fromBlock !== null && toBlock !== null && fromBlock <= toBlock) { // fetch logs for the block range, inclusive of toBlock toBlock = Math.min(fromBlock + this.maximumBatchSize - 1, toBlock); - this.logger.info(`fetching messages from ${fromBlock} to ${toBlock}`); - - // const vaasByBlock = await this.getMessagesForBlocks(fromBlock, toBlock); - // await storeVaasByBlock(this.chain, vaasByBlock); - // Here we get all the vaa logs from LOG_MESSAGE_PUBLISHED_TOPIC - // Then store the latest processed block by Chain Id try { + this.logger.info(`fetching messages from ${fromBlock} to ${toBlock}`); + // Here we get all the vaa logs from LOG_MESSAGE_PUBLISHED_TOPIC const vaaLogs = await this.getVaaLogs(fromBlock, toBlock); + if (vaaLogs?.length > 0) { - await storeVaaLogs(this.chain, vaaLogs); + // Then store the vaa logs processed in db + // TODO: handle store logs failure + await this.db?.storeVaaLogs(this.chain, vaaLogs); + + // Then publish the vaa logs processed in SNS const messages: SNSInput[] = vaaLogs.map((log) => ({ message: JSON.stringify({ ...log }), })); - this.SNSClient.publishMessages(messages); + // TODO: handle publish failure + this.sns?.publishMessages(messages); } - await storeLatestProcessBlock(this.chain, toBlock); + // Then store the latest processed block by Chain Id + // TODO: handle store last blocks failure + await this.db?.storeLatestProcessBlock(this.chain, toBlock); } catch (e) { this.logger.error(e); } @@ -119,10 +106,12 @@ export class Watcher { } catch (e) { retry++; this.logger.error(e); - const expoBacko = TIMEOUT * 2 ** retry; - this.logger.warn(`backing off for ${expoBacko}ms`); - await sleep(expoBacko); + const backOffTimeoutMS = TIMEOUT * 2 ** retry; + this.logger.warn(`backing off for ${backOffTimeoutMS}ms`); + await sleep(backOffTimeoutMS); } } } } + +export default BaseWatcher; diff --git a/event-watcher/src/watchers/CosmwasmWatcher.ts b/event-watcher/src/watchers/CosmwasmWatcher.ts index 71e6b525f..b4f19cba1 100644 --- a/event-watcher/src/watchers/CosmwasmWatcher.ts +++ b/event-watcher/src/watchers/CosmwasmWatcher.ts @@ -1,13 +1,13 @@ import { CONTRACTS, CosmWasmChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import axios from 'axios'; import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts'; -import { VaasByBlock } from '../databases/types'; import { makeBlockKey, makeVaaKey } from '../databases/utils'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; import { SHA256 } from 'jscrypto/SHA256'; import { Base64 } from 'jscrypto/Base64'; +import { VaaLog } from '../databases/types'; -export class CosmwasmWatcher extends Watcher { +export class CosmwasmWatcher extends BaseWatcher { latestBlockTag: string; getBlockTag: string; hashTag: string; @@ -38,7 +38,7 @@ export class CosmwasmWatcher extends Watcher { return SHA256.hash(Base64.parse(data)).toString().toUpperCase(); } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { const result = (await axios.get(`${this.rpc}/${this.latestBlockTag}`)).data; if (result && result.block.header.height) { let blockHeight: number = parseInt(result.block.header.height); @@ -51,105 +51,109 @@ export class CosmwasmWatcher extends Watcher { throw new Error(`Unable to parse result of ${this.latestBlockTag} on ${this.rpc}`); } - async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { - const address = CONTRACTS.MAINNET[this.chain].core; - if (!address) { - throw new Error(`Core contract not defined for ${this.chain}`); - } - this.logger.debug(`core contract for ${this.chain} is ${address}`); - let vaasByBlock: VaasByBlock = {}; - this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); + // async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + // const address = CONTRACTS.MAINNET[this.chain].core; + // if (!address) { + // throw new Error(`Core contract not defined for ${this.chain}`); + // } + // this.logger.debug(`core contract for ${this.chain} is ${address}`); + // let vaasByBlock: VaasByBlock = {}; + // this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); - // For each block number, call {RPC}/{getBlockTag}/{block_number} - // Foreach block.data.txs[] do hexToHash() to get the txHash - // Then call {RPC}/{hashTag}/{hash} to get the logs/events - // Walk the logs/events + // // For each block number, call {RPC}/{getBlockTag}/{block_number} + // // Foreach block.data.txs[] do hexToHash() to get the txHash + // // Then call {RPC}/{hashTag}/{hash} to get the logs/events + // // Walk the logs/events - for (let blockNumber = fromBlock; blockNumber <= toBlock; blockNumber++) { - this.logger.debug('Getting block number ' + blockNumber); - const blockResult: CosmwasmBlockResult = ( - await axios.get(`${this.rpc}/${this.getBlockTag}${blockNumber}`) - ).data; - if (!blockResult || !blockResult.block.data) { - throw new Error('bad result for block ${blockNumber}'); - } - const blockKey = makeBlockKey( - blockNumber.toString(), - new Date(blockResult.block.header.time).toISOString() - ); - vaasByBlock[blockKey] = []; - let vaaKey: string = ''; - let numTxs: number = 0; - if (blockResult.block.data.txs) { - numTxs = blockResult.block.data.txs.length; - } - for (let i = 0; i < numTxs; i++) { - // The following check is not needed because of the check for numTxs. - // But typescript wanted it anyway. - if (!blockResult.block.data.txs) { - continue; - } - let hash: string = this.hexToHash(blockResult.block.data.txs[i]); - this.logger.debug('blockNumber = ' + blockNumber + ', txHash[' + i + '] = ' + hash); - // console.log('Attempting to get hash', `${this.rpc}/${this.hashTag}${hash}`); - try { - const hashResult: CosmwasmHashResult = ( - await axios.get(`${this.rpc}/${this.hashTag}${hash}`, AXIOS_CONFIG_JSON) - ).data; - if (hashResult && hashResult.tx_response.events) { - const numEvents = hashResult.tx_response.events.length; - for (let j = 0; j < numEvents; j++) { - let type: string = hashResult.tx_response.events[j].type; - if (type === 'wasm') { - if (hashResult.tx_response.events[j].attributes) { - let attrs = hashResult.tx_response.events[j].attributes; - let emitter: string = ''; - let sequence: string = ''; - let coreContract: boolean = false; - // only care about _contract_address, message.sender and message.sequence - const numAttrs = attrs.length; - for (let k = 0; k < numAttrs; k++) { - const key = Buffer.from(attrs[k].key, 'base64').toString().toLowerCase(); - this.logger.debug('Encoded Key = ' + attrs[k].key + ', decoded = ' + key); - if (key === 'message.sender') { - emitter = Buffer.from(attrs[k].value, 'base64').toString(); - } else if (key === 'message.sequence') { - sequence = Buffer.from(attrs[k].value, 'base64').toString(); - } else if (key === '_contract_address' || key === 'contract_address') { - let addr = Buffer.from(attrs[k].value, 'base64').toString(); - if (addr === address) { - coreContract = true; - } - } - } - if (coreContract && emitter !== '' && sequence !== '') { - vaaKey = makeVaaKey(hash, this.chain, emitter, sequence); - this.logger.debug('blockKey: ' + blockKey); - this.logger.debug('Making vaaKey: ' + vaaKey); - vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; - } - } - } - } - } else { - this.logger.error('There were no hashResults'); - } - } catch (e: any) { - // console.error(e); - if ( - e?.response?.status === 500 && - e?.response?.data?.code === 2 && - e?.response?.data?.message.startsWith('json: error calling MarshalJSON') - ) { - // Just skip this one... - } else { - // Rethrow the error because we only want to catch the above error - throw e; - } - } - } - } - return vaasByBlock; + // for (let blockNumber = fromBlock; blockNumber <= toBlock; blockNumber++) { + // this.logger.debug('Getting block number ' + blockNumber); + // const blockResult: CosmwasmBlockResult = ( + // await axios.get(`${this.rpc}/${this.getBlockTag}${blockNumber}`) + // ).data; + // if (!blockResult || !blockResult.block.data) { + // throw new Error('bad result for block ${blockNumber}'); + // } + // const blockKey = makeBlockKey( + // blockNumber.toString(), + // new Date(blockResult.block.header.time).toISOString() + // ); + // vaasByBlock[blockKey] = []; + // let vaaKey: string = ''; + // let numTxs: number = 0; + // if (blockResult.block.data.txs) { + // numTxs = blockResult.block.data.txs.length; + // } + // for (let i = 0; i < numTxs; i++) { + // // The following check is not needed because of the check for numTxs. + // // But typescript wanted it anyway. + // if (!blockResult.block.data.txs) { + // continue; + // } + // let hash: string = this.hexToHash(blockResult.block.data.txs[i]); + // this.logger.debug('blockNumber = ' + blockNumber + ', txHash[' + i + '] = ' + hash); + // // console.log('Attempting to get hash', `${this.rpc}/${this.hashTag}${hash}`); + // try { + // const hashResult: CosmwasmHashResult = ( + // await axios.get(`${this.rpc}/${this.hashTag}${hash}`, AXIOS_CONFIG_JSON) + // ).data; + // if (hashResult && hashResult.tx_response.events) { + // const numEvents = hashResult.tx_response.events.length; + // for (let j = 0; j < numEvents; j++) { + // let type: string = hashResult.tx_response.events[j].type; + // if (type === 'wasm') { + // if (hashResult.tx_response.events[j].attributes) { + // let attrs = hashResult.tx_response.events[j].attributes; + // let emitter: string = ''; + // let sequence: string = ''; + // let coreContract: boolean = false; + // // only care about _contract_address, message.sender and message.sequence + // const numAttrs = attrs.length; + // for (let k = 0; k < numAttrs; k++) { + // const key = Buffer.from(attrs[k].key, 'base64').toString().toLowerCase(); + // this.logger.debug('Encoded Key = ' + attrs[k].key + ', decoded = ' + key); + // if (key === 'message.sender') { + // emitter = Buffer.from(attrs[k].value, 'base64').toString(); + // } else if (key === 'message.sequence') { + // sequence = Buffer.from(attrs[k].value, 'base64').toString(); + // } else if (key === '_contract_address' || key === 'contract_address') { + // let addr = Buffer.from(attrs[k].value, 'base64').toString(); + // if (addr === address) { + // coreContract = true; + // } + // } + // } + // if (coreContract && emitter !== '' && sequence !== '') { + // vaaKey = makeVaaKey(hash, this.chain, emitter, sequence); + // this.logger.debug('blockKey: ' + blockKey); + // this.logger.debug('Making vaaKey: ' + vaaKey); + // vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; + // } + // } + // } + // } + // } else { + // this.logger.error('There were no hashResults'); + // } + // } catch (e: any) { + // // console.error(e); + // if ( + // e?.response?.status === 500 && + // e?.response?.data?.code === 2 && + // e?.response?.data?.message.startsWith('json: error calling MarshalJSON') + // ) { + // // Just skip this one... + // } else { + // // Rethrow the error because we only want to catch the above error + // throw e; + // } + // } + // } + // } + // return vaasByBlock; + // } + + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } } @@ -230,7 +234,7 @@ type CosmwasmHashResult = { validator_src_address: string; validator_dst_address: string; amount: { denom: string; amount: string }; - } + }, ]; memo: ''; timeout_height: '0'; @@ -246,7 +250,7 @@ type CosmwasmHashResult = { }; mode_info: { single: { mode: string } }; sequence: string; - } + }, ]; fee: { amount: [{ denom: string; amount: string }]; @@ -269,6 +273,6 @@ type EventsType = { key: string; value: string; index: boolean; - } + }, ]; }; diff --git a/event-watcher/src/watchers/EVMWatcher.ts b/event-watcher/src/watchers/EVMWatcher.ts index df8acd8a6..58322df81 100644 --- a/event-watcher/src/watchers/EVMWatcher.ts +++ b/event-watcher/src/watchers/EVMWatcher.ts @@ -8,9 +8,8 @@ import { Log } from '@ethersproject/abstract-provider'; import axios from 'axios'; import { BigNumber } from 'ethers'; import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts'; -import { VaaLog, VaasByBlock } from '../databases/types'; -import { makeBlockKey, makeVaaKey } from '../databases/utils'; -import { Watcher } from './Watcher'; +import { VaaLog } from '../databases/types'; +import BaseWatcher from './BaseWatcher'; // This is the hash for topic[0] of the core contract event LogMessagePublished // https://github.com/wormhole-foundation/wormhole/blob/main/ethereum/contracts/Implementation.sol#L12 @@ -29,7 +28,7 @@ export type ErrorBlock = { message: string; //'Error: No response received from RPC endpoint in 60s' }; -export class EVMWatcher extends Watcher { +export class EVMWatcher extends BaseWatcher { finalizedBlockTag: BlockTag; lastTimestamp: number; latestFinalizedBlockNumber: number; @@ -39,7 +38,7 @@ export class EVMWatcher extends Watcher { this.lastTimestamp = 0; this.latestFinalizedBlockNumber = 0; this.finalizedBlockTag = finalizedBlockTag; - if (chain === 'acala' || chain === 'karura') { + if (['acala', 'karura'].includes(chain)) { this.maximumBatchSize = 50; } } @@ -201,45 +200,14 @@ export class EVMWatcher extends Watcher { throw new Error(`Unable to parse result of eth_getLogs for ${fromBlock}-${toBlock} on ${rpc}`); } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { this.logger.info(`fetching block ${this.finalizedBlockTag}`); const block: Block = await this.getBlock(this.finalizedBlockTag); this.latestFinalizedBlockNumber = block.number; return block.number; } - async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { - const address = CONTRACTS.MAINNET[this.chain].core; - if (!address) { - throw new Error(`Core contract not defined for ${this.chain}`); - } - const logs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]); - const timestampsByBlock: { [block: number]: string } = {}; - // fetch timestamps for each block - const vaasByBlock: VaasByBlock = {}; - this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); - const blocks = await this.getBlocks(fromBlock, toBlock); - for (const block of blocks) { - const timestamp = new Date(block.timestamp * 1000).toISOString(); - timestampsByBlock[block.number] = timestamp; - vaasByBlock[makeBlockKey(block.number.toString(), timestamp)] = []; - } - this.logger.info(`processing ${logs.length} logs`); - - for (const log of logs) { - const blockNumber = log.blockNumber; - const emitter = log.topics[1].slice(2); - const { - args: { sequence, sender, payload }, - } = wormholeInterface.parseLog(log); - const vaaKey = makeVaaKey(log.transactionHash, this.chain, emitter, sequence.toString()); - const blockKey = makeBlockKey(blockNumber.toString(), timestampsByBlock[blockNumber]); - vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; - } - return vaasByBlock; - } - - async getVaaLogs(fromBlock: number, toBlock: number): Promise { + override async getVaaLogs(fromBlock: number, toBlock: number): Promise { const vaaLogs: VaaLog[] = []; const address = CONTRACTS.MAINNET[this.chain].core; @@ -247,7 +215,6 @@ export class EVMWatcher extends Watcher { throw new Error(`Core contract not defined for ${this.chain}`); } - // this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); const logs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]); this.logger.info(`processing ${logs.length} logs`); @@ -264,7 +231,7 @@ export class EVMWatcher extends Watcher { const vaaId = `${chainId}/${emitter}/${sequence.toString()}`; const vaaLog: VaaLog = { - id: vaaId, + vaaId, chainName, chainId, emitter, @@ -273,6 +240,9 @@ export class EVMWatcher extends Watcher { sender, payload, blockNumber, + indexedAt: new Date().getTime(), + updatedAt: new Date().getTime(), + createdAt: new Date().getTime(), }; vaaLogs.push(vaaLog); diff --git a/event-watcher/src/watchers/InjectiveExplorerWatcher.ts b/event-watcher/src/watchers/InjectiveExplorerWatcher.ts index f65f5867e..d5eb0edaa 100644 --- a/event-watcher/src/watchers/InjectiveExplorerWatcher.ts +++ b/event-watcher/src/watchers/InjectiveExplorerWatcher.ts @@ -1,14 +1,14 @@ import { CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import axios from 'axios'; import { RPCS_BY_CHAIN } from '../consts'; -import { VaasByBlock } from '../databases/types'; +import { VaaLog } from '../databases/types'; import { makeBlockKey, makeVaaKey } from '../databases/utils'; import { EventObjectsTypes, RawLogEvents } from './TerraExplorerWatcher'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; -export class InjectiveExplorerWatcher extends Watcher { +export class InjectiveExplorerWatcher extends BaseWatcher { // Arbitrarily large since the code here is capable of pulling all logs from all via indexer pagination - maximumBatchSize: number = 1_000_000; + override maximumBatchSize: number = 1_000_000; latestBlockTag: string; getBlockTag: string; @@ -30,7 +30,7 @@ export class InjectiveExplorerWatcher extends Watcher { this.contractTag = 'api/explorer/v1/contractTxs/'; } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { const result: ExplorerBlocks = (await axios.get(`${this.rpc}/${this.latestBlockTag}`)).data; if (result && result.paging.total) { let blockHeight: number = result.paging.total; @@ -47,129 +47,133 @@ export class InjectiveExplorerWatcher extends Watcher { // should be core, but the explorer doesn't support it yet // use "to": as the pagination key // compare block height ("block_number":) with what is passed in. - async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { - const coreAddress = CONTRACTS.MAINNET[this.chain].core; - const address = CONTRACTS.MAINNET[this.chain].token_bridge; - if (!address) { - throw new Error(`Token Bridge contract not defined for ${this.chain}`); - } - this.logger.debug(`Token Bridge contract for ${this.chain} is ${address}`); - let vaasByBlock: VaasByBlock = {}; - this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); + // async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + // const coreAddress = CONTRACTS.MAINNET[this.chain].core; + // const address = CONTRACTS.MAINNET[this.chain].token_bridge; + // if (!address) { + // throw new Error(`Token Bridge contract not defined for ${this.chain}`); + // } + // this.logger.debug(`Token Bridge contract for ${this.chain} is ${address}`); + // let vaasByBlock: VaasByBlock = {}; + // this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); - const limit: number = 50; - let done: boolean = false; - let skip: number = 0; - let lastBlockInserted: number = 0; - while (!done) { - // This URL gets the paginated list of transactions for the token bridge contract - let url: string = `${this.rpc}/${this.contractTag}${address}?skip=${skip}&limit=${limit}`; - this.logger.debug(`Query string = ${url}`); - const bulkTxnResult = ( - await axios.get(url, { - headers: { - 'User-Agent': 'Mozilla/5.0', - }, - }) - ).data; - if (!bulkTxnResult) { - throw new Error('bad bulkTxnResult'); - } - skip = bulkTxnResult.paging.to; - const bulkTxns: ContractTxnData[] = bulkTxnResult.data; - if (!bulkTxns) { - throw new Error('No transactions'); - } - for (let i: number = 0; i < bulkTxns.length; ++i) { - // Walk the transactions - const txn: ContractTxnData = bulkTxns[i]; - const height: number = txn.block_number; - if (height >= fromBlock && height <= toBlock) { - // We only care about the transactions in the given block range - this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`); - const blockKey = makeBlockKey( - txn.block_number.toString(), - new Date(txn.block_unix_timestamp).toISOString() - ); - vaasByBlock[blockKey] = []; - lastBlockInserted = height; - this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`); - let vaaKey: string = ''; - // Each txn has an array of raw_logs - if (txn.logs) { - const rawLogs: RawLogEvents[] = txn.logs; - for (let j: number = 0; j < rawLogs.length; ++j) { - const rawLog: RawLogEvents = rawLogs[j]; - const events: EventObjectsTypes[] = rawLog.events; - if (!events) { - this.logger.debug( - `No events in rawLog${j} for block ${height}, hash = ${txn.hash}` - ); - continue; - } - for (let k: number = 0; k < events.length; k++) { - const event: EventObjectsTypes = events[k]; - if (event.type === 'wasm') { - if (event.attributes) { - let attrs = event.attributes; - let emitter: string = ''; - let sequence: string = ''; - let coreContract: boolean = false; - // only care about _contract_address, message.sender and message.sequence - const numAttrs = attrs.length; - for (let l = 0; l < numAttrs; l++) { - const key = attrs[l].key; - if (key === 'message.sender') { - emitter = attrs[l].value; - } else if (key === 'message.sequence') { - sequence = attrs[l].value; - } else if (key === '_contract_address' || key === 'contract_address') { - let addr = attrs[l].value; - if (addr === coreAddress) { - coreContract = true; - } - } - } - if (coreContract && emitter !== '' && sequence !== '') { - vaaKey = makeVaaKey(txn.hash, this.chain, emitter, sequence); - this.logger.debug('blockKey: ' + blockKey); - this.logger.debug('Making vaaKey: ' + vaaKey); - vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; - } - } - } - } - } - } - } - if (height < fromBlock) { - this.logger.debug('Breaking out due to height < fromBlock'); - done = true; - break; - } - } - if (bulkTxns.length < limit) { - this.logger.debug('Breaking out due to ran out of txns.'); - done = true; - } - } - if (lastBlockInserted < toBlock) { - // Need to create something for the last requested block because it will - // become the new starting point for subsequent calls. - this.logger.debug(`Adding filler for block ${toBlock}`); - const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`; - this.logger.debug(`Query string for block = ${blkUrl}`); - const result = (await axios.get(blkUrl)).data; - if (!result) { - throw new Error(`Unable to get block information for block ${toBlock}`); - } - const blockKey = makeBlockKey( - result.data.height.toString(), - new Date(result.data.timestamp).toISOString() - ); - vaasByBlock[blockKey] = []; - } - return vaasByBlock; + // const limit: number = 50; + // let done: boolean = false; + // let skip: number = 0; + // let lastBlockInserted: number = 0; + // while (!done) { + // // This URL gets the paginated list of transactions for the token bridge contract + // let url: string = `${this.rpc}/${this.contractTag}${address}?skip=${skip}&limit=${limit}`; + // this.logger.debug(`Query string = ${url}`); + // const bulkTxnResult = ( + // await axios.get(url, { + // headers: { + // 'User-Agent': 'Mozilla/5.0', + // }, + // }) + // ).data; + // if (!bulkTxnResult) { + // throw new Error('bad bulkTxnResult'); + // } + // skip = bulkTxnResult.paging.to; + // const bulkTxns: ContractTxnData[] = bulkTxnResult.data; + // if (!bulkTxns) { + // throw new Error('No transactions'); + // } + // for (let i: number = 0; i < bulkTxns.length; ++i) { + // // Walk the transactions + // const txn: ContractTxnData = bulkTxns[i]; + // const height: number = txn.block_number; + // if (height >= fromBlock && height <= toBlock) { + // // We only care about the transactions in the given block range + // this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`); + // const blockKey = makeBlockKey( + // txn.block_number.toString(), + // new Date(txn.block_unix_timestamp).toISOString() + // ); + // vaasByBlock[blockKey] = []; + // lastBlockInserted = height; + // this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`); + // let vaaKey: string = ''; + // // Each txn has an array of raw_logs + // if (txn.logs) { + // const rawLogs: RawLogEvents[] = txn.logs; + // for (let j: number = 0; j < rawLogs.length; ++j) { + // const rawLog: RawLogEvents = rawLogs[j]; + // const events: EventObjectsTypes[] = rawLog.events; + // if (!events) { + // this.logger.debug( + // `No events in rawLog${j} for block ${height}, hash = ${txn.hash}` + // ); + // continue; + // } + // for (let k: number = 0; k < events.length; k++) { + // const event: EventObjectsTypes = events[k]; + // if (event.type === 'wasm') { + // if (event.attributes) { + // let attrs = event.attributes; + // let emitter: string = ''; + // let sequence: string = ''; + // let coreContract: boolean = false; + // // only care about _contract_address, message.sender and message.sequence + // const numAttrs = attrs.length; + // for (let l = 0; l < numAttrs; l++) { + // const key = attrs[l].key; + // if (key === 'message.sender') { + // emitter = attrs[l].value; + // } else if (key === 'message.sequence') { + // sequence = attrs[l].value; + // } else if (key === '_contract_address' || key === 'contract_address') { + // let addr = attrs[l].value; + // if (addr === coreAddress) { + // coreContract = true; + // } + // } + // } + // if (coreContract && emitter !== '' && sequence !== '') { + // vaaKey = makeVaaKey(txn.hash, this.chain, emitter, sequence); + // this.logger.debug('blockKey: ' + blockKey); + // this.logger.debug('Making vaaKey: ' + vaaKey); + // vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; + // } + // } + // } + // } + // } + // } + // } + // if (height < fromBlock) { + // this.logger.debug('Breaking out due to height < fromBlock'); + // done = true; + // break; + // } + // } + // if (bulkTxns.length < limit) { + // this.logger.debug('Breaking out due to ran out of txns.'); + // done = true; + // } + // } + // if (lastBlockInserted < toBlock) { + // // Need to create something for the last requested block because it will + // // become the new starting point for subsequent calls. + // this.logger.debug(`Adding filler for block ${toBlock}`); + // const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`; + // this.logger.debug(`Query string for block = ${blkUrl}`); + // const result = (await axios.get(blkUrl)).data; + // if (!result) { + // throw new Error(`Unable to get block information for block ${toBlock}`); + // } + // const blockKey = makeBlockKey( + // result.data.height.toString(), + // new Date(result.data.timestamp).toISOString() + // ); + // vaasByBlock[blockKey] = []; + // } + // return vaasByBlock; + // } + + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } } diff --git a/event-watcher/src/watchers/MoonbeamWatcher.ts b/event-watcher/src/watchers/MoonbeamWatcher.ts index 8d34a0176..2581ce9a6 100644 --- a/event-watcher/src/watchers/MoonbeamWatcher.ts +++ b/event-watcher/src/watchers/MoonbeamWatcher.ts @@ -7,7 +7,8 @@ export class MoonbeamWatcher extends EVMWatcher { constructor() { super('moonbeam'); } - async getFinalizedBlockNumber(): Promise { + + override async getFinalizedBlockNumber(): Promise { const latestBlock = await super.getFinalizedBlockNumber(); let isBlockFinalized = false; while (!isBlockFinalized) { @@ -30,7 +31,7 @@ export class MoonbeamWatcher extends EVMWatcher { params: [blockFromNumber.hash], }, ], - AXIOS_CONFIG_JSON + AXIOS_CONFIG_JSON, ) )?.data?.[0]?.result || false; } catch (e) { diff --git a/event-watcher/src/watchers/NearWatcher.ts b/event-watcher/src/watchers/NearWatcher.ts index 3a462e9c4..252ff68f7 100644 --- a/event-watcher/src/watchers/NearWatcher.ts +++ b/event-watcher/src/watchers/NearWatcher.ts @@ -5,60 +5,64 @@ import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider import ora from 'ora'; import { z } from 'zod'; import { RPCS_BY_CHAIN } from '../consts'; -import { VaasByBlock } from '../databases/types'; +import { VaaLog } from '../databases/types'; import { makeBlockKey, makeVaaKey } from '../databases/utils'; import { EventLog } from '../types/near'; import { getNearProvider, isWormholePublishEventLog } from '../utils/near'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; -export class NearWatcher extends Watcher { +export class NearWatcher extends BaseWatcher { provider: Provider | null = null; constructor() { super('near'); } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { this.logger.info(`fetching final block for ${this.chain}`); const provider = await this.getProvider(); const block = await provider.block({ finality: 'final' }); return block.header.height; } - async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { - // assume toBlock was retrieved from getFinalizedBlockNumber and is finalized - this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); - const provider = await this.getProvider(); - const blocks: BlockResult[] = []; - let block: BlockResult | null = null; - try { - block = await provider.block({ blockId: toBlock }); - blocks.push(block); - while (true) { - // traverse backwards via block hashes: https://github.com/wormhole-foundation/wormhole-monitor/issues/35 - block = await provider.block({ blockId: block.header.prev_hash }); - if (block.header.height < fromBlock) break; - blocks.push(block); - } - } catch (e) { - if (e instanceof TypedError && e.type === 'HANDLER_ERROR') { - const error = block - ? `block ${block.header.prev_hash} is too old, use backfillNear for blocks before height ${block.header.height}` - : `toBlock ${toBlock} is too old, use backfillNear for this range`; // starting block too old - this.logger.error(error); - } else { - throw e; - } - } + // async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + // // assume toBlock was retrieved from getFinalizedBlockNumber and is finalized + // this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); + // const provider = await this.getProvider(); + // const blocks: BlockResult[] = []; + // let block: BlockResult | null = null; + // try { + // block = await provider.block({ blockId: toBlock }); + // blocks.push(block); + // while (true) { + // // traverse backwards via block hashes: https://github.com/wormhole-foundation/wormhole-monitor/issues/35 + // block = await provider.block({ blockId: block.header.prev_hash }); + // if (block.header.height < fromBlock) break; + // blocks.push(block); + // } + // } catch (e) { + // if (e instanceof TypedError && e.type === 'HANDLER_ERROR') { + // const error = block + // ? `block ${block.header.prev_hash} is too old, use backfillNear for blocks before height ${block.header.height}` + // : `toBlock ${toBlock} is too old, use backfillNear for this range`; // starting block too old + // this.logger.error(error); + // } else { + // throw e; + // } + // } + + // return getMessagesFromBlockResults(provider, blocks); + // } - return getMessagesFromBlockResults(provider, blocks); + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } async getProvider(): Promise { return (this.provider = this.provider || (await getNearProvider(RPCS_BY_CHAIN.near!))); } - isValidVaaKey(key: string) { + override isValidVaaKey(key: string) { try { const [txHash, vaaKey] = key.split(':'); const txHashDecoded = Buffer.from(decode(txHash)).toString('hex'); @@ -74,49 +78,49 @@ export class NearWatcher extends Watcher { } } -export const getMessagesFromBlockResults = async ( - provider: Provider, - blocks: BlockResult[], - debug: boolean = false -): Promise => { - const vaasByBlock: VaasByBlock = {}; - let log: ora.Ora; - if (debug) log = ora(`Fetching messages from ${blocks.length} blocks...`).start(); - for (let i = 0; i < blocks.length; i++) { - if (debug) log!.text = `Fetching messages from block ${i + 1}/${blocks.length}...`; - const { height, timestamp } = blocks[i].header; - const blockKey = makeBlockKey(height.toString(), new Date(timestamp / 1_000_000).toISOString()); - vaasByBlock[blockKey] = []; +// export const getMessagesFromBlockResults = async ( +// provider: Provider, +// blocks: BlockResult[], +// debug: boolean = false +// ): Promise => { +// const vaasByBlock: VaasByBlock = {}; +// let log: ora.Ora; +// if (debug) log = ora(`Fetching messages from ${blocks.length} blocks...`).start(); +// for (let i = 0; i < blocks.length; i++) { +// if (debug) log!.text = `Fetching messages from block ${i + 1}/${blocks.length}...`; +// const { height, timestamp } = blocks[i].header; +// const blockKey = makeBlockKey(height.toString(), new Date(timestamp / 1_000_000).toISOString()); +// vaasByBlock[blockKey] = []; - const chunks = []; - for (const chunk of blocks[i].chunks) { - chunks.push(await provider.chunk(chunk.chunk_hash)); - } +// const chunks = []; +// for (const chunk of blocks[i].chunks) { +// chunks.push(await provider.chunk(chunk.chunk_hash)); +// } - const transactions = chunks.flatMap(({ transactions }) => transactions); - for (const tx of transactions) { - const outcome = await provider.txStatus(tx.hash, CONTRACTS.MAINNET.near.core); - const logs = outcome.receipts_outcome - .filter( - ({ outcome }) => - (outcome as any).executor_id === CONTRACTS.MAINNET.near.core && - (outcome.status as ExecutionStatus).SuccessValue - ) - .flatMap(({ outcome }) => outcome.logs) - .filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat - .map((log) => JSON.parse(log.slice(11)) as EventLog) - .filter(isWormholePublishEventLog); - for (const log of logs) { - const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString()); - vaasByBlock[blockKey] = [...vaasByBlock[blockKey], vaaKey]; - } - } - } +// const transactions = chunks.flatMap(({ transactions }) => transactions); +// for (const tx of transactions) { +// const outcome = await provider.txStatus(tx.hash, CONTRACTS.MAINNET.near.core); +// const logs = outcome.receipts_outcome +// .filter( +// ({ outcome }) => +// (outcome as any).executor_id === CONTRACTS.MAINNET.near.core && +// (outcome.status as ExecutionStatus).SuccessValue +// ) +// .flatMap(({ outcome }) => outcome.logs) +// .filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat +// .map((log) => JSON.parse(log.slice(11)) as EventLog) +// .filter(isWormholePublishEventLog); +// for (const log of logs) { +// const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString()); +// vaasByBlock[blockKey] = [...vaasByBlock[blockKey], vaaKey]; +// } +// } +// } - if (debug) { - const numMessages = Object.values(vaasByBlock).flat().length; - log!.succeed(`Fetched ${numMessages} messages from ${blocks.length} blocks`); - } +// if (debug) { +// const numMessages = Object.values(vaasByBlock).flat().length; +// log!.succeed(`Fetched ${numMessages} messages from ${blocks.length} blocks`); +// } - return vaasByBlock; -}; +// return vaasByBlock; +// }; diff --git a/event-watcher/src/watchers/PolygonWatcher.ts b/event-watcher/src/watchers/PolygonWatcher.ts index bf7709f9d..dcc95dcea 100644 --- a/event-watcher/src/watchers/PolygonWatcher.ts +++ b/event-watcher/src/watchers/PolygonWatcher.ts @@ -7,7 +7,8 @@ export class PolygonWatcher extends EVMWatcher { constructor() { super('polygon'); } - async getFinalizedBlockNumber(): Promise { + + override async getFinalizedBlockNumber(): Promise { this.logger.info('fetching last child block from Ethereum'); const rootChain = new ethers.utils.Interface([ `function getLastChildBlock() external view returns (uint256)`, @@ -27,7 +28,7 @@ export class PolygonWatcher extends EVMWatcher { ], }, ], - AXIOS_CONFIG_JSON + AXIOS_CONFIG_JSON, ) )?.data?.[0]?.result; const block = rootChain.decodeFunctionResult('getLastChildBlock', callResult)[0].toNumber(); diff --git a/event-watcher/src/watchers/SolanaWatcher.ts b/event-watcher/src/watchers/SolanaWatcher.ts index 91ee35b1c..e28735e50 100644 --- a/event-watcher/src/watchers/SolanaWatcher.ts +++ b/event-watcher/src/watchers/SolanaWatcher.ts @@ -11,16 +11,16 @@ import { import { decode } from 'bs58'; import { z } from 'zod'; import { RPCS_BY_CHAIN } from '../consts'; -import { VaasByBlock } from '../databases/types'; +import { VaaLog } from '../databases/types'; import { makeBlockKey, makeVaaKey } from '../databases/utils'; import { isLegacyMessage, normalizeCompileInstruction } from '../utils/solana'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; const WORMHOLE_PROGRAM_ID = CONTRACTS.MAINNET.solana.core; const COMMITMENT: Commitment = 'finalized'; const GET_SIGNATURES_LIMIT = 1000; -export class SolanaWatcher extends Watcher { +export class SolanaWatcher extends BaseWatcher { rpc: string; // this is set as a class field so we can modify it in tests getSignaturesLimit = GET_SIGNATURES_LIMIT; @@ -29,154 +29,158 @@ export class SolanaWatcher extends Watcher { // transactions returned. Since we don't know the number of transactions in advance, we use // a block range of 100K slots. Technically, batch size can be arbitrarily large since pagination // of the WH transactions within that range is handled internally below. - maximumBatchSize = 100_000; + override maximumBatchSize = 100_000; constructor() { super('solana'); this.rpc = RPCS_BY_CHAIN.solana!; } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { const connection = new Connection(this.rpc, COMMITMENT); return connection.getSlot(); } - async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise { - const connection = new Connection(this.rpc, COMMITMENT); - // in the rare case of maximumBatchSize skipped blocks in a row, - // you might hit this error due to the recursion below - if (fromSlot > toSlot) throw new Error('solana: invalid block range'); - this.logger.info(`fetching info for blocks ${fromSlot} to ${toSlot}`); - const vaasByBlock: VaasByBlock = {}; - - // identify block range by fetching signatures of the first and last transactions - // getSignaturesForAddress walks backwards so fromSignature occurs after toSignature - let toBlock: VersionedBlockResponse | null = null; - try { - toBlock = await connection.getBlock(toSlot, { maxSupportedTransactionVersion: 0 }); - } catch (e) { - if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) { - // failed to get confirmed block: slot was skipped or missing in long-term storage - return this.getMessagesForBlocks(fromSlot, toSlot - 1); - } else { - throw e; - } - } - if (!toBlock || !toBlock.blockTime || toBlock.transactions.length === 0) { - return this.getMessagesForBlocks(fromSlot, toSlot - 1); - } - const fromSignature = - toBlock.transactions[toBlock.transactions.length - 1].transaction.signatures[0]; + // async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise { + // const connection = new Connection(this.rpc, COMMITMENT); + // // in the rare case of maximumBatchSize skipped blocks in a row, + // // you might hit this error due to the recursion below + // if (fromSlot > toSlot) throw new Error('solana: invalid block range'); + // this.logger.info(`fetching info for blocks ${fromSlot} to ${toSlot}`); + // const vaasByBlock: VaasByBlock = {}; - let fromBlock: VersionedBlockResponse | null = null; - try { - fromBlock = await connection.getBlock(fromSlot, { maxSupportedTransactionVersion: 0 }); - } catch (e) { - if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) { - // failed to get confirmed block: slot was skipped or missing in long-term storage - return this.getMessagesForBlocks(fromSlot + 1, toSlot); - } else { - throw e; - } - } - if (!fromBlock || !fromBlock.blockTime || fromBlock.transactions.length === 0) { - return this.getMessagesForBlocks(fromSlot + 1, toSlot); - } - const toSignature = fromBlock.transactions[0].transaction.signatures[0]; - - // get all core bridge signatures between fromTransaction and toTransaction - let numSignatures = this.getSignaturesLimit; - let currSignature: string | undefined = fromSignature; - while (numSignatures === this.getSignaturesLimit) { - const signatures: ConfirmedSignatureInfo[] = await connection.getSignaturesForAddress( - new PublicKey(WORMHOLE_PROGRAM_ID), - { - before: currSignature, - until: toSignature, - limit: this.getSignaturesLimit, - } - ); + // // identify block range by fetching signatures of the first and last transactions + // // getSignaturesForAddress walks backwards so fromSignature occurs after toSignature + // let toBlock: VersionedBlockResponse | null = null; + // try { + // toBlock = await connection.getBlock(toSlot, { maxSupportedTransactionVersion: 0 }); + // } catch (e) { + // if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) { + // // failed to get confirmed block: slot was skipped or missing in long-term storage + // return this.getMessagesForBlocks(fromSlot, toSlot - 1); + // } else { + // throw e; + // } + // } + // if (!toBlock || !toBlock.blockTime || toBlock.transactions.length === 0) { + // return this.getMessagesForBlocks(fromSlot, toSlot - 1); + // } + // const fromSignature = + // toBlock.transactions[toBlock.transactions.length - 1].transaction.signatures[0]; - this.logger.info(`processing ${signatures.length} transactions`); - - // In order to determine if a transaction has a Wormhole message, we normalize and iterate - // through all instructions in the transaction. Only PostMessage instructions are relevant - // when looking for messages. PostMessageUnreliable instructions are ignored because there - // are no data availability guarantees (ie the associated message accounts may have been - // reused, overwriting previous data). Then, the message account is the account given by - // the second index in the instruction's account key indices. From here, we can fetch the - // message data from the account and parse out the emitter and sequence. - const results = await connection.getTransactions( - signatures.map((s) => s.signature), - { - maxSupportedTransactionVersion: 0, - } - ); - if (results.length !== signatures.length) { - throw new Error(`solana: failed to fetch tx for signatures`); - } - for (const res of results) { - if (res?.meta?.err) { - // skip errored txs - continue; - } - if (!res || !res.blockTime) { - throw new Error( - `solana: failed to fetch tx for signature ${ - res?.transaction.signatures[0] || 'unknown' - }` - ); - } - - const message = res.transaction.message; - const accountKeys = isLegacyMessage(message) - ? message.accountKeys - : message.staticAccountKeys; - const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID); - const instructions = message.compiledInstructions; - const innerInstructions = - res.meta?.innerInstructions?.flatMap((i) => - i.instructions.map(normalizeCompileInstruction) - ) || []; - const whInstructions = innerInstructions - .concat(instructions) - .filter((i) => i.programIdIndex === programIdIndex); - for (const instruction of whInstructions) { - // skip if not postMessage instruction - const instructionId = instruction.data; - if (instructionId[0] !== 0x01) continue; - - const accountId = accountKeys[instruction.accountKeyIndexes[1]]; - const { - message: { emitterAddress, sequence }, - } = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT); - const blockKey = makeBlockKey( - res.slot.toString(), - new Date(res.blockTime * 1000).toISOString() - ); - const vaaKey = makeVaaKey( - res.transaction.signatures[0], - this.chain, - emitterAddress.toString('hex'), - sequence.toString() - ); - vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; - } - } - - numSignatures = signatures.length; - currSignature = signatures.at(-1)?.signature; - } + // let fromBlock: VersionedBlockResponse | null = null; + // try { + // fromBlock = await connection.getBlock(fromSlot, { maxSupportedTransactionVersion: 0 }); + // } catch (e) { + // if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) { + // // failed to get confirmed block: slot was skipped or missing in long-term storage + // return this.getMessagesForBlocks(fromSlot + 1, toSlot); + // } else { + // throw e; + // } + // } + // if (!fromBlock || !fromBlock.blockTime || fromBlock.transactions.length === 0) { + // return this.getMessagesForBlocks(fromSlot + 1, toSlot); + // } + // const toSignature = fromBlock.transactions[0].transaction.signatures[0]; + + // // get all core bridge signatures between fromTransaction and toTransaction + // let numSignatures = this.getSignaturesLimit; + // let currSignature: string | undefined = fromSignature; + // while (numSignatures === this.getSignaturesLimit) { + // const signatures: ConfirmedSignatureInfo[] = await connection.getSignaturesForAddress( + // new PublicKey(WORMHOLE_PROGRAM_ID), + // { + // before: currSignature, + // until: toSignature, + // limit: this.getSignaturesLimit, + // } + // ); + + // this.logger.info(`processing ${signatures.length} transactions`); + + // // In order to determine if a transaction has a Wormhole message, we normalize and iterate + // // through all instructions in the transaction. Only PostMessage instructions are relevant + // // when looking for messages. PostMessageUnreliable instructions are ignored because there + // // are no data availability guarantees (ie the associated message accounts may have been + // // reused, overwriting previous data). Then, the message account is the account given by + // // the second index in the instruction's account key indices. From here, we can fetch the + // // message data from the account and parse out the emitter and sequence. + // const results = await connection.getTransactions( + // signatures.map((s) => s.signature), + // { + // maxSupportedTransactionVersion: 0, + // } + // ); + // if (results.length !== signatures.length) { + // throw new Error(`solana: failed to fetch tx for signatures`); + // } + // for (const res of results) { + // if (res?.meta?.err) { + // // skip errored txs + // continue; + // } + // if (!res || !res.blockTime) { + // throw new Error( + // `solana: failed to fetch tx for signature ${ + // res?.transaction.signatures[0] || 'unknown' + // }` + // ); + // } + + // const message = res.transaction.message; + // const accountKeys = isLegacyMessage(message) + // ? message.accountKeys + // : message.staticAccountKeys; + // const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID); + // const instructions = message.compiledInstructions; + // const innerInstructions = + // res.meta?.innerInstructions?.flatMap((i) => + // i.instructions.map(normalizeCompileInstruction) + // ) || []; + // const whInstructions = innerInstructions + // .concat(instructions) + // .filter((i) => i.programIdIndex === programIdIndex); + // for (const instruction of whInstructions) { + // // skip if not postMessage instruction + // const instructionId = instruction.data; + // if (instructionId[0] !== 0x01) continue; + + // const accountId = accountKeys[instruction.accountKeyIndexes[1]]; + // const { + // message: { emitterAddress, sequence }, + // } = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT); + // const blockKey = makeBlockKey( + // res.slot.toString(), + // new Date(res.blockTime * 1000).toISOString() + // ); + // const vaaKey = makeVaaKey( + // res.transaction.signatures[0], + // this.chain, + // emitterAddress.toString('hex'), + // sequence.toString() + // ); + // vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; + // } + // } + + // numSignatures = signatures.length; + // currSignature = signatures.at(-1)?.signature; + // } + + // // add last block for storeVaasByBlock + // const lastBlockKey = makeBlockKey( + // toSlot.toString(), + // new Date(toBlock.blockTime * 1000).toISOString() + // ); + // return { [lastBlockKey]: [], ...vaasByBlock }; + // } - // add last block for storeVaasByBlock - const lastBlockKey = makeBlockKey( - toSlot.toString(), - new Date(toBlock.blockTime * 1000).toISOString() - ); - return { [lastBlockKey]: [], ...vaasByBlock }; + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } - isValidVaaKey(key: string) { + override isValidVaaKey(key: string) { try { const [txHash, vaaKey] = key.split(':'); const txHashDecoded = Buffer.from(decode(txHash)).toString('hex'); diff --git a/event-watcher/src/watchers/SuiWatcher.ts b/event-watcher/src/watchers/SuiWatcher.ts index b4bb21008..d12e218d0 100644 --- a/event-watcher/src/watchers/SuiWatcher.ts +++ b/event-watcher/src/watchers/SuiWatcher.ts @@ -7,9 +7,9 @@ import { } from '@mysten/sui.js'; import { array } from 'superstruct'; import { RPCS_BY_CHAIN } from '../consts'; -import { VaasByBlock } from '../databases/types'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; import { makeBlockKey, makeVaaKey } from '../databases/utils'; +import { VaaLog } from '../databases/types'; const SUI_EVENT_HANDLE = `0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a::publish_message::WormholeMessage`; @@ -22,9 +22,9 @@ type PublishMessageEvent = { timestamp: string; }; -export class SuiWatcher extends Watcher { +export class SuiWatcher extends BaseWatcher { client: JsonRpcClient; - maximumBatchSize: number = 100000; // arbitrarily large as this pages back by events + override maximumBatchSize: number = 100_000; // arbitrarily large as this pages back by events constructor() { super('sui'); @@ -32,92 +32,96 @@ export class SuiWatcher extends Watcher { } // TODO: this might break using numbers, the whole service needs a refactor to use BigInt - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { return Number( - (await this.client.request('sui_getLatestCheckpointSequenceNumber', undefined)).result + (await this.client.request('sui_getLatestCheckpointSequenceNumber', undefined)).result, ); } // TODO: this might break using numbers, the whole service needs a refactor to use BigInt - async getMessagesForBlocks(fromCheckpoint: number, toCheckpoint: number): Promise { - this.logger.info(`fetching info for checkpoints ${fromCheckpoint} to ${toCheckpoint}`); - const vaasByBlock: VaasByBlock = {}; + // async getMessagesForBlocks(fromCheckpoint: number, toCheckpoint: number): Promise { + // this.logger.info(`fetching info for checkpoints ${fromCheckpoint} to ${toCheckpoint}`); + // const vaasByBlock: VaasByBlock = {}; - { - // reserve empty slot for initial block so query is cataloged - const fromCheckpointTimestamp = new Date( - Number( - ( - await this.client.requestWithType( - 'sui_getCheckpoint', - { id: fromCheckpoint.toString() }, - Checkpoint - ) - ).timestampMs - ) - ).toISOString(); - const fromBlockKey = makeBlockKey(fromCheckpoint.toString(), fromCheckpointTimestamp); - vaasByBlock[fromBlockKey] = []; - } + // { + // // reserve empty slot for initial block so query is cataloged + // const fromCheckpointTimestamp = new Date( + // Number( + // ( + // await this.client.requestWithType( + // 'sui_getCheckpoint', + // { id: fromCheckpoint.toString() }, + // Checkpoint, + // ) + // ).timestampMs, + // ), + // ).toISOString(); + // const fromBlockKey = makeBlockKey(fromCheckpoint.toString(), fromCheckpointTimestamp); + // vaasByBlock[fromBlockKey] = []; + // } - let lastCheckpoint: null | number = null; - let cursor: any = undefined; - let hasNextPage = false; - do { - const response = await this.client.requestWithType( - 'suix_queryEvents', - { - query: { MoveEventType: SUI_EVENT_HANDLE }, - cursor, - descending_order: true, - }, - PaginatedEvents - ); - const digest = response.data.length - ? response.data[response.data.length - 1].id.txDigest - : null; - lastCheckpoint = digest - ? Number( - ( - await this.client.requestWithType( - 'sui_getTransactionBlock', - { digest }, - SuiTransactionBlockResponse - ) - ).checkpoint! - ) - : null; - cursor = response.nextCursor; - hasNextPage = response.hasNextPage; - const txBlocks = await this.client.requestWithType( - 'sui_multiGetTransactionBlocks', - { digests: response.data.map((e) => e.id.txDigest) }, - array(SuiTransactionBlockResponse) - ); - const checkpointByTxDigest = txBlocks.reduce>( - (value, { digest, checkpoint }) => { - value[digest] = checkpoint; - return value; - }, - {} - ); - for (const event of response.data) { - const checkpoint = checkpointByTxDigest[event.id.txDigest]; - if (!checkpoint) continue; - const checkpointNum = Number(checkpoint); - if (checkpointNum < fromCheckpoint || checkpointNum > toCheckpoint) continue; - const msg = event.parsedJson as PublishMessageEvent; - const timestamp = new Date(Number(msg.timestamp) * 1000).toISOString(); - const vaaKey = makeVaaKey( - event.id.txDigest, - CHAIN_ID_SUI, - msg.sender.slice(2), - msg.sequence - ); - const blockKey = makeBlockKey(checkpoint, timestamp); - vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; - } - } while (hasNextPage && lastCheckpoint && fromCheckpoint < lastCheckpoint); - return vaasByBlock; + // let lastCheckpoint: null | number = null; + // let cursor: any = undefined; + // let hasNextPage = false; + // do { + // const response = await this.client.requestWithType( + // 'suix_queryEvents', + // { + // query: { MoveEventType: SUI_EVENT_HANDLE }, + // cursor, + // descending_order: true, + // }, + // PaginatedEvents, + // ); + // const digest = response.data.length + // ? response.data[response.data.length - 1].id.txDigest + // : null; + // lastCheckpoint = digest + // ? Number( + // ( + // await this.client.requestWithType( + // 'sui_getTransactionBlock', + // { digest }, + // SuiTransactionBlockResponse, + // ) + // ).checkpoint!, + // ) + // : null; + // cursor = response.nextCursor; + // hasNextPage = response.hasNextPage; + // const txBlocks = await this.client.requestWithType( + // 'sui_multiGetTransactionBlocks', + // { digests: response.data.map((e) => e.id.txDigest) }, + // array(SuiTransactionBlockResponse), + // ); + // const checkpointByTxDigest = txBlocks.reduce>( + // (value, { digest, checkpoint }) => { + // value[digest] = checkpoint; + // return value; + // }, + // {}, + // ); + // for (const event of response.data) { + // const checkpoint = checkpointByTxDigest[event.id.txDigest]; + // if (!checkpoint) continue; + // const checkpointNum = Number(checkpoint); + // if (checkpointNum < fromCheckpoint || checkpointNum > toCheckpoint) continue; + // const msg = event.parsedJson as PublishMessageEvent; + // const timestamp = new Date(Number(msg.timestamp) * 1000).toISOString(); + // const vaaKey = makeVaaKey( + // event.id.txDigest, + // CHAIN_ID_SUI, + // msg.sender.slice(2), + // msg.sequence, + // ); + // const blockKey = makeBlockKey(checkpoint, timestamp); + // vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; + // } + // } while (hasNextPage && lastCheckpoint && fromCheckpoint < lastCheckpoint); + // return vaasByBlock; + // } + + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } } diff --git a/event-watcher/src/watchers/TerraExplorerWatcher.ts b/event-watcher/src/watchers/TerraExplorerWatcher.ts index 0debf68c0..57fcf5cf9 100644 --- a/event-watcher/src/watchers/TerraExplorerWatcher.ts +++ b/event-watcher/src/watchers/TerraExplorerWatcher.ts @@ -1,13 +1,13 @@ import { CONTRACTS, CosmWasmChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import axios from 'axios'; import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts'; -import { VaasByBlock } from '../databases/types'; +import { VaaLog } from '../databases/types'; import { makeBlockKey, makeVaaKey } from '../databases/utils'; -import { Watcher } from './Watcher'; +import BaseWatcher from './BaseWatcher'; -export class TerraExplorerWatcher extends Watcher { +export class TerraExplorerWatcher extends BaseWatcher { // Arbitrarily large since the code here is capable of pulling all logs from all via indexer pagination - maximumBatchSize: number = 100000; + override maximumBatchSize: number = 100_000; latestBlockTag: string; getBlockTag: string; @@ -27,7 +27,7 @@ export class TerraExplorerWatcher extends Watcher { this.latestBlockHeight = 0; } - async getFinalizedBlockNumber(): Promise { + override async getFinalizedBlockNumber(): Promise { const result = (await axios.get(`${this.rpc}/${this.latestBlockTag}`, AXIOS_CONFIG_JSON)).data; if (result && result.block.header.height) { let blockHeight: number = parseInt(result.block.header.height); @@ -43,123 +43,127 @@ export class TerraExplorerWatcher extends Watcher { // retrieve blocks for core contract. // use "next": as the pagination key // compare block height ("height":) with what is passed in. - async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { - const address = CONTRACTS.MAINNET[this.chain].core; - if (!address) { - throw new Error(`Core contract not defined for ${this.chain}`); - } - this.logger.debug(`core contract for ${this.chain} is ${address}`); - let vaasByBlock: VaasByBlock = {}; - this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); + // async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + // const address = CONTRACTS.MAINNET[this.chain].core; + // if (!address) { + // throw new Error(`Core contract not defined for ${this.chain}`); + // } + // this.logger.debug(`core contract for ${this.chain} is ${address}`); + // let vaasByBlock: VaasByBlock = {}; + // this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); - const limit: number = 100; - let done: boolean = false; - let offset: number = 0; - let lastBlockInserted: number = 0; - while (!done) { - // This URL gets the paginated list of transactions for the core contract - let url: string = `${this.rpc}/${this.allTxsTag}offset=${offset}&limit=${limit}&account=${address}`; - this.logger.debug(`Query string = ${url}`); - const bulkTxnResult: BulkTxnResult = ( - await axios.get(url, { - headers: { - 'User-Agent': 'Mozilla/5.0', - 'Accept-Encoding': 'application/json', - }, - }) - ).data; - if (!bulkTxnResult) { - throw new Error('bad bulkTxnResult'); - } - offset = bulkTxnResult.next; - const bulkTxns: BulkTxn[] = bulkTxnResult.txs; - if (!bulkTxns) { - throw new Error('No transactions'); - } - for (let i: number = 0; i < bulkTxns.length; ++i) { - // Walk the transactions - const txn: BulkTxn = bulkTxns[i]; - const height: number = parseInt(txn.height); - if (height >= fromBlock && height <= toBlock) { - // We only care about the transactions in the given block range - this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`); - const blockKey = makeBlockKey(txn.height, new Date(txn.timestamp).toISOString()); - vaasByBlock[blockKey] = []; - lastBlockInserted = height; - this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`); - let vaaKey: string = ''; - // Each txn has an array of raw_logs - const rawLogs: RawLogEvents[] = JSON.parse(txn.raw_log); - for (let j: number = 0; j < rawLogs.length; ++j) { - const rawLog: RawLogEvents = rawLogs[j]; - const events: EventObjectsTypes[] = rawLog.events; - if (!events) { - this.logger.debug( - `No events in rawLog${j} for block ${height}, hash = ${txn.txhash}` - ); - continue; - } - for (let k: number = 0; k < events.length; k++) { - const event: EventObjectsTypes = events[k]; - if (event.type === 'wasm') { - if (event.attributes) { - let attrs = event.attributes; - let emitter: string = ''; - let sequence: string = ''; - let coreContract: boolean = false; - // only care about _contract_address, message.sender and message.sequence - const numAttrs = attrs.length; - for (let l = 0; l < numAttrs; l++) { - const key = attrs[l].key; - if (key === 'message.sender') { - emitter = attrs[l].value; - } else if (key === 'message.sequence') { - sequence = attrs[l].value; - } else if (key === '_contract_address' || key === 'contract_address') { - let addr = attrs[l].value; - if (addr === address) { - coreContract = true; - } - } - } - if (coreContract && emitter !== '' && sequence !== '') { - vaaKey = makeVaaKey(txn.txhash, this.chain, emitter, sequence); - this.logger.debug('blockKey: ' + blockKey); - this.logger.debug('Making vaaKey: ' + vaaKey); - vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; - } - } - } - } - } - } - if (height < fromBlock) { - this.logger.debug('Breaking out due to height < fromBlock'); - done = true; - break; - } - } - if (bulkTxns.length < limit) { - this.logger.debug('Breaking out due to ran out of txns.'); - done = true; - } - } - if (lastBlockInserted < toBlock) { - // Need to create something for the last requested block because it will - // become the new starting point for subsequent calls. - this.logger.debug(`Adding filler for block ${toBlock}`); - const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`; - const result: CosmwasmBlockResult = (await axios.get(blkUrl, AXIOS_CONFIG_JSON)).data; - if (!result) { - throw new Error(`Unable to get block information for block ${toBlock}`); - } - const blockKey = makeBlockKey( - result.block.header.height.toString(), - new Date(result.block.header.time).toISOString() - ); - vaasByBlock[blockKey] = []; - } - return vaasByBlock; + // const limit: number = 100; + // let done: boolean = false; + // let offset: number = 0; + // let lastBlockInserted: number = 0; + // while (!done) { + // // This URL gets the paginated list of transactions for the core contract + // let url: string = `${this.rpc}/${this.allTxsTag}offset=${offset}&limit=${limit}&account=${address}`; + // this.logger.debug(`Query string = ${url}`); + // const bulkTxnResult: BulkTxnResult = ( + // await axios.get(url, { + // headers: { + // 'User-Agent': 'Mozilla/5.0', + // 'Accept-Encoding': 'application/json', + // }, + // }) + // ).data; + // if (!bulkTxnResult) { + // throw new Error('bad bulkTxnResult'); + // } + // offset = bulkTxnResult.next; + // const bulkTxns: BulkTxn[] = bulkTxnResult.txs; + // if (!bulkTxns) { + // throw new Error('No transactions'); + // } + // for (let i: number = 0; i < bulkTxns.length; ++i) { + // // Walk the transactions + // const txn: BulkTxn = bulkTxns[i]; + // const height: number = parseInt(txn.height); + // if (height >= fromBlock && height <= toBlock) { + // // We only care about the transactions in the given block range + // this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`); + // const blockKey = makeBlockKey(txn.height, new Date(txn.timestamp).toISOString()); + // vaasByBlock[blockKey] = []; + // lastBlockInserted = height; + // this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`); + // let vaaKey: string = ''; + // // Each txn has an array of raw_logs + // const rawLogs: RawLogEvents[] = JSON.parse(txn.raw_log); + // for (let j: number = 0; j < rawLogs.length; ++j) { + // const rawLog: RawLogEvents = rawLogs[j]; + // const events: EventObjectsTypes[] = rawLog.events; + // if (!events) { + // this.logger.debug( + // `No events in rawLog${j} for block ${height}, hash = ${txn.txhash}`, + // ); + // continue; + // } + // for (let k: number = 0; k < events.length; k++) { + // const event: EventObjectsTypes = events[k]; + // if (event.type === 'wasm') { + // if (event.attributes) { + // let attrs = event.attributes; + // let emitter: string = ''; + // let sequence: string = ''; + // let coreContract: boolean = false; + // // only care about _contract_address, message.sender and message.sequence + // const numAttrs = attrs.length; + // for (let l = 0; l < numAttrs; l++) { + // const key = attrs[l].key; + // if (key === 'message.sender') { + // emitter = attrs[l].value; + // } else if (key === 'message.sequence') { + // sequence = attrs[l].value; + // } else if (key === '_contract_address' || key === 'contract_address') { + // let addr = attrs[l].value; + // if (addr === address) { + // coreContract = true; + // } + // } + // } + // if (coreContract && emitter !== '' && sequence !== '') { + // vaaKey = makeVaaKey(txn.txhash, this.chain, emitter, sequence); + // this.logger.debug('blockKey: ' + blockKey); + // this.logger.debug('Making vaaKey: ' + vaaKey); + // vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; + // } + // } + // } + // } + // } + // } + // if (height < fromBlock) { + // this.logger.debug('Breaking out due to height < fromBlock'); + // done = true; + // break; + // } + // } + // if (bulkTxns.length < limit) { + // this.logger.debug('Breaking out due to ran out of txns.'); + // done = true; + // } + // } + // if (lastBlockInserted < toBlock) { + // // Need to create something for the last requested block because it will + // // become the new starting point for subsequent calls. + // this.logger.debug(`Adding filler for block ${toBlock}`); + // const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`; + // const result: CosmwasmBlockResult = (await axios.get(blkUrl, AXIOS_CONFIG_JSON)).data; + // if (!result) { + // throw new Error(`Unable to get block information for block ${toBlock}`); + // } + // const blockKey = makeBlockKey( + // result.block.header.height.toString(), + // new Date(result.block.header.time).toISOString(), + // ); + // vaasByBlock[blockKey] = []; + // } + // return vaasByBlock; + // } + + override getVaaLogs(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); } } diff --git a/event-watcher/src/watchers/types.ts b/event-watcher/src/watchers/types.ts new file mode 100644 index 000000000..28f61d36e --- /dev/null +++ b/event-watcher/src/watchers/types.ts @@ -0,0 +1,39 @@ +import { ChainName } from '@certusone/wormhole-sdk'; +import BaseDB from '../databases/BaseDB'; +import { VaaLog } from '../databases/types'; +import BaseSNS from '../services/SNS/BaseSNS'; +import { WormholeLogger } from '../utils/logger'; +import { AlgorandWatcher } from './AlgorandWatcher'; +import { AptosWatcher } from './AptosWatcher'; +import { BSCWatcher } from './BSCWatcher'; +import { CosmwasmWatcher } from './CosmwasmWatcher'; +import { EVMWatcher } from './EVMWatcher'; +import { InjectiveExplorerWatcher } from './InjectiveExplorerWatcher'; +import { NearWatcher } from './NearWatcher'; +import { SolanaWatcher } from './SolanaWatcher'; +import { SuiWatcher } from './SuiWatcher'; +import { TerraExplorerWatcher } from './TerraExplorerWatcher'; + +export type WatcherOptionTypes = + | SolanaWatcher + | EVMWatcher + | BSCWatcher + | AlgorandWatcher + | AptosWatcher + | NearWatcher + | InjectiveExplorerWatcher + | TerraExplorerWatcher + | CosmwasmWatcher + | SuiWatcher; +export interface WatcherImplementation { + chain: ChainName; + logger: WormholeLogger; + maximumBatchSize: number; + sns?: BaseSNS | null; + db?: BaseDB; + getFinalizedBlockNumber(): Promise; + getVaaLogs(fromBlock: number, toBlock: number): Promise; + isValidBlockKey(key: string): boolean; + isValidVaaKey(key: string): boolean; + watch(): Promise; +} diff --git a/event-watcher/src/watchers/utils.ts b/event-watcher/src/watchers/utils.ts index 8dff53ac7..326a049ac 100644 --- a/event-watcher/src/watchers/utils.ts +++ b/event-watcher/src/watchers/utils.ts @@ -1,4 +1,8 @@ -import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; +import { + ChainName, + CosmWasmChainName, + EVMChainName, +} from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import { AlgorandWatcher } from './AlgorandWatcher'; import { AptosWatcher } from './AptosWatcher'; import { ArbitrumWatcher } from './ArbitrumWatcher'; @@ -11,10 +15,10 @@ import { NearWatcher } from './NearWatcher'; import { PolygonWatcher } from './PolygonWatcher'; import { SolanaWatcher } from './SolanaWatcher'; import { TerraExplorerWatcher } from './TerraExplorerWatcher'; -import { Watcher } from './Watcher'; import { SuiWatcher } from './SuiWatcher'; +import { WatcherOptionTypes } from './types'; -export function makeFinalizedWatcher(chainName: ChainName): Watcher { +export function makeFinalizedWatcher(chainName: ChainName): WatcherOptionTypes { if (chainName === 'solana') { return new SolanaWatcher(); } else if (['ethereum', 'karura', 'acala'].includes(chainName)) { @@ -41,8 +45,8 @@ export function makeFinalizedWatcher(chainName: ChainName): Watcher { return new InjectiveExplorerWatcher(); } else if (chainName === 'terra') { return new TerraExplorerWatcher('terra'); - } else if (chainName === 'terra2' || chainName === 'xpla') { - return new CosmwasmWatcher(chainName); + } else if (['terra2', 'xpla'].includes(chainName)) { + return new CosmwasmWatcher(chainName as CosmWasmChainName); } else if (chainName === 'sui') { return new SuiWatcher(); } else { diff --git a/event-watcher/tsconfig.json b/event-watcher/tsconfig.json index cc5d7cd9f..6b92e265d 100644 --- a/event-watcher/tsconfig.json +++ b/event-watcher/tsconfig.json @@ -15,6 +15,7 @@ "noFallthroughCasesInSwitch": true, "resolveJsonModule": true, "noEmit": true, + "noImplicitOverride": true, "lib": ["es2022"] }, "include": ["scripts", "src", "src/abi/*.json"]