Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/update owner for each cw721 activity ( main) #349

Closed
wants to merge 11 commits into from
13 changes: 13 additions & 0 deletions migrations/20230830024631_cw721_activity_add_current_owner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
await knex.schema.alterTable('cw721_activity', (table) => {
table.string('owner');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

để nguyên định nghĩa from với to, em thêm sender thì nó hợp lý hơn

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from và to hiện tại đang có data, thực chất from đang là data của sender, nếu thêm cột sender thì phải chuyển data từ from qua ( hoặc rename cột ). Em thêm cột owner thì chỉ cần fill data vào mà em thấy cũng tạm chấp nhận được, cái tên cột mình hiểu là đc mà :))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😛

});
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.alterTable('cw721_activity', (table) => {
table.dropColumn('owner');
});
}
6 changes: 6 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ export const BULL_JOB_NAME = {
CRAWL_GENESIS_IBC_TAO: 'crawl:genesis-ibc-tao',
REINDEX_CW20_CONTRACT: 'reindex:cw20-contract',
REINDEX_CW20_HISTORY: 'reindex:cw20-history',
UPDATE_CW721_ACTIVITY_OWNER: 'update:cw721-activity-owner',
UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN: 'update:cw721-activity-owner-by-token',
};

export const SERVICE = {
Expand Down Expand Up @@ -264,6 +266,10 @@ export const SERVICE = {
path: 'v1.Cw20ReindexingService.reindexing',
},
},
Cw721ActivityUpdateOwnerService: {
key: 'Cw721ActivityUpdateOwnerService',
name: 'v1.Cw721ActivityUpdateOwnerService',
},
},
};

Expand Down
8 changes: 5 additions & 3 deletions src/models/cw721_tx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ export default class CW721Activity extends BaseModel {

cw721_contract_id!: number;

cw721_token_id?: number;
cw721_token_id!: number;

created_at?: Date;

updated_at?: Date;

from?: string;
from!: string;

to?: string;
to!: string;

height!: number;

owner!: string;

smart_contract_event_id!: number;

static get tableName() {
Expand Down
148 changes: 148 additions & 0 deletions src/services/cw721/cw721-activity-update-owner.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import _ from 'lodash';
import { ServiceBroker } from 'moleculer';
import config from '../../../config.json' assert { type: 'json' };
import BullableService, { QueueHandler } from '../../base/bullable.service';
import { Config } from '../../common';
import { BULL_JOB_NAME, SERVICE } from '../../common/constant';
import CW721Activity from '../../models/cw721_tx';
import { CW721_ACTION } from './cw721.service';

const { NODE_ENV } = Config;

@Service({
name: SERVICE.V1.Cw721ActivityUpdateOwnerService.key,
version: 1,
})
export default class Cw721ActivityUpdateOwnerService extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

@QueueHandler({
queueName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN,
jobName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN,
})
async updateCw721ActOwnerByToken(_payload: {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hiện tại trên xstaxy cũng chỉ có 25k activity, load hết ra mà sửa trong 1 migration
sau đó thì gán luôn lúc có activity mới

Copy link
Collaborator Author

@phamphong9981 phamphong9981 Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trước kia em đã có nghĩ giải quyết theo hướng chạy migration update data cũ và sửa logic code. Nhưng việc sửa logic code sẽ gặp những vấn đề là:

  • code phình to
  • logic tương đối phức tạp
  • khó debug/test
  • có nhiều trường hợp khi một phần data nằm trong range [startBlock, endBlock] khi mình bắt được và một phần data nằm trong DB

Em muốn thống nhất là data khi trong DB rồi thì mình bốc ra xử lý những cái chưa update owner

cw721ContractId: number;
cw721TokenId: number;
}): Promise<void> {
const { cw721ContractId, cw721TokenId } = _payload;
const activities = await CW721Activity.query()
.whereIn('action', [
CW721_ACTION.TRANSFER,
CW721_ACTION.SEND_NFT,
CW721_ACTION.BURN,
])
.whereNull('owner')
.andWhere('cw721_token_id', cw721TokenId)
.andWhere('cw721_contract_id', cw721ContractId)
.orderBy('id')
.limit(100);
const lastOwnerActivity = await CW721Activity.query()
.whereIn('action', [CW721_ACTION.MINT, CW721_ACTION.TRANSFER])
.andWhere('cw721_token_id', cw721TokenId)
.andWhere('cw721_contract_id', cw721ContractId)
.whereNotNull('owner')
.orderBy('height', 'DESC')
.first()
.throwIfNotFound();
const sortedActivities = _.sortBy(
[...activities, lastOwnerActivity],
(e) => e.height
);
for (let index = 1; index < sortedActivities.length; index += 1) {
sortedActivities[index].owner = sortedActivities[index - 1].to;
}
await CW721Activity.query()
.insert(sortedActivities)
.onConflict('id')
.merge();
}

@QueueHandler({
queueName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER,
jobName: BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER,
})
async jobHandler(): Promise<void> {
const unprocessedActivities = await CW721Activity.query()
.whereIn('action', [
CW721_ACTION.MINT,
CW721_ACTION.TRANSFER,
CW721_ACTION.SEND_NFT,
CW721_ACTION.BURN,
])
.whereNull('owner')
.orderBy('id')
.limit(100);
await this.updateOwnerForMintActs(
unprocessedActivities.filter((e) => e.action === CW721_ACTION.MINT)
);
const tokens = unprocessedActivities.reduce(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code lại

(acc: { cw721ContractId: number; cw721TokenId: number }[], curr) => {
if (
acc.find(
(e) =>
e.cw721ContractId === curr.cw721_contract_id &&
e.cw721TokenId === curr.cw721_token_id
) === undefined
) {
acc.push({
cw721ContractId: curr.cw721_contract_id,
cw721TokenId: curr.cw721_token_id,
});
}
return acc;
},
[]
);
await Promise.all(
tokens.map(async (token) =>
this.createJob(
BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN,
BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN,
token,
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
}
)
)
);
}

async updateOwnerForMintActs(activities: CW721Activity[]) {
const patchQueries = activities.map((act) =>
CW721Activity.query()
.patch({
owner: act.to,
})
.where('id', act.id)
);
if (patchQueries.length > 0) {
await Promise.all(patchQueries);
}
}

async _start(): Promise<void> {
if (NODE_ENV !== 'test') {
await this.createJob(
BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER,
BULL_JOB_NAME.UPDATE_CW721_ACTIVITY_OWNER,
{},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.cw721.millisecondRepeatJob,
},
}
);
}
return super._start();
}
}
Loading