diff --git a/src/mappings/token/utils.ts b/src/mappings/token/utils.ts index a4b7d1447..c75f83e6f 100644 --- a/src/mappings/token/utils.ts +++ b/src/mappings/token/utils.ts @@ -327,8 +327,8 @@ export async function getHolderAccountsForToken( const holders = await em.getRepository(TokenAccount).findBy({ tokenId }) const holdersMemberIds = holders - .filter((follower) => follower?.memberId) - .map((follower) => follower.memberId as string) + .filter((holder) => holder?.memberId) + .map((holder) => holder.memberId as string) const limit = pLimit(10) // Limit to 10 concurrent promises const holdersAccounts: (Account | null)[] = await Promise.all( @@ -347,17 +347,17 @@ export async function notifyTokenHolders( event?: Event, dispatchBlock?: number ) { - const holdersAccounts = await getHolderAccountsForToken(em, tokenId) + const holderAccounts = await getHolderAccountsForToken(em, tokenId) const limit = pLimit(10) // Limit to 10 concurrent promises await Promise.all( - holdersAccounts.map((holdersAccount) => + holderAccounts.map((holderAccount) => limit(() => addNotification( em, - holdersAccount, - new MemberRecipient({ membership: holdersAccount.membershipId }), + holderAccount, + new MemberRecipient({ membership: holderAccount.membershipId }), notificationType, event, dispatchBlock diff --git a/src/tests/integration/notifications.test.ts b/src/tests/integration/notifications.test.ts index a5e4455aa..6ff931146 100644 --- a/src/tests/integration/notifications.test.ts +++ b/src/tests/integration/notifications.test.ts @@ -1,12 +1,14 @@ -import { EntityManager } from 'typeorm' -import { IMemberRemarked, ReactVideo, MemberRemarked } from '@joystream/metadata-protobuf' +import { IMemberRemarked, MemberRemarked, ReactVideo } from '@joystream/metadata-protobuf' import { AnyMetadataClass } from '@joystream/metadata-protobuf/types' -import { defaultTestBlock, populateDbWithSeedData } from './testUtils' -import { globalEm } from '../../utils/globalEm' -import { - excludeChannelService, - verifyChannelService, -} from '../../server-extension/resolvers/ChannelsResolver' +import { Store } from '@subsquid/typeorm-store' +import { expect } from 'chai' +import { config as dontenvConfig } from 'dotenv' +import Long from 'long' +import path from 'path' +import { EntityManager } from 'typeorm' +import { auctionBidMadeInner } from '../../mappings/content/nft' +import { processMemberRemarkedEvent } from '../../mappings/membership' +import { backwardCompatibleMetaID } from '../../mappings/utils' import { Account, Channel, @@ -26,21 +28,19 @@ import { Video, VideoLiked, } from '../../model' -import { expect } from 'chai' +import { setFeaturedNftsInner } from '../../server-extension/resolvers/AdminResolver' +import { + excludeChannelService, + verifyChannelService, +} from '../../server-extension/resolvers/ChannelsResolver' +import { excludeVideoService } from '../../server-extension/resolvers/VideosResolver' +import { globalEm } from '../../utils/globalEm' import { OFFCHAIN_NOTIFICATION_ID_TAG, RUNTIME_NOTIFICATION_ID_TAG, } from '../../utils/notification/helpers' -import { setFeaturedNftsInner } from '../../server-extension/resolvers/AdminResolver' -import { auctionBidMadeInner } from '../../mappings/content/nft' -import { EntityManagerOverlay } from '../../utils/overlay' -import { Store } from '@subsquid/typeorm-store' -import { processMemberRemarkedEvent } from '../../mappings/membership' -import Long from 'long' -import { backwardCompatibleMetaID } from '../../mappings/utils' -import { config as dontenvConfig } from 'dotenv' -import path from 'path' -import { excludeVideoService } from '../../server-extension/resolvers/VideosResolver' +import { AnyEntity, Constructor, EntityManagerOverlay } from '../../utils/overlay' +import { defaultTestBlock, populateDbWithSeedData } from './testUtils' dontenvConfig({ path: path.resolve(__dirname, './.env'), @@ -50,9 +50,19 @@ const metadataToBytes = (metaClass: AnyMetadataClass, obj: T): Uint8Array return Buffer.from(metaClass.encode(obj).finish()) } -const getNextNotificationId = async (em: EntityManager, onchain: boolean) => { +const getNextNotificationId = async ( + store: EntityManager | EntityManagerOverlay, + onchain: boolean +) => { const tag = onchain ? RUNTIME_NOTIFICATION_ID_TAG : OFFCHAIN_NOTIFICATION_ID_TAG - const row = await em.getRepository(NextEntityId).findOneBy({ entityName: tag }) + if (store instanceof EntityManagerOverlay) { + const row = await store + .getRepository(NextEntityId as Constructor) + .getOneBy({ entityName: tag }) + const id = parseInt(row?.nextId.toString() || '1') + return id + } + const row = await store.getRepository(NextEntityId).findOneBy({ entityName: tag }) const id = parseInt(row?.nextId.toString() || '1') return id } @@ -69,6 +79,7 @@ describe('notifications tests', () => { let em: EntityManager before(async () => { em = await globalEm + overlay = await createOverlay() await populateDbWithSeedData() }) describe('👉 YPP Verify channel', () => { @@ -281,10 +292,9 @@ describe('notifications tests', () => { before(async () => { const bidAmount = BigInt(100000) - nextNotificationIdPre = await getNextNotificationId(em, true) + nextNotificationIdPre = await getNextNotificationId(overlay, true) notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre nft = await em.getRepository(OwnedNft).findOneByOrFail({ videoId }) - overlay = await createOverlay() await auctionBidMadeInner( overlay, @@ -368,8 +378,7 @@ describe('notifications tests', () => { asV2001: ['3', metadataToBytes(MemberRemarked, metadataMessage), undefined], } as any before(async () => { - overlay = await createOverlay() - nextNotificationIdPre = await getNextNotificationId(em, true) + nextNotificationIdPre = await getNextNotificationId(overlay, true) notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString() }) it('should process video liked and deposit notification', async () => { @@ -381,7 +390,7 @@ describe('notifications tests', () => { event, }) - const nextNotificationId = await getNextNotificationId(em, true) + const nextNotificationId = await getNextNotificationId(overlay, true) notification = (await overlay .getRepository(Notification) .getByIdOrFail(notificationId)) as Notification @@ -422,8 +431,7 @@ describe('notifications tests', () => { asV2001: ['2', metadataToBytes(MemberRemarked, metadataMessage), undefined], // avoid comment author == creator } as any before(async () => { - overlay = await createOverlay() - nextNotificationIdPre = await getNextNotificationId(em, true) + nextNotificationIdPre = await getNextNotificationId(overlay, true) notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString() }) it('should process comment to video and deposit notification', async () => { @@ -435,7 +443,7 @@ describe('notifications tests', () => { event, }) - const nextNotificationId = await getNextNotificationId(em, true) + const nextNotificationId = await getNextNotificationId(overlay, true) notification = (await overlay .getRepository(Notification) .getByIdOrFail(notificationId)) as Notification | null @@ -485,7 +493,7 @@ describe('notifications tests', () => { } as any before(async () => { - nextNotificationIdPre = await getNextNotificationId(em, true) + nextNotificationIdPre = await getNextNotificationId(overlay, true) notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString() await processMemberRemarkedEvent({ @@ -500,7 +508,7 @@ describe('notifications tests', () => { describe('should process reply to comment and deposit notification', () => { let nextNotificationId: number before(async () => { - nextNotificationId = await getNextNotificationId(em, true) + nextNotificationId = await getNextNotificationId(overlay, true) notification = (await overlay .getRepository(Notification) .getByIdOrFail(notificationId)) as Notification | null diff --git a/src/utils/nextEntityId.ts b/src/utils/nextEntityId.ts index 86e610233..ff8738068 100644 --- a/src/utils/nextEntityId.ts +++ b/src/utils/nextEntityId.ts @@ -1,15 +1,40 @@ import { EntityManager } from 'typeorm' import { NextEntityId } from '../model' +import { AnyEntity, Constructor, EntityManagerOverlay } from './overlay' -// used to retrieve the next id for an entity -export async function getNextIdForEntity(em: EntityManager, entityName: string): Promise { +// used to retrieve the next id for an entity from NextEntityId table using either EntityManager or Overlay +export async function getNextIdForEntity( + store: EntityManager | EntityManagerOverlay, + entityName: string +): Promise { + // Get next entity id from overlay (this will mostly be used in the mappings context) + if (store instanceof EntityManagerOverlay) { + const row = await store + .getRepository(NextEntityId as Constructor) + .getOneBy({ entityName: entityName }) + + const id = parseInt(row?.nextId.toString() || '1') + + // Update the id to be the next one in the overlay + if (row) { + row.nextId++ + } else { + store + .getRepository(NextEntityId as Constructor) + .new({ entityName, nextId: id + 1 }) + } + + return id + } + + // Get next entity id from EntityManager (this will mostly be used in the graphql-server/auth-api context) let row: NextEntityId | null if (process.env.TESTING === 'true' || process.env.TESTING === '1') { - row = await em.getRepository(NextEntityId).findOne({ + row = await store.getRepository(NextEntityId).findOne({ where: { entityName }, }) } else { - row = await em.getRepository(NextEntityId).findOne({ + row = await store.getRepository(NextEntityId).findOne({ where: { entityName }, lock: { mode: 'pessimistic_write' }, }) diff --git a/src/utils/notification/helpers.ts b/src/utils/notification/helpers.ts index 3b705f539..9c112224d 100644 --- a/src/utils/notification/helpers.ts +++ b/src/utils/notification/helpers.ts @@ -159,7 +159,7 @@ async function addOffChainNotification( const nextNotificationId = await getNextIdForEntity(em, OFFCHAIN_NOTIFICATION_ID_TAG) const notification = createNotification( - OFFCHAIN_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(), + `${OFFCHAIN_NOTIFICATION_ID_TAG}-${nextNotificationId}`, account.id, recipient, notificationType, @@ -186,21 +186,22 @@ async function addRuntimeNotification( event: Event, dispatchBlock?: number ) { - const em = overlay.getEm() // get notification Id from orion_db in any case - const nextNotificationId = await getNextIdForEntity(em, RUNTIME_NOTIFICATION_ID_TAG) + const nextNotificationId = await getNextIdForEntity(overlay, RUNTIME_NOTIFICATION_ID_TAG) + + const runtimeNotificationId = `${RUNTIME_NOTIFICATION_ID_TAG}-${nextNotificationId}` // check that on-notification is not already present in orion_db in case the processor has been restarted (but not orion_db) const existingNotification = await overlay .getRepository(Notification) - .getById(nextNotificationId.toString()) + .getById(runtimeNotificationId) if (existingNotification) { return } const notification = createNotification( - RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(), + runtimeNotificationId, account.id, recipient, notificationType, @@ -216,8 +217,6 @@ async function addRuntimeNotification( await createEmailNotification(overlay, notification) } - await saveNextNotificationId(em, nextNotificationId + 1, RUNTIME_NOTIFICATION_ID_TAG) - return notification.id } diff --git a/src/utils/notification/index.ts b/src/utils/notification/index.ts index 9ae543de6..9a91e01c8 100644 --- a/src/utils/notification/index.ts +++ b/src/utils/notification/index.ts @@ -1,7 +1,5 @@ export { + addNotification, defaultNotificationPreferences, preferencesForNotification, - addNotification, } from './helpers' -// export * from './notificationTexts' -// export * from './notificationLinks' diff --git a/src/utils/overlay.ts b/src/utils/overlay.ts index 197b6bb41..6b57cea1d 100644 --- a/src/utils/overlay.ts +++ b/src/utils/overlay.ts @@ -345,16 +345,16 @@ export class EntityManagerOverlay { constructor( private em: EntityManager, private nextEntityIds: NextEntityId[], - private afterDbUpdte: (em: EntityManager) => Promise + private afterDbUpdate: (em: EntityManager) => Promise ) {} - public static async create(store: Store, afterDbUpdte: (em: EntityManager) => Promise) { + public static async create(store: Store, afterDbUpdate: (em: EntityManager) => Promise) { // FIXME: This is a little hacky, but we really need to access the underlying EntityManager const em = await (store as unknown as { em: () => Promise }).em() // Add "admin" schema to search path in order to be able to access "hidden" entities await em.query('SET search_path TO admin,public') const nextEntityIds = await em.find(NextEntityId, {}) - return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdte) + return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdate) } public totalCacheSize() { @@ -400,6 +400,6 @@ export class EntityManagerOverlay { }) ) await this.em.save(nextIds) - await this.afterDbUpdte(this.em) + await this.afterDbUpdate(this.em) } }