Skip to content

Commit

Permalink
refactor(apps/price_pusher): crash on RPC failures (#1730)
Browse files Browse the repository at this point in the history
* refactor(appts/price_pusher): fix warnings

* refactor(apps/price_pusher): crash on rpc issues

* refactor(apps/price_pusher): crash on stale hermes data

* fix: run linter

* chore: bump version

* fix: do not crash on sendtx failure in solana

* fix: address issues raised in review
  • Loading branch information
ali-bahjati authored Jun 26, 2024
1 parent cc114f7 commit 87aea6f
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 66 deletions.
2 changes: 1 addition & 1 deletion apps/price_pusher/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/price-pusher",
"version": "7.0.0-alpha",
"version": "7.0.0",
"description": "Pyth Price Pusher",
"homepage": "https://pyth.network",
"main": "lib/index.js",
Expand Down
53 changes: 25 additions & 28 deletions apps/price_pusher/src/aptos/aptos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ export class AptosPriceListener extends ChainPriceListener {
}

async getOnChainPriceInfo(priceId: string): Promise<PriceInfo | undefined> {
try {
const client = new AptosClient(this.endpoint);
const client = new AptosClient(this.endpoint);

const res = await client.getAccountResource(
this.pythModule,
`${this.pythModule}::state::LatestPriceInfo`
);
const res = await client.getAccountResource(
this.pythModule,
`${this.pythModule}::state::LatestPriceInfo`
);

try {
// This depends upon the pyth contract storage on Aptos and should not be undefined.
// If undefined, there has been some change and we would need to update accordingly.
const handle = (res.data as any).info.handle;
Expand Down Expand Up @@ -134,29 +134,26 @@ export class AptosPricePusher implements IPricePusher {
return;
}

try {
const account = AptosAccount.fromDerivePath(
APTOS_ACCOUNT_HD_PATH,
this.mnemonic
);
const client = new AptosClient(this.endpoint);

const sequenceNumber = await this.tryGetNextSequenceNumber(
client,
account
);
const rawTx = await client.generateTransaction(
account.address(),
{
function: `${this.pythContractAddress}::pyth::update_price_feeds_with_funder`,
type_arguments: [],
arguments: [priceFeedUpdateData],
},
{
sequence_number: sequenceNumber.toFixed(),
}
);
const account = AptosAccount.fromDerivePath(
APTOS_ACCOUNT_HD_PATH,
this.mnemonic
);
const client = new AptosClient(this.endpoint);

const sequenceNumber = await this.tryGetNextSequenceNumber(client, account);
const rawTx = await client.generateTransaction(
account.address(),
{
function: `${this.pythContractAddress}::pyth::update_price_feeds_with_funder`,
type_arguments: [],
arguments: [priceFeedUpdateData],
},
{
sequence_number: sequenceNumber.toFixed(),
}
);

try {
const signedTx = await client.signTransaction(account, rawTx);
const pendingTx = await client.submitTransaction(signedTx);

Expand Down
3 changes: 2 additions & 1 deletion apps/price_pusher/src/injective/injective.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ export class InjectivePricePusher implements IPricePusher {
updateFeeQueryResponse = JSON.parse(json);
} catch (err) {
this.logger.error(err, "Error fetching update fee");
return;
// Throwing an error because it is likely an RPC issue
throw err;
}

try {
Expand Down
15 changes: 15 additions & 0 deletions apps/price_pusher/src/pyth-price-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import {
import { PriceInfo, IPriceListener, PriceItem } from "./interface";
import { Logger } from "pino";

type TimestampInMs = number & { readonly _: unique symbol };

export class PythPriceListener implements IPriceListener {
private connection: PriceServiceConnection;
private priceIds: HexString[];
private priceIdToAlias: Map<HexString, string>;
private latestPriceInfo: Map<HexString, PriceInfo>;
private logger: Logger;
private lastUpdated: TimestampInMs | undefined;

constructor(
connection: PriceServiceConnection,
Expand Down Expand Up @@ -46,6 +49,17 @@ export class PythPriceListener implements IPriceListener {
publishTime: latestAvailablePrice.publishTime,
});
});

// Check health of the price feeds 5 second. If the price feeds are not updating
// for more than 30s, throw an error.
setInterval(() => {
if (
this.lastUpdated === undefined ||
this.lastUpdated < Date.now() - 30 * 1000
) {
throw new Error("Hermes Price feeds are not updating.");
}
}, 5000);
}

private onNewPriceFeed(priceFeed: PriceFeed) {
Expand All @@ -68,6 +82,7 @@ export class PythPriceListener implements IPriceListener {
};

this.latestPriceInfo.set(priceFeed.id, priceInfo);
this.lastUpdated = Date.now() as TimestampInMs;
}

getLatestPriceInfo(priceId: string): PriceInfo | undefined {
Expand Down
19 changes: 19 additions & 0 deletions apps/price_pusher/src/solana/solana.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ export class SolanaPriceListener extends ChainPriceListener {
super(config.pollingFrequency, priceItems);
}

// Checking the health of the Solana connection by checking the last block time
// and ensuring it is not older than 30 seconds.
private async checkHealth() {
const slot = await this.pythSolanaReceiver.connection.getSlot();
const blockTime = await this.pythSolanaReceiver.connection.getBlockTime(
slot
);
if (blockTime === null || blockTime < Date.now() / 1000 - 30) {
throw new Error("Solana connection is unhealthy");
}
}

async start() {
// Frequently check the RPC connection to ensure it is healthy
setInterval(this.checkHealth.bind(this), 5000);

await super.start();
}

async getOnChainPriceInfo(priceId: string): Promise<PriceInfo | undefined> {
try {
const priceFeedAccount =
Expand Down
47 changes: 11 additions & 36 deletions apps/price_pusher/src/sui/sui.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,6 @@ export class SuiPricePusher implements IPricePusher {
private readonly provider: SuiClient,
private logger: Logger,
private priceServiceConnection: PriceServiceConnection,
private pythPackageId: string,
private pythStateId: string,
private wormholePackageId: string,
private wormholeStateId: string,
endpoint: string,
keypair: Ed25519Keypair,
private gasBudget: number,
private gasPool: SuiObjectRef[],
private pythClient: SuiPythClient
Expand Down Expand Up @@ -180,14 +174,6 @@ export class SuiPricePusher implements IPricePusher {
}

const provider = new SuiClient({ url: endpoint });
const pythPackageId = await SuiPricePusher.getPackageId(
provider,
pythStateId
);
const wormholePackageId = await SuiPricePusher.getPackageId(
provider,
wormholeStateId
);

const gasPool = await SuiPricePusher.initializeGasPool(
keypair,
Expand All @@ -208,12 +194,6 @@ export class SuiPricePusher implements IPricePusher {
provider,
logger,
priceServiceConnection,
pythPackageId,
pythStateId,
wormholePackageId,
wormholeStateId,
endpoint,
keypair,
gasBudget,
gasPool,
pythClient
Expand Down Expand Up @@ -337,7 +317,7 @@ export class SuiPricePusher implements IPricePusher {
ignoreGasObjects: string[],
logger: Logger
): Promise<SuiObjectRef[]> {
const signerAddress = await signer.toSuiAddress();
const signerAddress = signer.toSuiAddress();

if (ignoreGasObjects.length > 0) {
logger.info(
Expand Down Expand Up @@ -383,25 +363,20 @@ export class SuiPricePusher implements IPricePusher {
}

// Attempt to refresh the version of the provided object reference to point to the current version
// of the object. Return the provided object reference if an error occurs or the object could not
// be retrieved.
// of the object. Throws an error if the object cannot be refreshed.
private static async tryRefreshObjectReference(
provider: SuiClient,
ref: SuiObjectRef
): Promise<SuiObjectRef> {
try {
const objectResponse = await provider.getObject({ id: ref.objectId });
if (objectResponse.data !== undefined) {
return {
digest: objectResponse.data!.digest,
objectId: objectResponse.data!.objectId,
version: objectResponse.data!.version,
};
} else {
return ref;
}
} catch (error) {
return ref;
const objectResponse = await provider.getObject({ id: ref.objectId });
if (objectResponse.data !== undefined) {
return {
digest: objectResponse.data!.digest,
objectId: objectResponse.data!.objectId,
version: objectResponse.data!.version,
};
} else {
throw new Error("Failed to refresh object reference");
}
}

Expand Down

0 comments on commit 87aea6f

Please sign in to comment.