Refactor/event watcher (#647)
* feat: add EVM event-watcher support with mongodb

* refactor: DB, Watcher, SNS Classes
raop155 committed Aug 22, 2023
1 parent 3fa1804 commit bf02b20
Showing 37 changed files with 1,205 additions and 1,223 deletions.
21 changes: 17 additions & 4 deletions event-watcher/.env.sample
@@ -1,15 +1,28 @@
LOG_DIR=.
#Log
LOG_DIR=
LOG_LEVEL=info

ETH_RPC=
#RPC
ETH_RPC=https://eth.llamarpc.com

#Database source
DB_SOURCE=
JSON_DB_FILE=db.json
JSON_LAST_BLOCK_FILE=lastBlockByChain.json

#Server
PORT=3005

#MongoDB
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=wormhole

#SNS source
SNS_SOURCE=aws

#AWS
AWS_SNS_REGION=
AWS_TOPIC_ARN=
AWS_SNS_TOPIC_ARN=
AWS_SNS_SUBJECT=EventWatcher
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_SECRET_ACCESS_KEY=
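The new SNS_SOURCE and AWS_* variables feed the refactored SNS classes mentioned in the commit message, which are not part of this excerpt. Below is a minimal sketch of how a publisher could be wired from these variables, assuming the AWS SDK v3 `@aws-sdk/client-sns` package; the `publishEvent` helper is illustrative, not the repository's actual SNS class.

```typescript
// Illustrative only — not the repository's SNS implementation.
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';

const sns = new SNSClient({
  region: process.env.AWS_SNS_REGION,
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
  },
});

// Publish a single message to the topic configured above.
export async function publishEvent(message: string): Promise<void> {
  await sns.send(
    new PublishCommand({
      TopicArn: process.env.AWS_SNS_TOPIC_ARN,
      Subject: process.env.AWS_SNS_SUBJECT ?? 'EventWatcher',
      Message: message,
    }),
  );
}
```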
6 changes: 3 additions & 3 deletions event-watcher/scripts/backfill.ts
@@ -3,13 +3,13 @@ dotenv.config();
import { ChainId, coalesceChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { chunkArray, sleep } from '../src/common';
import { BigtableDatabase } from '../src/databases/BigtableDatabase';
import { JsonDatabase } from '../src/databases/JsonDatabase';
import { VaasByBlock } from '../src/databases/types';
import JsonDB from '../src/databases/JsonDB';

// This script backfills the bigtable db from a json db

(async () => {
const localDb = new JsonDatabase();
const localDb = new JsonDB();
const remoteDb = new BigtableDatabase();

const dbEntries = Object.entries(localDb.db);
@@ -25,7 +25,7 @@ import { VaasByBlock } from '../src/databases/types';
}, {});
await remoteDb.storeVaasByBlock(
coalesceChainName(Number(chain) as ChainId),
chunkedVaasByBlock
chunkedVaasByBlock,
);
await sleep(500);
}
6 changes: 3 additions & 3 deletions event-watcher/scripts/backfillArbitrum.ts
@@ -3,22 +3,22 @@ dotenv.config();
import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import axios from 'axios';
import ora from 'ora';
import { initDb } from '../src/databases/utils';
import { getDB } from '../src/databases/utils';
import { AXIOS_CONFIG_JSON } from '../src/consts';
import { ArbitrumWatcher } from '../src/watchers/ArbitrumWatcher';
import { LOG_MESSAGE_PUBLISHED_TOPIC } from '../src/watchers/EVMWatcher';

// This script exists because the Arbitrum RPC node only supports a 10 block range which is super slow

(async () => {
const db = initDb();
const db = getDB();
const chain: ChainName = 'arbitrum';
const endpoint = `https://api.arbiscan.io/api?module=logs&action=getLogs&address=${CONTRACTS.MAINNET.arbitrum.core}&topic0=${LOG_MESSAGE_PUBLISHED_TOPIC}&apikey=YourApiKeyToken`;

// fetch all message publish logs for core bridge contract from explorer
let log = ora('Fetching logs from Arbiscan...').start();
const blockNumbers = (await axios.get(endpoint, AXIOS_CONFIG_JSON)).data.result.map((x: any) =>
parseInt(x.blockNumber, 16)
parseInt(x.blockNumber, 16),
);
log.succeed(`Fetched ${blockNumbers.length} logs from Arbiscan`);
// use the watcher to fetch corresponding blocks
8 changes: 4 additions & 4 deletions event-watcher/scripts/backfillNear.ts
@@ -4,7 +4,7 @@ import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/cons
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../src/common';
import { BlockResult } from 'near-api-js/lib/providers/provider';
import ora from 'ora';
import { initDb } from '../src/databases/utils';
import { getDB } from '../src/databases/utils';
import { getNearProvider, getTransactionsByAccountId, NEAR_ARCHIVE_RPC } from '../src/utils/near';
import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher';

@@ -19,11 +19,11 @@ import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher';
const BATCH_SIZE = 1000;

(async () => {
const db = initDb();
const db = getDB();
const chain: ChainName = 'near';
const provider = await getNearProvider(NEAR_ARCHIVE_RPC);
const fromBlock = Number(
(await db.getLastBlockByChain(chain)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain] ?? 0
(await db.getLastBlockByChain(chain)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain] ?? 0,
);

// fetch all transactions for core bridge contract from explorer
@@ -32,7 +32,7 @@ const BATCH_SIZE = 1000;
const transactions = await getTransactionsByAccountId(
CONTRACTS.MAINNET.near.core,
BATCH_SIZE,
toBlock.header.timestamp.toString().padEnd(19, '9') // pad to nanoseconds
toBlock.header.timestamp.toString().padEnd(19, '9'), // pad to nanoseconds
);
log.succeed(`Fetched ${transactions.length} transactions from NEAR Explorer`);

59 changes: 59 additions & 0 deletions event-watcher/src/config/index.ts
@@ -0,0 +1,59 @@
import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';

export const env = {
NODE_ENV: process.env.NODE_ENV,

LOG_DIR: process.env.LOG_DIR,
LOG_LEVEL: process.env.LOG_LEVEL || 'info',

ETH_RPC: process.env.ETH_RPC,

DB_SOURCE: process.env.NODE_ENV === 'test' ? 'local' : process.env.DB_SOURCE || 'local',
JSON_DB_FILE: process.env.JSON_DB_FILE || './db.json',
JSON_LAST_BLOCK_FILE: process.env.JSON_LAST_BLOCK_FILE || './lastBlockByChain.json',

PORT: process.env.PORT,

MONGODB_URI: process.env.MONGODB_URI,
MONGODB_DATABASE: process.env.MONGODB_DATABASE,

SNS_SOURCE: process.env.SNS_SOURCE,
AWS_SNS_REGION: process.env.AWS_SNS_REGION,
AWS_SNS_TOPIC_ARN: process.env.AWS_SNS_TOPIC_ARN,
AWS_SNS_SUBJECT: process.env.AWS_SNS_SUBJECT,
AWS_ACCESS_KEY_ID: process.env.AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY: process.env.AWS_SECRET_ACCESS_KEY,
} as const;

// EVM Chains not supported
// aurora, gnosis, neon, sepolia

export const evmChains: EVMChainName[] = [
'acala',
'arbitrum',
'avalanche',
'base',
'bsc',
'celo',
'ethereum',
'fantom',
'karura',
'klaytn',
'moonbeam',
'oasis',
'optimism',
'polygon',
];

export const supportedChains: ChainName[] = [
...evmChains,
'algorand',
'aptos',
'injective',
'near',
'solana',
'sui',
'terra',
'terra2',
'xpla',
];
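The new config/index.ts centralizes environment handling that was previously scattered through consts.ts (see the DB_SOURCE and JSON_DB_FILE constants removed further down). A small sketch of how a consumer might validate these variables at startup follows — this check is not part of the commit, and 'mongodb' as a DB_SOURCE value is an assumption.

```typescript
// Illustrative startup check, not part of this commit.
import { env } from './config';

function requireEnv(name: string, value: string | undefined): string {
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Assumption: 'mongodb' is the DB_SOURCE value that selects the Mongo-backed DB.
if (env.DB_SOURCE === 'mongodb') {
  requireEnv('MONGODB_URI', env.MONGODB_URI);
  requireEnv('MONGODB_DATABASE', env.MONGODB_DATABASE);
}
```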
41 changes: 18 additions & 23 deletions event-watcher/src/consts.ts
@@ -1,5 +1,6 @@
import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { AxiosRequestConfig } from 'axios';
import { env } from './config';

export const TIMEOUT = 0.5 * 1000;

@@ -25,37 +25,36 @@ export const TIMEOUT = 0.5 * 1000;
// This node didn't work: 'https://arb1.arbitrum.io/rpc',

export const RPCS_BY_CHAIN: { [key in ChainName]?: string } = {
ethereum: process.env.ETH_RPC || 'https://svc.blockdaemon.com/ethereum/mainnet/native',
bsc: 'https://bsc-dataseed2.defibit.io',
polygon: 'https://rpc.ankr.com/polygon',
avalanche: 'https://rpc.ankr.com/avalanche',
oasis: 'https://emerald.oasis.dev',
acala: 'https://eth-rpc-acala.aca-api.network',
algorand: 'https://mainnet-api.algonode.cloud',
aptos: 'https://fullnode.mainnet.aptoslabs.com/',
arbitrum: 'https://arb1.arbitrum.io/rpc',
avalanche: 'https://rpc.ankr.com/avalanche',
base: 'https://developer-access-mainnet.base.org',
bsc: 'https://bsc-dataseed2.defibit.io',
celo: 'https://forno.celo.org',
ethereum: env.ETH_RPC || 'https://svc.blockdaemon.com/ethereum/mainnet/native',
fantom: 'https://rpc.ankr.com/fantom',
injective: 'https://api.injective.network',
karura: 'https://eth-rpc-karura.aca-api.network',
acala: 'https://eth-rpc-acala.aca-api.network',
klaytn: 'https://klaytn-mainnet-rpc.allthatnode.com:8551',
celo: 'https://forno.celo.org',
moonbeam: 'https://rpc.ankr.com/moonbeam',
arbitrum: 'https://arb1.arbitrum.io/rpc',
optimism: 'https://rpc.ankr.com/optimism',
aptos: 'https://fullnode.mainnet.aptoslabs.com/',
near: 'https://rpc.mainnet.near.org',
xpla: 'https://dimension-lcd.xpla.dev',
terra2: 'https://phoenix-lcd.terra.dev',
// terra: 'https://columbus-fcd.terra.dev',
terra: 'https://terra-classic-fcd.publicnode.com',
injective: 'https://api.injective.network',
solana: process.env.SOLANA_RPC ?? 'https://api.mainnet-beta.solana.com',
oasis: 'https://emerald.oasis.dev',
optimism: 'https://rpc.ankr.com/optimism',
polygon: 'https://rpc.ankr.com/polygon',
solana: 'https://api.mainnet-beta.solana.com',
sui: 'https://rpc.mainnet.sui.io',
base: 'https://developer-access-mainnet.base.org',
terra: 'https://terra-classic-fcd.publicnode.com', // 'https://columbus-fcd.terra.dev',
terra2: 'https://phoenix-lcd.terra.dev',
xpla: 'https://dimension-lcd.xpla.dev',
};

// Separating for now so if we max out infura we can keep Polygon going
export const POLYGON_ROOT_CHAIN_RPC = 'https://rpc.ankr.com/eth';
export const POLYGON_ROOT_CHAIN_ADDRESS = '0x86E4Dc95c7FBdBf52e33D563BbDB00823894C287';
// Optimism watcher relies on finalized calls which don't work right on Ankr
export const OPTIMISM_CTC_CHAIN_RPC = process.env.ETH_RPC;
export const OPTIMISM_CTC_CHAIN_RPC = env.ETH_RPC;
export const OPTIMISM_CTC_CHAIN_ADDRESS = '0x5E4e65926BA27467555EB562121fac00D24E9dD2';

export const ALGORAND_INFO = {
@@ -68,11 +68,6 @@ export const ALGORAND_INFO = {
token: '',
};

export const DB_SOURCE =
process.env.NODE_ENV === 'test' ? 'local' : process.env.DB_SOURCE || 'local';
export const JSON_DB_FILE = process.env.JSON_DB_FILE || './db.json';
export const DB_LAST_BLOCK_FILE = process.env.DB_LAST_BLOCK_FILE || './lastBlockByChain.json';

// without this, axios request will error `Z_BUF_ERROR`: https://github.com/axios/axios/issues/5346
export const AXIOS_CONFIG_JSON: AxiosRequestConfig = {
headers: {
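The reordered RPCS_BY_CHAIN map is consumed by the per-chain watchers, which are outside this excerpt. A minimal sketch of building a provider from it, assuming ethers v5 is available in the project; `getProvider` is an illustrative helper, not a function from this commit.

```typescript
// Illustrative only: construct an ethers v5 JSON-RPC provider from the RPC map.
import { ethers } from 'ethers';
import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { RPCS_BY_CHAIN } from './consts';

export function getProvider(chain: ChainName): ethers.providers.JsonRpcProvider {
  const rpc = RPCS_BY_CHAIN[chain];
  if (!rpc) {
    throw new Error(`No RPC configured for chain: ${chain}`);
  }
  return new ethers.providers.JsonRpcProvider(rpc);
}
```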
47 changes: 47 additions & 0 deletions event-watcher/src/databases/BaseDB.ts
@@ -0,0 +1,47 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../common/consts';
import { getLogger, WormholeLogger } from '../utils/logger';
import { DBImplementation, LastBlockByChain, VaaLog } from './types';
abstract class BaseDB implements DBImplementation {
public logger: WormholeLogger;
public lastBlockByChain: LastBlockByChain = {};

constructor() {
this.logger = getLogger('db');
this.lastBlockByChain = {};
}

public async start(): Promise<void> {
await this.connect();
await this.getLastBlocksProcessed();
console.log('----------DB CONFIGURED------------');
}

public async getResumeBlockByChain(chain: ChainName): Promise<number | null> {
const lastBlock = this.getLastBlockByChain(chain);
const initialBlock = INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain];

if (lastBlock) return Number(lastBlock) + 1;
if (initialBlock) return Number(initialBlock);
return null;
}

public getLastBlockByChain(chain: ChainName): string | null {
const chainId = coalesceChainId(chain);
const blockInfo = this.lastBlockByChain?.[chainId];

if (blockInfo) {
const tokens = String(blockInfo)?.split('/');
return chain === 'aptos' ? tokens.at(-1)! : tokens[0];
}

return null;
}

abstract connect(): Promise<void>;
abstract getLastBlocksProcessed(): Promise<void>;
abstract storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void>;
abstract storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
}

export default BaseDB;
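BaseDB defines the contract that concrete backends fill in; the MongoDB-backed implementation mentioned in the commit message is not included in this excerpt. Below is a rough sketch of what such a subclass could look like, assuming the official `mongodb` Node.js driver and hypothetical collection names (`vaaLogs`, `lastBlocks`).

```typescript
// Illustrative sketch only — not the repository's actual MongoDB class.
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { Collection, MongoClient } from 'mongodb';
import { env } from '../config';
import BaseDB from './BaseDB';
import { VaaLog } from './types';

export default class MongoDB extends BaseDB {
  private client = new MongoClient(env.MONGODB_URI ?? 'mongodb://localhost:27017');
  private vaaLogs?: Collection<any>;
  private lastBlocks?: Collection<any>;

  async connect(): Promise<void> {
    await this.client.connect();
    const db = this.client.db(env.MONGODB_DATABASE ?? 'wormhole');
    this.vaaLogs = db.collection('vaaLogs'); // hypothetical collection name
    this.lastBlocks = db.collection('lastBlocks'); // hypothetical collection name
  }

  async getLastBlocksProcessed(): Promise<void> {
    const docs = (await this.lastBlocks?.find().toArray()) ?? [];
    this.lastBlockByChain = {};
    for (const doc of docs) {
      this.lastBlockByChain[doc._id] = doc.block;
    }
  }

  async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
    if (vaaLogs.length > 0) await this.vaaLogs?.insertMany(vaaLogs);
  }

  async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
    const chainId = coalesceChainId(chain);
    this.lastBlockByChain[chainId] = String(lastBlock);
    await this.lastBlocks?.updateOne(
      { _id: chainId },
      { $set: { block: String(lastBlock) } },
      { upsert: true },
    );
  }
}
```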
33 changes: 0 additions & 33 deletions event-watcher/src/databases/Database.ts

This file was deleted.

54 changes: 54 additions & 0 deletions event-watcher/src/databases/JsonDB.ts
@@ -0,0 +1,54 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { readFileSync, writeFileSync } from 'fs';
import { env } from '../config';
import BaseDB from './BaseDB';
import { VaaLog } from './types';

const ENCODING = 'utf8';

export default class JsonDB extends BaseDB {
db: {} | null = null;
dbFile: string;
dbLastBlockFile: string;

constructor() {
super();
this.db = {};
this.lastBlockByChain = {};
this.dbFile = env.JSON_DB_FILE;
this.dbLastBlockFile = env.JSON_LAST_BLOCK_FILE;
}

async connect(): Promise<void> {
try {
const rawDb = readFileSync(this.dbFile, ENCODING);
this.db = rawDb ? JSON.parse(rawDb) : {};
console.log('---CONNECTED TO JsonDB---');
} catch (e) {
this.logger.warn(`${this.dbFile} does not exists, creating new file`);
this.db = {};
}
}

async getLastBlocksProcessed(): Promise<void> {
try {
const rawLastBlockByChain = readFileSync(this.dbLastBlockFile, ENCODING);
this.lastBlockByChain = rawLastBlockByChain ? JSON.parse(rawLastBlockByChain) : {};
} catch (e) {
this.logger.warn(`${this.dbLastBlockFile} does not exists, creating new file`);
this.lastBlockByChain = {};
}
}

override async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
this.db = [{ ...this.db, ...vaaLogs }];
writeFileSync(this.dbFile, JSON.stringify(this.db, null, 2), ENCODING);
}

override async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
const chainId = coalesceChainId(chain);
this.lastBlockByChain[chainId] = String(lastBlock);

writeFileSync(this.dbLastBlockFile, JSON.stringify(this.lastBlockByChain, null, 2), ENCODING);
}
}
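The backfill scripts above now obtain their backend through getDB() from src/databases/utils, which is not shown in this diff. A plausible sketch of that factory under the new class layout follows, with the caveat that the real implementation presumably also constructs the MongoDB backend added by this commit.

```typescript
// Illustrative sketch of src/databases/utils.ts — not the actual committed file.
import { env } from '../config';
import BaseDB from './BaseDB';
import JsonDB from './JsonDB';

let db: BaseDB | undefined;

// Return a singleton DB instance selected by DB_SOURCE.
export function getDB(): BaseDB {
  if (!db) {
    if (env.DB_SOURCE !== 'local') {
      // The Mongo-backed class is not included in this excerpt.
      throw new Error(`DB_SOURCE '${env.DB_SOURCE}' is not covered by this sketch`);
    }
    db = new JsonDB();
  }
  return db;
}
```

The scripts then call methods declared on BaseDB, e.g. `db.getLastBlockByChain('near')` in backfillNear.ts.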