Skip to content

Commit

Permalink
fix: avoiding IDs override/conflict while creating concurrent runtime…
Browse files Browse the repository at this point in the history
… notifications (#342)

* fix: avoiding IDs override while creating conturrent runtime notifications

* create record in 'getNextIdForEntity' function in overlay for entityname, if it does not exist

* fix: notifications unit tests
  • Loading branch information
zeeshanakram3 authored Jul 17, 2024
1 parent 6167139 commit 3373b62
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 55 deletions.
12 changes: 6 additions & 6 deletions src/mappings/token/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ export async function getHolderAccountsForToken(
const holders = await em.getRepository(TokenAccount).findBy({ tokenId })

const holdersMemberIds = holders
.filter((follower) => follower?.memberId)
.map((follower) => follower.memberId as string)
.filter((holder) => holder?.memberId)
.map((holder) => holder.memberId as string)

const limit = pLimit(10) // Limit to 10 concurrent promises
const holdersAccounts: (Account | null)[] = await Promise.all(
Expand All @@ -347,17 +347,17 @@ export async function notifyTokenHolders(
event?: Event,
dispatchBlock?: number
) {
const holdersAccounts = await getHolderAccountsForToken(em, tokenId)
const holderAccounts = await getHolderAccountsForToken(em, tokenId)

const limit = pLimit(10) // Limit to 10 concurrent promises

await Promise.all(
holdersAccounts.map((holdersAccount) =>
holderAccounts.map((holderAccount) =>
limit(() =>
addNotification(
em,
holdersAccount,
new MemberRecipient({ membership: holdersAccount.membershipId }),
holderAccount,
new MemberRecipient({ membership: holderAccount.membershipId }),
notificationType,
event,
dispatchBlock
Expand Down
70 changes: 39 additions & 31 deletions src/tests/integration/notifications.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { EntityManager } from 'typeorm'
import { IMemberRemarked, ReactVideo, MemberRemarked } from '@joystream/metadata-protobuf'
import { IMemberRemarked, MemberRemarked, ReactVideo } from '@joystream/metadata-protobuf'
import { AnyMetadataClass } from '@joystream/metadata-protobuf/types'
import { defaultTestBlock, populateDbWithSeedData } from './testUtils'
import { globalEm } from '../../utils/globalEm'
import {
excludeChannelService,
verifyChannelService,
} from '../../server-extension/resolvers/ChannelsResolver'
import { Store } from '@subsquid/typeorm-store'
import { expect } from 'chai'
import { config as dontenvConfig } from 'dotenv'
import Long from 'long'
import path from 'path'
import { EntityManager } from 'typeorm'
import { auctionBidMadeInner } from '../../mappings/content/nft'
import { processMemberRemarkedEvent } from '../../mappings/membership'
import { backwardCompatibleMetaID } from '../../mappings/utils'
import {
Account,
Channel,
Expand All @@ -26,21 +28,19 @@ import {
Video,
VideoLiked,
} from '../../model'
import { expect } from 'chai'
import { setFeaturedNftsInner } from '../../server-extension/resolvers/AdminResolver'
import {
excludeChannelService,
verifyChannelService,
} from '../../server-extension/resolvers/ChannelsResolver'
import { excludeVideoService } from '../../server-extension/resolvers/VideosResolver'
import { globalEm } from '../../utils/globalEm'
import {
OFFCHAIN_NOTIFICATION_ID_TAG,
RUNTIME_NOTIFICATION_ID_TAG,
} from '../../utils/notification/helpers'
import { setFeaturedNftsInner } from '../../server-extension/resolvers/AdminResolver'
import { auctionBidMadeInner } from '../../mappings/content/nft'
import { EntityManagerOverlay } from '../../utils/overlay'
import { Store } from '@subsquid/typeorm-store'
import { processMemberRemarkedEvent } from '../../mappings/membership'
import Long from 'long'
import { backwardCompatibleMetaID } from '../../mappings/utils'
import { config as dontenvConfig } from 'dotenv'
import path from 'path'
import { excludeVideoService } from '../../server-extension/resolvers/VideosResolver'
import { AnyEntity, Constructor, EntityManagerOverlay } from '../../utils/overlay'
import { defaultTestBlock, populateDbWithSeedData } from './testUtils'

dontenvConfig({
path: path.resolve(__dirname, './.env'),
Expand All @@ -50,9 +50,19 @@ const metadataToBytes = <T>(metaClass: AnyMetadataClass<T>, obj: T): Uint8Array
return Buffer.from(metaClass.encode(obj).finish())
}

const getNextNotificationId = async (em: EntityManager, onchain: boolean) => {
const getNextNotificationId = async (
store: EntityManager | EntityManagerOverlay,
onchain: boolean
) => {
const tag = onchain ? RUNTIME_NOTIFICATION_ID_TAG : OFFCHAIN_NOTIFICATION_ID_TAG
const row = await em.getRepository(NextEntityId).findOneBy({ entityName: tag })
if (store instanceof EntityManagerOverlay) {
const row = await store
.getRepository(NextEntityId as Constructor<NextEntityId & AnyEntity>)
.getOneBy({ entityName: tag })
const id = parseInt(row?.nextId.toString() || '1')
return id
}
const row = await store.getRepository(NextEntityId).findOneBy({ entityName: tag })
const id = parseInt(row?.nextId.toString() || '1')
return id
}
Expand All @@ -69,6 +79,7 @@ describe('notifications tests', () => {
let em: EntityManager
before(async () => {
em = await globalEm
overlay = await createOverlay()
await populateDbWithSeedData()
})
describe('👉 YPP Verify channel', () => {
Expand Down Expand Up @@ -281,10 +292,9 @@ describe('notifications tests', () => {

before(async () => {
const bidAmount = BigInt(100000)
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre
nft = await em.getRepository(OwnedNft).findOneByOrFail({ videoId })
overlay = await createOverlay()

await auctionBidMadeInner(
overlay,
Expand Down Expand Up @@ -368,8 +378,7 @@ describe('notifications tests', () => {
asV2001: ['3', metadataToBytes(MemberRemarked, metadataMessage), undefined],
} as any
before(async () => {
overlay = await createOverlay()
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString()
})
it('should process video liked and deposit notification', async () => {
Expand All @@ -381,7 +390,7 @@ describe('notifications tests', () => {
event,
})

const nextNotificationId = await getNextNotificationId(em, true)
const nextNotificationId = await getNextNotificationId(overlay, true)
notification = (await overlay
.getRepository(Notification)
.getByIdOrFail(notificationId)) as Notification
Expand Down Expand Up @@ -422,8 +431,7 @@ describe('notifications tests', () => {
asV2001: ['2', metadataToBytes(MemberRemarked, metadataMessage), undefined], // avoid comment author == creator
} as any
before(async () => {
overlay = await createOverlay()
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString()
})
it('should process comment to video and deposit notification', async () => {
Expand All @@ -435,7 +443,7 @@ describe('notifications tests', () => {
event,
})

const nextNotificationId = await getNextNotificationId(em, true)
const nextNotificationId = await getNextNotificationId(overlay, true)
notification = (await overlay
.getRepository(Notification)
.getByIdOrFail(notificationId)) as Notification | null
Expand Down Expand Up @@ -485,7 +493,7 @@ describe('notifications tests', () => {
} as any

before(async () => {
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString()

await processMemberRemarkedEvent({
Expand All @@ -500,7 +508,7 @@ describe('notifications tests', () => {
describe('should process reply to comment and deposit notification', () => {
let nextNotificationId: number
before(async () => {
nextNotificationId = await getNextNotificationId(em, true)
nextNotificationId = await getNextNotificationId(overlay, true)
notification = (await overlay
.getRepository(Notification)
.getByIdOrFail(notificationId)) as Notification | null
Expand Down
33 changes: 29 additions & 4 deletions src/utils/nextEntityId.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
import { EntityManager } from 'typeorm'
import { NextEntityId } from '../model'
import { AnyEntity, Constructor, EntityManagerOverlay } from './overlay'

// used to retrieve the next id for an entity
export async function getNextIdForEntity(em: EntityManager, entityName: string): Promise<number> {
// used to retrieve the next id for an entity from NextEntityId table using either EntityManager or Overlay
export async function getNextIdForEntity(
store: EntityManager | EntityManagerOverlay,
entityName: string
): Promise<number> {
// Get next entity id from overlay (this will mostly be used in the mappings context)
if (store instanceof EntityManagerOverlay) {
const row = await store
.getRepository(NextEntityId as Constructor<NextEntityId & AnyEntity>)
.getOneBy({ entityName: entityName })

const id = parseInt(row?.nextId.toString() || '1')

// Update the id to be the next one in the overlay
if (row) {
row.nextId++
} else {
store
.getRepository(NextEntityId as Constructor<NextEntityId & AnyEntity>)
.new({ entityName, nextId: id + 1 })
}

return id
}

// Get next entity id from EntityManager (this will mostly be used in the graphql-server/auth-api context)
let row: NextEntityId | null
if (process.env.TESTING === 'true' || process.env.TESTING === '1') {
row = await em.getRepository(NextEntityId).findOne({
row = await store.getRepository(NextEntityId).findOne({
where: { entityName },
})
} else {
row = await em.getRepository(NextEntityId).findOne({
row = await store.getRepository(NextEntityId).findOne({
where: { entityName },
lock: { mode: 'pessimistic_write' },
})
Expand Down
13 changes: 6 additions & 7 deletions src/utils/notification/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async function addOffChainNotification(
const nextNotificationId = await getNextIdForEntity(em, OFFCHAIN_NOTIFICATION_ID_TAG)

const notification = createNotification(
OFFCHAIN_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(),
`${OFFCHAIN_NOTIFICATION_ID_TAG}-${nextNotificationId}`,
account.id,
recipient,
notificationType,
Expand All @@ -186,21 +186,22 @@ async function addRuntimeNotification(
event: Event,
dispatchBlock?: number
) {
const em = overlay.getEm()
// get notification Id from orion_db in any case
const nextNotificationId = await getNextIdForEntity(em, RUNTIME_NOTIFICATION_ID_TAG)
const nextNotificationId = await getNextIdForEntity(overlay, RUNTIME_NOTIFICATION_ID_TAG)

const runtimeNotificationId = `${RUNTIME_NOTIFICATION_ID_TAG}-${nextNotificationId}`

// check that on-notification is not already present in orion_db in case the processor has been restarted (but not orion_db)
const existingNotification = await overlay
.getRepository(Notification)
.getById(nextNotificationId.toString())
.getById(runtimeNotificationId)

if (existingNotification) {
return
}

const notification = createNotification(
RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(),
runtimeNotificationId,
account.id,
recipient,
notificationType,
Expand All @@ -216,8 +217,6 @@ async function addRuntimeNotification(
await createEmailNotification(overlay, notification)
}

await saveNextNotificationId(em, nextNotificationId + 1, RUNTIME_NOTIFICATION_ID_TAG)

return notification.id
}

Expand Down
4 changes: 1 addition & 3 deletions src/utils/notification/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
export {
addNotification,
defaultNotificationPreferences,
preferencesForNotification,
addNotification,
} from './helpers'
// export * from './notificationTexts'
// export * from './notificationLinks'
8 changes: 4 additions & 4 deletions src/utils/overlay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,16 +345,16 @@ export class EntityManagerOverlay {
constructor(
private em: EntityManager,
private nextEntityIds: NextEntityId[],
private afterDbUpdte: (em: EntityManager) => Promise<void>
private afterDbUpdate: (em: EntityManager) => Promise<void>
) {}

public static async create(store: Store, afterDbUpdte: (em: EntityManager) => Promise<void>) {
public static async create(store: Store, afterDbUpdate: (em: EntityManager) => Promise<void>) {
// FIXME: This is a little hacky, but we really need to access the underlying EntityManager
const em = await (store as unknown as { em: () => Promise<EntityManager> }).em()
// Add "admin" schema to search path in order to be able to access "hidden" entities
await em.query('SET search_path TO admin,public')
const nextEntityIds = await em.find(NextEntityId, {})
return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdte)
return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdate)
}

public totalCacheSize() {
Expand Down Expand Up @@ -400,6 +400,6 @@ export class EntityManagerOverlay {
})
)
await this.em.save(nextIds)
await this.afterDbUpdte(this.em)
await this.afterDbUpdate(this.em)
}
}

0 comments on commit 3373b62

Please sign in to comment.