Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Blockchain Watcher] (APTOS) Integrate aptos source and redeemed events #1174

Merged
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4c2b486
First version
Mar 2, 2024
5d100be
Improve index
Mar 4, 2024
0bd1d87
Improve repository and mapper
Mar 4, 2024
7fa0d14
Resolve issues and add new pod for target events
Mar 4, 2024
23d5c81
Resolve test and improve code
Mar 4, 2024
0a28bc3
Add test
Mar 5, 2024
8c7f2ce
Resolve test
Mar 5, 2024
dd18b5d
Improve source event implementation
Mar 6, 2024
9367db1
Improve implementations and create test
Mar 8, 2024
288a199
Remove hardcode params
Mar 8, 2024
9c92e1a
Merge remote-tracking branch 'origin' into feature-895/integrate-apto…
Mar 8, 2024
8664c2e
Change completed status
Mar 8, 2024
9079fd4
Merge to main
Mar 8, 2024
bf65ae8
Improve code style
Mar 8, 2024
6b1317c
Merge remote-tracking branch 'origin' into feature-895/integrate-apto…
Mar 8, 2024
c682888
Change current cursor value
Mar 11, 2024
eadc1fe
Improve comments
Mar 11, 2024
aa4cbac
Merge remote-tracking branch 'origin' into feature-895/integrate-apto…
Mar 11, 2024
87e33fd
improve domain event
Mar 11, 2024
78bf972
Merge remote-tracking branch 'origin' into feature-895/integrate-apto…
Mar 11, 2024
271ee10
Merge remote-tracking branch 'origin' into feature-895/integrate-apto…
Mar 12, 2024
f884046
Add validation about differents blocks number
Mar 12, 2024
6612154
Improve transaction and sequence implementation
Mar 12, 2024
8a04db7
Set batches
Mar 12, 2024
bc0b87c
Resolve comment in PR
Mar 13, 2024
3e5fafc
Improve code
Mar 13, 2024
879e150
Resolve comment in PR
Mar 13, 2024
83d3fab
Integrate rpc poll for aptos
Mar 13, 2024
bf4aedb
Improve code
Mar 13, 2024
0898cf0
Merge remote-tracking branch 'origin' into feature-895/integrate-apto…
Mar 13, 2024
44d4417
Remove instrumented aptos client
Mar 13, 2024
4e212e2
Improve test
Mar 13, 2024
c9ff0d5
Improve errors message
Mar 13, 2024
31e0fce
Improve transaction domain and test
Mar 14, 2024
f9a0f53
Remove block name in variables
Mar 14, 2024
5fc18df
Resolved comment in PR
Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion blockchain-watcher/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"port": 9090,
"logLevel": "debug",
"dryRun": true,
"enabledPlatforms": ["solana", "evm", "sui"],
"enabledPlatforms": ["solana", "evm", "sui", "aptos"],
"sns": {
"topicArn": "arn:aws:sns:us-east-1:000000000000:localstack-topic.fifo",
"region": "us-east-1",
Expand Down Expand Up @@ -131,6 +131,13 @@
"rpcs": ["https://fullnode.testnet.sui.io:443"],
"timeout": 10000
},
"aptos": {
"name": "aptos",
"network": "testnet",
"chainId": 22,
"rpcs": ["https://fullnode.devnet.aptoslabs.com/v1"],
"timeout": 10000
},
"arbitrum": {
"name": "arbitrum",
"network": "goerli",
Expand Down
4 changes: 4 additions & 0 deletions blockchain-watcher/config/mainnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@
"sui": {
"network": "mainnet",
"rpcs": ["https://fullnode.mainnet.sui.io:443", "http://m-sui-01.nodes.stable.io:8546"]
},
"aptos": {
"network": "mainnet",
"rpcs": ["https://fullnode.mainnet.aptoslabs.com"]
}
}
}
119 changes: 119 additions & 0 deletions blockchain-watcher/src/domain/actions/aptos/GetAptosSequences.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { Block, Range, TransactionFilter } from "./PollAptos";
import { AptosTransaction } from "../../entities/aptos";
import { AptosRepository } from "../../repositories";
import { createBatches } from "../../../infrastructure/repositories/common/utils";
import winston from "winston";

export class GetAptosSequences {
protected readonly logger: winston.Logger;
private readonly repo: AptosRepository;

private previousBlock?: bigint;
private lastBlock?: bigint;

constructor(repo: AptosRepository) {
this.logger = winston.child({ module: "GetAptosSequences" });
this.repo = repo;
}

async execute(range: Block | undefined, opts: GetAptosOpts): Promise<AptosTransaction[]> {
let populatedTransactions: AptosTransaction[] = [];

this.logger.info(
`[aptos][exec] Processing blocks [previousBlock: ${opts.previousBlock} - lastBlock: ${opts.lastBlock}]`
);

const batches = createBatches(range);

for (const toBatch of batches) {
const fromBatch = this.lastBlock ? Number(this.lastBlock) : range?.fromBlock;

const events = await this.repo.getSequenceNumber(
{
fromBlock: fromBatch,
toBlock: toBatch,
},
opts.filter
);

// Update lastBlock with the new lastBlock
this.lastBlock = BigInt(events[events.length - 1].sequence_number);

if (opts.previousBlock == this.lastBlock) {
return [];
}

// Update previousBlock with opts lastBlock
this.previousBlock = opts.lastBlock;

const transactions = await this.repo.getTransactionsByVersionForSourceEvent(
events,
opts.filter
);

transactions.forEach((tx) => {
populatedTransactions.push(tx);
});
}

this.logger.info(
`[aptos][exec] Got ${populatedTransactions?.length} transactions to process for [addresses:${opts.addresses}][block: ${range?.fromBlock}]`
);
return populatedTransactions;
}

getBlockRange(
cfgBlockBarchSize: number,
cfgFromBlock: bigint | undefined,
savedPreviousSequence: bigint | undefined,
savedLastBlock: bigint | undefined
): Block | undefined {
// If [set up a from block for cfg], return the fromBlock and toBlock equal the block batch size
if (cfgFromBlock) {
return {
fromBlock: Number(cfgFromBlock),
toBlock: cfgBlockBarchSize,
};
}

if (savedPreviousSequence && savedLastBlock) {
// If process the [same block], return the same lastBlock and toBlock equal the block batch size
if (savedPreviousSequence === savedLastBlock) {
return {
fromBlock: Number(savedLastBlock),
toBlock: cfgBlockBarchSize,
};
} else {
// If process [different sequences], return the difference between the lastBlock and the previousBlock plus 1
return {
fromBlock: Number(savedLastBlock),
toBlock: Number(savedLastBlock) - Number(savedPreviousSequence) + 1,
};
}
}

if (savedLastBlock) {
// If there is [no previous block], return the lastBlock and toBlock equal the block batch size
if (!cfgFromBlock || BigInt(cfgFromBlock) < savedLastBlock) {
return {
fromBlock: Number(savedLastBlock),
toBlock: cfgBlockBarchSize,
};
}
}
}

getUpdatedRange(): Range {
return {
previousBlock: this.previousBlock,
lastBlock: this.lastBlock,
};
}
}

export type GetAptosOpts = {
addresses: string[];
filter: TransactionFilter;
previousBlock?: bigint | undefined;
lastBlock?: bigint | undefined;
};
114 changes: 114 additions & 0 deletions blockchain-watcher/src/domain/actions/aptos/GetAptosTransactions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { Block, Range, TransactionFilter } from "./PollAptos";
import { AptosTransaction } from "../../entities/aptos";
import { AptosRepository } from "../../repositories";
import { createBatches } from "../../../infrastructure/repositories/common/utils";
import winston from "winston";

export class GetAptosTransactions {
protected readonly logger: winston.Logger;
private readonly repo: AptosRepository;

private previousBlock?: bigint;
private lastBlock?: bigint;

constructor(repo: AptosRepository) {
this.logger = winston.child({ module: "GetAptosTransactions" });
this.repo = repo;
}

async execute(range: Block | undefined, opts: GetAptosOpts): Promise<AptosTransaction[]> {
let populatedTransactions: AptosTransaction[] = [];

this.logger.info(
`[aptos][exec] Processing blocks [previousBlock: ${opts.previousBlock} - latestBlock: ${opts.lastBlock}]`
);

const batches = createBatches(range);
M-Picco marked this conversation as resolved.
Show resolved Hide resolved

for (const toBatch of batches) {
M-Picco marked this conversation as resolved.
Show resolved Hide resolved
const fromBatch = this.lastBlock ? Number(this.lastBlock) : range?.fromBlock;

const transaction = await this.repo.getTransactions({
fromBlock: fromBatch,
toBlock: toBatch,
});

// Only process transactions to the contract address configured
const transactionsByAddressConfigured = transaction.filter((transaction) =>
opts.filter?.type?.includes(String(transaction.payload?.function).toLowerCase())
);

// Update lastBlock with the new lastBlock
this.lastBlock = BigInt(transaction[transaction.length - 1].version);

if (opts.previousBlock == this.lastBlock) {
return [];
}

// Update previousBlock with opts lastBlock
this.previousBlock = opts.lastBlock;

if (transactionsByAddressConfigured.length > 0) {
const transactions = await this.repo.getTransactionsByVersionForRedeemedEvent(
M-Picco marked this conversation as resolved.
Show resolved Hide resolved
transactionsByAddressConfigured,
M-Picco marked this conversation as resolved.
Show resolved Hide resolved
opts.filter
);

transactions.forEach((tx) => {
populatedTransactions.push(tx);
});
}
}

return populatedTransactions;
}

getBlockRange(
cfgBlockBarchSize: number,
cfgFromBlock: bigint | undefined,
savedPreviousBlock: bigint | undefined,
savedLastBlock: bigint | undefined
): Block | undefined {
// If [set up a from block for cfg], return the fromBlock and toBlock equal the block batch size
if (cfgFromBlock) {
return {
fromBlock: Number(cfgFromBlock),
toBlock: cfgBlockBarchSize,
};
}

if (savedPreviousBlock && savedLastBlock) {
// If process [equal or different blocks], return the same lastBlock and toBlock equal the block batch size
if (savedPreviousBlock === savedLastBlock || savedPreviousBlock !== savedLastBlock) {
return {
fromBlock: Number(savedLastBlock),
toBlock: cfgBlockBarchSize,
};
}
}

if (savedLastBlock) {
// If there is [no previous block], return the lastBlock and toBlock equal the block batch size
if (!cfgFromBlock || BigInt(cfgFromBlock) < savedLastBlock) {
return {
fromBlock: Number(savedLastBlock),
toBlock: cfgBlockBarchSize,
};
}
}
}

getUpdatedRange(): Range {
return {
previousBlock: this.previousBlock,
lastBlock: this.lastBlock,
};
}
}

export type GetAptosOpts = {
addresses: string[];
M-Picco marked this conversation as resolved.
Show resolved Hide resolved
filter: TransactionFilter;
previousBlock?: bigint | undefined;
lastBlock?: bigint | undefined;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { TransactionFoundEvent } from "../../entities";
import { AptosTransaction } from "../../entities/aptos";
import { StatRepository } from "../../repositories";

export class HandleAptosTransactions {
constructor(
private readonly cfg: HandleAptosTransactionsOptions,
private readonly mapper: (tx: AptosTransaction) => TransactionFoundEvent,
private readonly target: (parsed: TransactionFoundEvent[]) => Promise<void>,
private readonly statsRepo: StatRepository
) {}

public async handle(txs: AptosTransaction[]): Promise<TransactionFoundEvent[]> {
const items: TransactionFoundEvent[] = [];

for (const tx of txs) {
const txMapped = this.mapper(tx);
this.report(txMapped.attributes.protocol);
items.push(txMapped);
}

await this.target(items);

return items;
}

private report(protocol: string) {
if (!this.cfg.metricName) return;

const labels = this.cfg.metricLabels ?? {
job: this.cfg.id,
chain: "aptos",
commitment: "finalized",
protocol: protocol,
};

this.statsRepo.count(this.cfg.metricName, labels);
}
}

export interface HandleAptosTransactionsOptions {
metricLabels?: { job: string; chain: string; commitment: string };
eventTypes?: string[];
metricName?: string;
id: string;
}
Loading
Loading