Skip to content

Commit

Permalink
fix: persist behavior when finding redeemed events (#730)
Browse files Browse the repository at this point in the history
  • Loading branch information
raop155 committed Oct 6, 2023
1 parent bc8a311 commit a05dad5
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 79 deletions.
2 changes: 1 addition & 1 deletion event-watcher/src/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const RPCS_BY_CHAIN_MAINNET: { [key in ChainName]?: string } = {
aptos: env.APTOS_RPC || 'https://fullnode.mainnet.aptoslabs.com',
arbitrum: env.ARBITRUM_RPC || 'https://arb1.arbitrum.io/rpc',
avalanche: env.AVALANCHE_RPC || 'https://rpc.ankr.com/avalanche',
base: env.BASE_RPC || 'https://developer-access-mainnet.base.org',
base: env.BASE_RPC || 'https://rpc.ankr.com/base',
bsc: env.BSC_RPC || 'https://rpc.ankr.com/bsc_testnet_chapel',
celo: env.CELO_RPC || 'https://forno.celo.org',
ethereum: env.ETHEREUM_RPC || 'https://eth.llamarpc.com', // 'https://svc.blockdaemon.com/ethereum/mainnet/native',
Expand Down
109 changes: 43 additions & 66 deletions event-watcher/src/databases/MongoDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,37 +63,28 @@ export default class MongoDB extends BaseDB {
override async storeWhTxs(chainName: ChainName, whTxs: WHTransaction[]): Promise<void> {
try {
for (let i = 0; i < whTxs.length; i++) {
let message = 'Insert Wormhole Transaction Event Log to MongoDB collection';
let message = `Insert Wormhole Transaction Event Log to ${WORMHOLE_TX_COLLECTION} collection`;
const currentWhTx = whTxs[i];
const { id, ...rest } = currentWhTx;

// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
const whTxDocument = await this.wormholeTxCollection?.findOne({ _id: id });
const whTxDocument = await this.wormholeTxCollection?.findOne({
_id: id as unknown as mongoDB.ObjectId,
});

if (whTxDocument) {
await this.wormholeTxCollection?.findOneAndUpdate(
{ _id: id as unknown as mongoDB.ObjectId },
{
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
},
{
$set: {
'eventLog.updatedAt': new Date(),
},
$inc: {
'eventLog.revision': 1,
},
},
{
returnDocument: 'after',
$set: { 'eventLog.updatedAt': new Date() },
$inc: { 'eventLog.revision': 1 },
},
{ returnDocument: 'after' },
);

message = 'Update Wormhole Transaction Event Log to MongoDB collection';
message = `Update Wormhole Transaction Event Log to ${WORMHOLE_TX_COLLECTION} collection`;
} else {
await this.wormholeTxCollection?.insertOne({
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
_id: id as unknown as mongoDB.ObjectId,
...rest,
});
}
Expand Down Expand Up @@ -123,64 +114,55 @@ export default class MongoDB extends BaseDB {
): Promise<void> {
try {
for (let i = 0; i < redeemedTxs.length; i++) {
const message = 'Update Wormhole Transfer Redeemed Event Log to MongoDB collection';
let message = `Insert Wormhole Transfer Redeemed Event Log to ${GLOBAL_TX_COLLECTION} collection`;
const currentWhRedeemedTx = redeemedTxs[i];
const { id, destinationTx } = currentWhRedeemedTx;
const { method, status } = destinationTx;
const { id, destinationTx, ...rest } = currentWhRedeemedTx;
const { status } = destinationTx;

const whTxResponse = await this.wormholeTxCollection?.findOneAndUpdate(
{
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
},
{ _id: id as unknown as mongoDB.ObjectId },
{
$set: {
'eventLog.updatedAt': new Date(),
status: status,
},
$inc: {
'eventLog.revision': 1,
},
},
{
returnDocument: 'after',
$inc: { 'eventLog.revision': 1 },
},
{ returnDocument: 'after' },
);

if (!whTxResponse?.value) {
this.logger.error(
this.logger.info(
`Error Update Wormhole Transfer Redeemed Event Log: ${id} does not exist on ${WORMHOLE_TX_COLLECTION} collection`,
);

return;
}

const globalTxResponse = await this.globalTxCollection?.findOneAndUpdate(
{
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
},
{
$set: {
'destinationTx.method': method,
'destinationTx.status': status,
'destinationTx.updatedAt': new Date(),
},
$inc: {
revision: 1,
},
},
{
returnDocument: 'after',
},
);
const globalTxDocument = await this.globalTxCollection?.findOne({
_id: id as unknown as mongoDB.ObjectId,
});

if (!globalTxResponse?.value) {
this.logger.error(
`Error Update Wormhole Transfer Redeemed Event Log: ${id} does not exist on ${GLOBAL_TX_COLLECTION} collection`,
);
if (globalTxDocument) {
message = `Update Wormhole Transfer Redeemed Event Log to ${GLOBAL_TX_COLLECTION} collection`;
const { destinationTx: globalTxDocumentDestinationTx } = globalTxDocument;

return;
if (!globalTxDocumentDestinationTx) {
await this.globalTxCollection?.findOneAndUpdate(
{ _id: id as unknown as mongoDB.ObjectId },
{
$set: { destinationTx },
$inc: { revision: 1 },
},
{ returnDocument: 'after' },
);
} else {
message = `Already exists Wormhole Transfer Redeemed Event Log on ${GLOBAL_TX_COLLECTION} collection`;
}
} else {
await this.globalTxCollection?.insertOne({
_id: id as unknown as mongoDB.ObjectId,
destinationTx,
...rest,
});
}

if (currentWhRedeemedTx) {
Expand All @@ -205,10 +187,7 @@ export default class MongoDB extends BaseDB {

try {
await this.lastTxBlockByChainCollection?.findOneAndUpdate(
{
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: chain,
},
{ _id: chain as unknown as mongoDB.ObjectId },
{
$setOnInsert: {
chainId,
Expand All @@ -219,9 +198,7 @@ export default class MongoDB extends BaseDB {
updatedAt: new Date(),
},
},
{
upsert: true,
},
{ upsert: true },
);
} catch (e: unknown) {
this.logger.error(`Error while storing latest processed block: ${e}`);
Expand Down
5 changes: 5 additions & 0 deletions event-watcher/src/databases/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ export type WHTransferRedeemed = {
id: string;
destinationTx: {
chainId: number;
txHash: string;
status: string;
method: string;
from: string;
to: string;
blockNumber: string;
timestamp: Date;
updatedAt: Date;
};
indexedAt: Date | string | number;
revision: number;
};
23 changes: 23 additions & 0 deletions event-watcher/src/databases/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,47 @@ export const makeWHRedeemedTransaction = async ({
emitterChainId,
emitterAddress,
sequence,
txHash,
from,
to,
blockNumber,
indexedAt,
}: {
txHash: string;
emitterChainId: number;
emitterAddress: string;
sequence: number;
from: string;
to: string;
blockNumber: string;
indexedAt: Date | number | string;
}): Promise<WHTransferRedeemed> => {
const vaaId = `${emitterChainId}/${emitterAddress}/${sequence}`;
const REDEEMED_TX_STATUS = 'completed';
const REDEEMED_TX_METHOD = 'event-watcher-redeemed';

let parsedIndexedAt = indexedAt;

if (!(parsedIndexedAt instanceof Date)) {
if (!checkIfDateIsInMilliseconds(parsedIndexedAt)) {
parsedIndexedAt = new Date(+parsedIndexedAt * 1000) as unknown as number;
}
}

return {
id: vaaId,
destinationTx: {
chainId: emitterChainId,
txHash,
status: REDEEMED_TX_STATUS,
method: REDEEMED_TX_METHOD,
from,
to,
blockNumber, // hex string(16)
timestamp: new Date(),
updatedAt: new Date(),
},
indexedAt: parsedIndexedAt,
revision: 1,
};
};
2 changes: 1 addition & 1 deletion event-watcher/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { logInfo } from './utils/logger';
import { ChainName } from '@certusone/wormhole-sdk';
import { NETWORK } from './consts';

const version = '1.1.0';
const version = '1.2.0';
class EventWatcher {
private watchers: WatcherOptionTypes[] = [];

Expand Down
54 changes: 43 additions & 11 deletions event-watcher/src/watchers/EVMWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,41 @@ export class EVMWatcher extends BaseWatcher {
return vaasByBlock;
}

override async getWhTxs(fromBlock: number, toBlock: number): Promise<WHTransaction[]> {
const whTxs: WHTransaction[] = [];
const address = NETWORK_CONTRACTS[this.chain].core;

if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
}
override async getWhEvents(
fromBlock: number,
toBlock: number,
): Promise<{ whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] }> {
const whEvents: { whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] } = {
whTxs: [],
redeemedTxs: [],
};

// We collect the blocks data here to avoid making multiple requests to the RPC
const blocks = await this.getBlocks(fromBlock, toBlock);
const timestampsByBlock = [];
for (const block of blocks) {
const timestamp = new Date(block.timestamp);
const timestamp = new Date(block.timestamp * 1000);
timestampsByBlock[block.number] = timestamp;
}

whEvents.whTxs = await this.getWhTxs(fromBlock, toBlock, timestampsByBlock);
whEvents.redeemedTxs = await this.getRedeemedTxs(fromBlock, toBlock, timestampsByBlock);

return whEvents;
}

override async getWhTxs(
fromBlock: number,
toBlock: number,
timestampsByBlock?: Record<number, Date>,
): Promise<WHTransaction[]> {
const whTxs: WHTransaction[] = [];
const address = NETWORK_CONTRACTS[this.chain].core;

if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
}

const txLogs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]);

this.logger.debug(`processing ${txLogs.length} txLogs`);
Expand All @@ -275,7 +295,7 @@ export class EVMWatcher extends BaseWatcher {
const parseSequence = Number(sequence.toString());
const txHash = txLog.transactionHash;
const parsePayload = Buffer.from(payload).toString().slice(2);
const timestamp = timestampsByBlock[blockNumber];
const timestamp = timestampsByBlock![blockNumber];

const vaaSerialized = await makeSerializedVAA({
timestamp,
Expand Down Expand Up @@ -307,7 +327,11 @@ export class EVMWatcher extends BaseWatcher {
return whTxs;
}

override async getRedeemedTxs(fromBlock: number, toBlock: number): Promise<WHTransferRedeemed[]> {
override async getRedeemedTxs(
fromBlock: number,
toBlock: number,
timestampsByBlock?: Record<number, Date>,
): Promise<WHTransferRedeemed[]> {
const redeemedTxs: WHTransferRedeemed[] = [];
const tokenBridgeAddress = NETWORK_CONTRACTS[this.chain].token_bridge;

Expand All @@ -321,17 +345,25 @@ export class EVMWatcher extends BaseWatcher {

this.logger.debug(`processing ${transferRedeemedLogs.length} transferRedeemedLogs`);
for (const transferRedeemedLog of transferRedeemedLogs) {
const [, emitterChainId, emitterAddress, sequence] = transferRedeemedLog?.topics || [];
const { blockNumber, transactionHash, topics } = transferRedeemedLog;
const [, emitterChainId, emitterAddress, sequence] = topics || [];

if (emitterChainId && emitterAddress && sequence) {
const parsedEmitterChainId = Number(emitterChainId.toString());
const parsedEmitterAddress = emitterAddress.slice(2);
const parsedSequence = Number(sequence.toString());
const parsedBlockNumber = Number(blockNumber).toString(16);
const indexedAt = timestampsByBlock![blockNumber];

const redeemedTx = await makeWHRedeemedTransaction({
emitterChainId: parsedEmitterChainId,
emitterAddress: parsedEmitterAddress,
sequence: parsedSequence,
blockNumber: parsedBlockNumber,
txHash: transactionHash,
indexedAt,
from: '',
to: '',
});

redeemedTxs.push(redeemedTx);
Expand Down

0 comments on commit a05dad5

Please sign in to comment.