diff --git a/migrations/20230830024631_cw721_activity_add_current_owner.ts b/migrations/20230830024631_cw721_activity_add_current_owner.ts new file mode 100644 index 000000000..3303354c6 --- /dev/null +++ b/migrations/20230830024631_cw721_activity_add_current_owner.ts @@ -0,0 +1,13 @@ +import { Knex } from 'knex'; + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable('cw721_activity', (table) => { + table.string('owner'); + }); +} + +export async function down(knex: Knex): Promise { + await knex.schema.alterTable('cw721_activity', (table) => { + table.dropColumn('owner'); + }); +} diff --git a/src/common/constant.ts b/src/common/constant.ts index 465d9b5ad..35840e5ee 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -75,6 +75,8 @@ export const BULL_JOB_NAME = { CRAWL_GENESIS_IBC_TAO: 'crawl:genesis-ibc-tao', REINDEX_CW20_CONTRACT: 'reindex:cw20-contract', REINDEX_CW20_HISTORY: 'reindex:cw20-history', + UPDATE_CW721_ACTIVITY_OWNER: 'update:cw721-activity-owner', + UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN: 'update:cw721-activity-owner-by-token', }; export const SERVICE = { @@ -264,6 +266,10 @@ export const SERVICE = { path: 'v1.Cw20ReindexingService.reindexing', }, }, + Cw721ActivityUpdateOwnerService: { + key: 'Cw721ActivityUpdateOwnerService', + name: 'v1.Cw721ActivityUpdateOwnerService', + }, }, }; diff --git a/src/models/cw721_tx.ts b/src/models/cw721_tx.ts index 241949888..c1d0f8a35 100644 --- a/src/models/cw721_tx.ts +++ b/src/models/cw721_tx.ts @@ -21,18 +21,20 @@ export default class CW721Activity extends BaseModel { cw721_contract_id!: number; - cw721_token_id?: number; + cw721_token_id!: number; created_at?: Date; updated_at?: Date; - from?: string; + from!: string; - to?: string; + to!: string; height!: number; + owner!: string; + smart_contract_event_id!: number; static get tableName() { diff --git a/src/services/cw721/cw721-activity-update-owner.service.ts b/src/services/cw721/cw721-activity-update-owner.service.ts new file mode 100644 index 000000000..24a2711f3 --- /dev/null +++ b/src/services/cw721/cw721-activity-update-owner.service.ts @@ -0,0 +1,148 @@ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import _ from 'lodash'; +import { ServiceBroker } from 'moleculer'; +import config from '../../../config.json' assert { type: 'json' }; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { Config } from '../../common'; +import { BULL_JOB_NAME, SERVICE } from '../../common/constant'; +import CW721Activity from '../../models/cw721_tx'; +import { CW721_ACTION } from './cw721.service'; + +const { NODE_ENV } = Config; + +@Service({ + name: SERVICE.V1.Cw721ActivityUpdateOwnerService.key, + version: 1, +}) +export default class Cw721ActivityUpdateOwnerService extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN, + jobName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN, + }) + async updateCw721ActOwnerByToken(_payload: { + cw721ContractId: number; + cw721TokenId: number; + }): Promise { + const { cw721ContractId, cw721TokenId } = _payload; + const activities = await CW721Activity.query() + .whereIn('action', [ + CW721_ACTION.TRANSFER, + CW721_ACTION.SEND_NFT, + CW721_ACTION.BURN, + ]) + .whereNull('owner') + .andWhere('cw721_token_id', cw721TokenId) + .andWhere('cw721_contract_id', cw721ContractId) + .orderBy('id') + .limit(100); + const lastOwnerActivity = await CW721Activity.query() + .whereIn('action', [CW721_ACTION.MINT, CW721_ACTION.TRANSFER]) + .andWhere('cw721_token_id', cw721TokenId) + .andWhere('cw721_contract_id', cw721ContractId) + .whereNotNull('owner') + .orderBy('height', 'DESC') + .first() + .throwIfNotFound(); + const sortedActivities = _.sortBy( + [...activities, lastOwnerActivity], + (e) => e.height + ); + for (let index = 1; index < sortedActivities.length; index += 1) { + sortedActivities[index].owner = sortedActivities[index - 1].to; + } + await CW721Activity.query() + .insert(sortedActivities) + .onConflict('id') + .merge(); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER, + jobName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER, + }) + async jobHandler(): Promise { + const unprocessedActivities = await CW721Activity.query() + .whereIn('action', [ + CW721_ACTION.MINT, + CW721_ACTION.TRANSFER, + CW721_ACTION.SEND_NFT, + CW721_ACTION.BURN, + ]) + .whereNull('owner') + .orderBy('id') + .limit(100); + await this.updateOwnerForMintActs( + unprocessedActivities.filter((e) => e.action === CW721_ACTION.MINT) + ); + const tokens = unprocessedActivities.reduce( + (acc: { cw721ContractId: number; cw721TokenId: number }[], curr) => { + if ( + acc.find( + (e) => + e.cw721ContractId === curr.cw721_contract_id && + e.cw721TokenId === curr.cw721_token_id + ) === undefined + ) { + acc.push({ + cw721ContractId: curr.cw721_contract_id, + cw721TokenId: curr.cw721_token_id, + }); + } + return acc; + }, + [] + ); + await Promise.all( + tokens.map(async (token) => + this.createJob( + BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN, + BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN, + token, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + } + ) + ) + ); + } + + async updateOwnerForMintActs(activities: CW721Activity[]) { + const patchQueries = activities.map((act) => + CW721Activity.query() + .patch({ + owner: act.to, + }) + .where('id', act.id) + ); + if (patchQueries.length > 0) { + await Promise.all(patchQueries); + } + } + + async _start(): Promise { + if (NODE_ENV !== 'test') { + await this.createJob( + BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER, + BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER, + {}, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.cw721.millisecondRepeatJob, + }, + } + ); + } + return super._start(); + } +}