Skip to content

Commit

Permalink
Coin transfer refactor (#484)
Browse files Browse the repository at this point in the history
* feat: crawl coin transfer

* feat: crawl coin transfer

* feat: add config for job coin transfer

* fix: revert crawl tx to old logic

* feat: test unit and fix bug for coin transfer

* feat: test unit and fix bug for coin transfer

* feat: test unit and fix bug for coin transfer

* feat: test unit and fix bug for coin transfer

* fix: refactor query coin transfer

* fix: refactor query coin transfer
  • Loading branch information
matthew-nguyen-20032023 authored Nov 17, 2023
1 parent 4ce357a commit 9b6ee74
Show file tree
Hide file tree
Showing 9 changed files with 1,903 additions and 114 deletions.
7 changes: 6 additions & 1 deletion ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
"millisecondCrawl": 5000,
"numberOfBlockPerCall": 100,
"startBlock": 4860000
},
},
"handleCoinTransfer": {
"key": "handleCoinTransfer",
"blocksPerCall": 100,
"millisecondCrawl": 3000
},
"handleTransaction": {
"key": "handleTransaction",
"blocksPerCall": 100,
Expand Down
5 changes: 5 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
"numberOfBlockPerCall": 100,
"startBlock": 4860000
},
"handleCoinTransfer": {
"key": "handleCoinTransfer",
"blocksPerCall": 100,
"millisecondCrawl": 3000
},
"handleTransaction": {
"key": "handleTransaction",
"blocksPerCall": 100,
Expand Down
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export const BULL_JOB_NAME = {
HANDLE_STAKE_EVENT: 'handle:stake-event',
CRAWL_BLOCK: 'crawl:block',
HANDLE_TRANSACTION: 'handle:transaction',
HANDLE_COIN_TRANSFER: 'handle:coin_transfer',
HANDLE_CW721_TRANSACTION: 'handle:cw721-tx',
REFRESH_CW721_STATS: 'refresh:cw721-stats',
CRAWL_PROPOSAL: 'crawl:proposal',
Expand Down Expand Up @@ -156,6 +157,10 @@ export const SERVICE = {
path: 'v1.CrawlTransactionService.TriggerHandleTxJob',
},
},
CoinTransfer: {
key: 'CoinTransferService',
name: 'v1.CoinTransferService',
},
CrawlGenesisService: {
key: 'CrawlGenesisService',
name: 'v1.CrawlGenesisService',
Expand Down
8 changes: 8 additions & 0 deletions src/models/block_checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ export class BlockCheckpoint extends BaseModel {
};
}

/**
* @description Get or create a check point for job and step run (from, to)
* @param jobName Your job name want to run
* @param lastHeightJobNames Another one or more job that your job depending on. So if your job want to process
* block A, it needs to wait util those jobs process success block A before your job
* @param configName property of config (import config from '../../../config.json' assert { type: 'json' };).
* it used to set step call via blocksPerCall in config
*/
static async getCheckpoint(
jobName: string,
lastHeightJobNames: string[],
Expand Down
220 changes: 220 additions & 0 deletions src/services/crawl-tx/coin_transfer.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import { ServiceBroker } from 'moleculer';
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { BULL_JOB_NAME, SERVICE } from '../../common';
import {
BlockCheckpoint,
CoinTransfer,
Event,
Transaction,
} from '../../models';
import BullableService, { QueueHandler } from '../../base/bullable.service';
import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';

@Service({
name: SERVICE.V1.CoinTransfer.key,
version: 1,
})
export default class CoinTransferService extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

/**
* @description Get transaction data for insert coin transfer
* @param fromHeight
* @param toHeight
* @private
*/
private async fetchTransactionCTByHeight(
fromHeight: number,
toHeight: number
): Promise<Transaction[]> {
const transactions = await Transaction.query()
.withGraphFetched('messages')
.where('height', '>', fromHeight)
.andWhere('height', '<=', toHeight)
.orderBy('id', 'ASC');
if (transactions.length === 0) return [];

const transactionsWithId: any = [];
transactions.forEach((transaction) => {
transactionsWithId[transaction.id] = {
...transaction,
events: [],
};
});

const minTransactionId = transactions[0].id;
const maxTransactionId = transactions[transactions.length - 1].id;
const events = await Event.query()
.withGraphFetched('attributes')
.where('tx_id', '>=', minTransactionId)
.andWhere('tx_id', '<=', maxTransactionId)
.whereNotNull('tx_msg_index');
events.forEach((event) => {
transactionsWithId[event.tx_id].events.push(event);
});

return transactionsWithId;
}

/**
* split amount to amount and denom using regex
* example: 10000uaura
* amount = 10000
* denom = uaura
* return [0, ''] if invalid
*/
private extractAmount(rawAmount: string | undefined): [number, string] {
const amount = rawAmount?.match(/(\d+)/)?.[0] ?? '0';
const denom = rawAmount?.replace(amount, '') ?? '';
return [Number.parseInt(amount, 10), denom];
}

@QueueHandler({
queueName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
jobName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
})
public async jobHandleTxCoinTransfer() {
const [fromBlock, toBlock, updateBlockCheckpoint] =
await BlockCheckpoint.getCheckpoint(
BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
[BULL_JOB_NAME.HANDLE_TRANSACTION],
'handleCoinTransfer'
);

if (fromBlock >= toBlock) {
this.logger.info('Waiting for new transaction crawled');
return;
}

this.logger.info(`QUERY FROM ${fromBlock} - TO ${toBlock}................`);

const coinTransfers: CoinTransfer[] = [];
const transactions = await this.fetchTransactionCTByHeight(
fromBlock,
toBlock
);

transactions.forEach((tx: Transaction) => {
tx.events.forEach((event: Event) => {
if (!event.tx_msg_index) return;
// skip if message is not 'MsgMultiSend'
if (
event.attributes.length !== 3 &&
tx.messages[event.tx_msg_index].type !==
'/cosmos.bank.v1beta1.MsgMultiSend'
) {
this.logger.error(
'Coin transfer detected in unsupported message type',
tx.hash,
tx.messages[event.tx_msg_index].content
);
return;
}

const ctTemplate = {
block_height: tx.height,
tx_id: tx.id,
tx_msg_id: tx.messages[event.tx_msg_index].id,
from: event.attributes.find((attr) => attr.key === 'sender')?.value,
to: '',
amount: 0,
denom: '',
timestamp: new Date(tx.timestamp).toISOString(),
};
/**
* we expect 2 cases:
* 1. transfer event has only 1 sender and 1 recipient
* then the event will have 3 attributes: sender, recipient, amount
* 2. transfer event has 1 sender and multiple recipients, message must be 'MsgMultiSend'
* then the event will be an array of attributes: recipient1, amount1, recipient2, amount2, ...
* sender is the coin_spent.spender
*/
if (event.attributes.length === 3) {
const rawAmount = event.attributes.find(
(attr) => attr.key === 'amount'
)?.value;
const [amount, denom] = this.extractAmount(rawAmount);
coinTransfers.push(
CoinTransfer.fromJson({
...ctTemplate,
from: event.attributes.find((attr) => attr.key === 'sender')
?.value,
to: event.attributes.find((attr) => attr.key === 'recipient')
?.value,
amount,
denom,
})
);
return;
}

const coinSpentEvent = tx.events.find(
(e: Event) =>
e.type === 'coin_spent' && e.tx_msg_index === event.tx_msg_index
);
ctTemplate.from = coinSpentEvent?.attributes.find(
(attr: { key: string; value: string }) => attr.key === 'spender'
)?.value;
for (let i = 0; i < event.attributes.length; i += 2) {
if (
event.attributes[i].key !== 'recipient' &&
event.attributes[i + 1].key !== 'amount'
) {
this.logger.error(
'Coin transfer in MsgMultiSend detected with invalid attributes',
tx.hash,
event.attributes
);
return;
}

const rawAmount = event.attributes[i + 1].value;
const [amount, denom] = this.extractAmount(rawAmount);
coinTransfers.push(
CoinTransfer.fromJson({
...ctTemplate,
to: event.attributes[i].value,
amount,
denom,
})
);
}
});
});

updateBlockCheckpoint.height = toBlock;
await knex.transaction(async (trx) => {
await BlockCheckpoint.query()
.transacting(trx)
.insert(updateBlockCheckpoint)
.onConflict('job_name')
.merge();

if (coinTransfers.length > 0) {
this.logger.info(`INSERTING ${coinTransfers.length} COIN TRANSFER`);
await CoinTransfer.query().transacting(trx).insert(coinTransfers);
}
});
}

public async _start() {
this.createJob(
BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
{},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.handleCoinTransfer.millisecondCrawl,
},
}
);
return super._start();
}
}
Loading

0 comments on commit 9b6ee74

Please sign in to comment.