Skip to content

Commit

Permalink
feat: test unit and fix bug for coin transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
matthew-nguyen-20032023 committed Nov 16, 2023
1 parent d07bfba commit 7591a87
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 52 deletions.
4 changes: 4 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ export const SERVICE = {
path: 'v1.CrawlTransactionService.TriggerHandleTxJob',
},
},
CoinTransfer: {
key: 'CoinTransferService',
name: 'v1.CoinTransferService',
},
CrawlGenesisService: {
key: 'CrawlGenesisService',
name: 'v1.CrawlGenesisService',
Expand Down
8 changes: 8 additions & 0 deletions src/models/block_checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ export class BlockCheckpoint extends BaseModel {
};
}

/**
* @description Get or create a check point for job and step run (from, to)
* @param jobName Your job name want to run
* @param lastHeightJobNames Another one or more job that your job depending on. So if your job want to process
* block A, it needs to wait util those jobs process success block A before your job
* @param configName property of config (import config from '../../../config.json' assert { type: 'json' };).
* it used to set step call via blocksPerCall in config
*/
static async getCheckpoint(
jobName: string,
lastHeightJobNames: string[],
Expand Down
66 changes: 14 additions & 52 deletions src/services/crawl-tx/coin_transfer.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable import/no-extraneous-dependencies */
import { ServiceBroker } from 'moleculer';
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { BULL_JOB_NAME, SERVICE } from '../../common';
Expand All @@ -13,42 +12,14 @@ import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';

@Service({
name: SERVICE.V1.CrawlTransaction.key,
name: SERVICE.V1.CoinTransfer.key,
version: 1,
})
export default class CrawlTxService extends BullableService {
export default class CoinTransferService extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

/**
* @description Get latest coin transfer to get latest height, otherwise get height from the oldest transaction crawled
* @private
*/
private async getLatestCoinTransferHeight(): Promise<number> {
const blockCheckpointCT = await BlockCheckpoint.query()
.where('job_name', BULL_JOB_NAME.HANDLE_COIN_TRANSFER)
.first();

if (!blockCheckpointCT) {
const oldestTransaction = await Transaction.query()
.orderBy('height', 'ASC')
.first();
const latestBlockHeight = oldestTransaction
? oldestTransaction.height
: 0;
await BlockCheckpoint.query()
.insert({
height: latestBlockHeight,
job_name: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
})
.onConflict('job_name')
.merge();
return latestBlockHeight;
}
return blockCheckpointCT.height;
}

/**
* @description Get transaction data for insert coin transfer
* @param fromHeight
Expand All @@ -62,7 +33,7 @@ export default class CrawlTxService extends BullableService {
return Transaction.query()
.withGraphFetched('events.[attributes]')
.modifyGraph('events', (builder) => {
builder.andWhere('type', '=', 'transfer').whereNotNull('tx_msg_index');
builder.whereNotNull('tx_msg_index');
})
.withGraphFetched('messages')
.where('transaction.height', '>', fromHeight)
Expand All @@ -86,25 +57,19 @@ export default class CrawlTxService extends BullableService {
queueName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
jobName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
})
public async jobHandlerCrawlTx() {
const transactionCheckPoint = await BlockCheckpoint.query()
.where('job_name', BULL_JOB_NAME.HANDLE_TRANSACTION)
.first();
let latestCoinTransferHeight = await this.getLatestCoinTransferHeight();

if (
!transactionCheckPoint ||
latestCoinTransferHeight >= transactionCheckPoint.height
) {
public async jobHandleTxCoinTransfer() {
const [fromBlock, toBlock, updateBlockCheckpoint] =
await BlockCheckpoint.getCheckpoint(
BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
[BULL_JOB_NAME.HANDLE_TRANSACTION],
'handleCoinTransfer'
);

if (fromBlock >= toBlock) {
this.logger.info('Waiting for new transaction crawled');
return;
}

const fromBlock = latestCoinTransferHeight;
const toBlock = Math.min(
fromBlock + config.handleCoinTransfer.blocksPerCall,
transactionCheckPoint.height
);
this.logger.info(`QUERY FROM ${fromBlock} - TO ${toBlock}................`);

const coinTransfers: CoinTransfer[] = [];
Expand Down Expand Up @@ -201,14 +166,11 @@ export default class CrawlTxService extends BullableService {
});
});

latestCoinTransferHeight = toBlock;
updateBlockCheckpoint.height = toBlock;
await knex.transaction(async (trx) => {
await BlockCheckpoint.query()
.transacting(trx)
.insert({
height: latestCoinTransferHeight,
job_name: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
})
.insert(updateBlockCheckpoint)
.onConflict('job_name')
.merge();

Expand Down

0 comments on commit 7591a87

Please sign in to comment.