diff --git a/src/common/constant.ts b/src/common/constant.ts index eb8f284a4..c167b2937 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -157,6 +157,10 @@ export const SERVICE = { path: 'v1.CrawlTransactionService.TriggerHandleTxJob', }, }, + CoinTransfer: { + key: 'CoinTransferService', + name: 'v1.CoinTransferService', + }, CrawlGenesisService: { key: 'CrawlGenesisService', name: 'v1.CrawlGenesisService', diff --git a/src/models/block_checkpoint.ts b/src/models/block_checkpoint.ts index 852198c38..95027fc41 100644 --- a/src/models/block_checkpoint.ts +++ b/src/models/block_checkpoint.ts @@ -21,6 +21,14 @@ export class BlockCheckpoint extends BaseModel { }; } + /** + * @description Get or create a check point for job and step run (from, to) + * @param jobName Your job name want to run + * @param lastHeightJobNames Another one or more job that your job depending on. So if your job want to process + * block A, it needs to wait util those jobs process success block A before your job + * @param configName property of config (import config from '../../../config.json' assert { type: 'json' };). + * it used to set step call via blocksPerCall in config + */ static async getCheckpoint( jobName: string, lastHeightJobNames: string[], diff --git a/src/services/crawl-tx/coin_transfer.service.ts b/src/services/crawl-tx/coin_transfer.service.ts index 5efb090ba..1bf1ab7ec 100644 --- a/src/services/crawl-tx/coin_transfer.service.ts +++ b/src/services/crawl-tx/coin_transfer.service.ts @@ -1,4 +1,3 @@ -/* eslint-disable import/no-extraneous-dependencies */ import { ServiceBroker } from 'moleculer'; import { Service } from '@ourparentcenter/moleculer-decorators-extended'; import { BULL_JOB_NAME, SERVICE } from '../../common'; @@ -13,42 +12,14 @@ import config from '../../../config.json' assert { type: 'json' }; import knex from '../../common/utils/db_connection'; @Service({ - name: SERVICE.V1.CrawlTransaction.key, + name: SERVICE.V1.CoinTransfer.key, version: 1, }) -export default class CrawlTxService extends BullableService { +export default class CoinTransferService extends BullableService { public constructor(public broker: ServiceBroker) { super(broker); } - /** - * @description Get latest coin transfer to get latest height, otherwise get height from the oldest transaction crawled - * @private - */ - private async getLatestCoinTransferHeight(): Promise { - const blockCheckpointCT = await BlockCheckpoint.query() - .where('job_name', BULL_JOB_NAME.HANDLE_COIN_TRANSFER) - .first(); - - if (!blockCheckpointCT) { - const oldestTransaction = await Transaction.query() - .orderBy('height', 'ASC') - .first(); - const latestBlockHeight = oldestTransaction - ? oldestTransaction.height - : 0; - await BlockCheckpoint.query() - .insert({ - height: latestBlockHeight, - job_name: BULL_JOB_NAME.HANDLE_COIN_TRANSFER, - }) - .onConflict('job_name') - .merge(); - return latestBlockHeight; - } - return blockCheckpointCT.height; - } - /** * @description Get transaction data for insert coin transfer * @param fromHeight @@ -62,7 +33,7 @@ export default class CrawlTxService extends BullableService { return Transaction.query() .withGraphFetched('events.[attributes]') .modifyGraph('events', (builder) => { - builder.andWhere('type', '=', 'transfer').whereNotNull('tx_msg_index'); + builder.whereNotNull('tx_msg_index'); }) .withGraphFetched('messages') .where('transaction.height', '>', fromHeight) @@ -86,25 +57,19 @@ export default class CrawlTxService extends BullableService { queueName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER, jobName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER, }) - public async jobHandlerCrawlTx() { - const transactionCheckPoint = await BlockCheckpoint.query() - .where('job_name', BULL_JOB_NAME.HANDLE_TRANSACTION) - .first(); - let latestCoinTransferHeight = await this.getLatestCoinTransferHeight(); - - if ( - !transactionCheckPoint || - latestCoinTransferHeight >= transactionCheckPoint.height - ) { + public async jobHandleTxCoinTransfer() { + const [fromBlock, toBlock, updateBlockCheckpoint] = + await BlockCheckpoint.getCheckpoint( + BULL_JOB_NAME.HANDLE_COIN_TRANSFER, + [BULL_JOB_NAME.HANDLE_TRANSACTION], + 'handleCoinTransfer' + ); + + if (fromBlock >= toBlock) { this.logger.info('Waiting for new transaction crawled'); return; } - const fromBlock = latestCoinTransferHeight; - const toBlock = Math.min( - fromBlock + config.handleCoinTransfer.blocksPerCall, - transactionCheckPoint.height - ); this.logger.info(`QUERY FROM ${fromBlock} - TO ${toBlock}................`); const coinTransfers: CoinTransfer[] = []; @@ -201,14 +166,11 @@ export default class CrawlTxService extends BullableService { }); }); - latestCoinTransferHeight = toBlock; + updateBlockCheckpoint.height = toBlock; await knex.transaction(async (trx) => { await BlockCheckpoint.query() .transacting(trx) - .insert({ - height: latestCoinTransferHeight, - job_name: BULL_JOB_NAME.HANDLE_COIN_TRANSFER, - }) + .insert(updateBlockCheckpoint) .onConflict('job_name') .merge();