Skip to content

Commit

Permalink
feat: Implement 'toIncentivesAddress' query on LZ collector
Browse files Browse the repository at this point in the history
  • Loading branch information
jsanmigimeno committed Jul 19, 2024
1 parent d86a98a commit df046aa
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 109 deletions.
39 changes: 36 additions & 3 deletions src/collector/layer-zero/layer-zero.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ import {
MonitorStatus,
} from '../../monitor/monitor.interface';
import { ReceiveULN302__factory } from 'src/contracts/factories/ReceiveULN302__factory';
import { wait, tryErrorToString, paddedTo0xAddress } from 'src/common/utils';
import { wait, tryErrorToString, paddedTo0xAddress, defaultAbiCoder, getDestinationImplementation } from 'src/common/utils';
import {
ReceiveULN302,
ReceiveULN302Interface,
UlnConfigStruct,
UlnConfigStructOutput,
} from 'src/contracts/ReceiveULN302';
import { AMBMessage, AMBProof } from 'src/store/store.types';
import { LayerZeroEnpointV2__factory } from 'src/contracts';
import { IncentivizedMessageEscrow, IncentivizedMessageEscrow__factory, LayerZeroEnpointV2__factory } from 'src/contracts';
import { Resolver, loadResolver } from 'src/resolvers/resolver';
import { ParsePayload } from 'src/payload/decode.payload';
import { LayerZeroEnpointV2Interface, PacketSentEvent } from 'src/contracts/LayerZeroEnpointV2';
Expand All @@ -63,10 +63,12 @@ class LayerZeroWorker {
private readonly receiveULN302: ReceiveULN302;
private readonly receiveULN302Interface: ReceiveULN302Interface;
private readonly receiverAddress: string;
private readonly messageEscrowContract: IncentivizedMessageEscrow;
private readonly resolver: Resolver;
private readonly filterTopics: string[][];
private readonly layerZeroChainIdMap: Record<string, string>;
private readonly incentivesAddresses: Record<string, string>;
private readonly destinationImplementationCache: Record<string, Record<string, string>> = {}; // Map fromApplication + toChainId => destinationImplementation
private currentStatus: MonitorStatus | null = null;
private monitor: MonitorInterface;

Expand Down Expand Up @@ -98,6 +100,12 @@ class LayerZeroWorker {
[this.layerZeroEnpointV2Interface.getEvent('PacketSent').topicHash,
this.receiveULN302Interface.getEvent('PayloadVerified').topicHash,]
];

this.messageEscrowContract = this.initializeMessageEscrowContract(
this.config.incentivesAddress,
this.provider,
);

this.monitor = this.startListeningToMonitor(this.config.monitorPort);

this.initiateIntervalStatusLog();
Expand Down Expand Up @@ -145,6 +153,16 @@ class LayerZeroWorker {
return loadResolver(resolver, provider, logger);
}

private initializeMessageEscrowContract(
incentivesAddress: string,
provider: JsonRpcProvider,
): IncentivizedMessageEscrow {
return IncentivizedMessageEscrow__factory.connect(
incentivesAddress,
provider,
);
}

/**
* Starts listening to the monitor.
*
Expand Down Expand Up @@ -396,6 +414,21 @@ class LayerZeroWorker {
const transactionBlockNumber =
await this.resolver.getTransactionBlockNumber(log.blockNumber);


const channelId = defaultAbiCoder.encode(
['uint256'],
[packet.dstEid],
);

const toIncentivesAddress = await getDestinationImplementation(
decodedMessage.sourceApplicationAddress,
channelId,
this.messageEscrowContract,
this.destinationImplementationCache,
this.logger,
this.config.retryInterval
);

const messageIdentifier = '0x' + decodedMessage.messageIdentifier;
const ambMessage: AMBMessage = {
messageIdentifier,
Expand All @@ -404,7 +437,7 @@ class LayerZeroWorker {
fromChainId: srcEidMapped.toString(),
toChainId: dstEidMapped.toString(),
fromIncentivesAddress: '0x' + packet.sender.slice(24), // Keep only the relevant bytes (i.e. discard the first 12 bytes)
// toIncentivesAddress: , //TODO
toIncentivesAddress,

incentivesPayload: '0x' + packet.message,

Expand Down
4 changes: 2 additions & 2 deletions src/collector/wormhole/wormhole-message-sniffer.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import {
} from 'src/contracts';
import { Store } from 'src/store/store.lib';
import { workerData, MessagePort } from 'worker_threads';
import { tryErrorToString, wait } from '../../common/utils';
import { decodeWormholeMessage, defaultAbiCoder, getDestinationImplementation } from './wormhole.utils';
import { defaultAbiCoder, getDestinationImplementation, tryErrorToString, wait } from '../../common/utils';
import { decodeWormholeMessage } from './wormhole.utils';
import { ParsePayload } from 'src/payload/decode.payload';
import { WormholeMessageSnifferWorkerData } from './wormhole.types';
import { JsonRpcProvider } from 'ethers6';
Expand Down
8 changes: 3 additions & 5 deletions src/collector/wormhole/wormhole-recovery.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ import {
ParsedVaaWithBytes,
parseVaaWithBytes,
} from '@wormhole-foundation/relayer-engine';
import { decodeWormholeMessage, getDestinationImplementation } from './wormhole.utils';
import { add0X } from 'src/common/utils';
import { decodeWormholeMessage } from './wormhole.utils';
import { add0X, defaultAbiCoder, getDestinationImplementation } from 'src/common/utils';
import { AMBMessage, AMBProof } from 'src/store/store.types';
import { ParsePayload } from 'src/payload/decode.payload';
import {
IncentivizedMessageEscrow,
IncentivizedMessageEscrow__factory,
} from 'src/contracts';
import { WormholeRecoveryWorkerData } from './wormhole.types';
import { AbiCoder, JsonRpcProvider } from 'ethers6';
import { JsonRpcProvider } from 'ethers6';
import { fetchVAAs } from './api-utils';

const defaultAbiCoder = AbiCoder.defaultAbiCoder();

interface RecoveredVAAData {
vaa: ParsedVaaWithBytes,
transactionHash: string,
Expand Down
97 changes: 0 additions & 97 deletions src/collector/wormhole/wormhole.utils.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
import { IncentivizedMessageEscrow } from 'src/contracts';
import { WormholeChainConfig, WormholeChainId } from './wormhole.types';
import { AbiCoder } from 'ethers6';
import pino from 'pino';
import { tryErrorToString, wait } from 'src/common/utils';

export const defaultAbiCoder = AbiCoder.defaultAbiCoder();

export interface DecodedWormholeMessage {
messageIdentifier: string;
Expand Down Expand Up @@ -46,94 +40,3 @@ export function loadWormholeChainIdMap(

return wormholeChainIdMap;
}


export async function getDestinationImplementation(
fromApplication: string,
channelId: string,
escrowContract: IncentivizedMessageEscrow,
cache: Record<string, Record<string, string>> = {}, // Map fromApplication + channelId => destinationImplementation
logger: pino.Logger,
retryInterval: number = 1000,
maxTries: number = 3,
): Promise<string | undefined> {

const cachedImplementation = cache[fromApplication]?.[channelId];

if (cachedImplementation != undefined) {
return cachedImplementation;
}

const destinationImplementation = await queryDestinationImplementation(
fromApplication,
channelId,
escrowContract,
logger,
retryInterval,
maxTries,
);

// Set the destination implementation cache
if (destinationImplementation != undefined) {
if (cache[fromApplication] == undefined) {
cache[fromApplication] = {};
}

cache[fromApplication]![channelId] = destinationImplementation;
}

return destinationImplementation;
}

export async function queryDestinationImplementation(
fromApplication: string,
channelId: string,
escrowContract: IncentivizedMessageEscrow,
logger: pino.Logger,
retryInterval: number = 1000,
maxTries: number = 3,
): Promise<string | undefined> {

for (let tryCount = 0; tryCount < maxTries; tryCount++) {
try {
const destinationImplementation = await escrowContract.implementationAddress(
fromApplication,
channelId,
);

logger.debug(
{
fromApplication,
channelId,
destinationImplementation,
},
`Destination implementation queried.`
);

return '0x' + destinationImplementation.slice(26); // Keep only the last 20-bytes (discard the first '0x' + 12 null bytes)
}
catch (error) {
logger.warn(
{
fromApplication,
channelId,
try: tryCount + 1,
error: tryErrorToString(error),
},
`Error on the destination implementation query. Retrying if possible.`
);
}

await wait(retryInterval);
}

logger.error(
{
fromApplication,
channelId,
},
`Failed to query the destination implementation.`
);

return undefined;
}
99 changes: 97 additions & 2 deletions src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ethers } from "ethers6";
import { AbiCoder, ethers } from "ethers6";
import pino from "pino";
import { IncentivizedMessageEscrow } from "src/contracts";

export const defaultAbiCoder = AbiCoder.defaultAbiCoder();

export const wait = (ms: number) => new Promise((res) => setTimeout(res, ms));

Expand Down Expand Up @@ -27,4 +31,95 @@ export const tryErrorToString = (error: any): string | undefined => {
export function paddedTo0xAddress(paddedAddress: string): string {
const normalAddress = '0x' + paddedAddress.slice(-40);
return ethers.getAddress(normalAddress);
};
};


export async function getDestinationImplementation(
fromApplication: string,
channelId: string,
escrowContract: IncentivizedMessageEscrow,
cache: Record<string, Record<string, string>> = {}, // Map fromApplication + channelId => destinationImplementation
logger: pino.Logger,
retryInterval: number = 1000,
maxTries: number = 3,
): Promise<string | undefined> {

const cachedImplementation = cache[fromApplication]?.[channelId];

if (cachedImplementation != undefined) {
return cachedImplementation;
}

const destinationImplementation = await queryDestinationImplementation(
fromApplication,
channelId,
escrowContract,
logger,
retryInterval,
maxTries,
);

// Set the destination implementation cache
if (destinationImplementation != undefined) {
if (cache[fromApplication] == undefined) {
cache[fromApplication] = {};
}

cache[fromApplication]![channelId] = destinationImplementation;
}

return destinationImplementation;
}

export async function queryDestinationImplementation(
fromApplication: string,
channelId: string,
escrowContract: IncentivizedMessageEscrow,
logger: pino.Logger,
retryInterval: number = 1000,
maxTries: number = 3,
): Promise<string | undefined> {

for (let tryCount = 0; tryCount < maxTries; tryCount++) {
try {
const destinationImplementation = await escrowContract.implementationAddress(
fromApplication,
channelId,
);

logger.debug(
{
fromApplication,
channelId,
destinationImplementation,
},
`Destination implementation queried.`
);

return '0x' + destinationImplementation.slice(26); // Keep only the last 20-bytes (discard the first '0x' + 12 null bytes)
}
catch (error) {
logger.warn(
{
fromApplication,
channelId,
try: tryCount + 1,
error: tryErrorToString(error),
},
`Error on the destination implementation query. Retrying if possible.`
);
}

await wait(retryInterval);
}

logger.error(
{
fromApplication,
channelId,
},
`Failed to query the destination implementation.`
);

return undefined;
}

0 comments on commit df046aa

Please sign in to comment.