Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoiding IDs override/conflict while creating concurrent runtime notifications #342

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/mappings/token/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ export async function getHolderAccountsForToken(
const holders = await em.getRepository(TokenAccount).findBy({ tokenId })

const holdersMemberIds = holders
.filter((follower) => follower?.memberId)
.map((follower) => follower.memberId as string)
.filter((holder) => holder?.memberId)
.map((holder) => holder.memberId as string)

const limit = pLimit(10) // Limit to 10 concurrent promises
const holdersAccounts: (Account | null)[] = await Promise.all(
Expand All @@ -347,17 +347,17 @@ export async function notifyTokenHolders(
event?: Event,
dispatchBlock?: number
) {
const holdersAccounts = await getHolderAccountsForToken(em, tokenId)
const holderAccounts = await getHolderAccountsForToken(em, tokenId)

const limit = pLimit(10) // Limit to 10 concurrent promises

await Promise.all(
holdersAccounts.map((holdersAccount) =>
holderAccounts.map((holderAccount) =>
limit(() =>
addNotification(
em,
holdersAccount,
new MemberRecipient({ membership: holdersAccount.membershipId }),
holderAccount,
new MemberRecipient({ membership: holderAccount.membershipId }),
notificationType,
event,
dispatchBlock
Expand Down
70 changes: 39 additions & 31 deletions src/tests/integration/notifications.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { EntityManager } from 'typeorm'
import { IMemberRemarked, ReactVideo, MemberRemarked } from '@joystream/metadata-protobuf'
import { IMemberRemarked, MemberRemarked, ReactVideo } from '@joystream/metadata-protobuf'
import { AnyMetadataClass } from '@joystream/metadata-protobuf/types'
import { defaultTestBlock, populateDbWithSeedData } from './testUtils'
import { globalEm } from '../../utils/globalEm'
import {
excludeChannelService,
verifyChannelService,
} from '../../server-extension/resolvers/ChannelsResolver'
import { Store } from '@subsquid/typeorm-store'
import { expect } from 'chai'
import { config as dontenvConfig } from 'dotenv'
import Long from 'long'
import path from 'path'
import { EntityManager } from 'typeorm'
import { auctionBidMadeInner } from '../../mappings/content/nft'
import { processMemberRemarkedEvent } from '../../mappings/membership'
import { backwardCompatibleMetaID } from '../../mappings/utils'
import {
Account,
Channel,
Expand All @@ -26,21 +28,19 @@ import {
Video,
VideoLiked,
} from '../../model'
import { expect } from 'chai'
import { setFeaturedNftsInner } from '../../server-extension/resolvers/AdminResolver'
import {
excludeChannelService,
verifyChannelService,
} from '../../server-extension/resolvers/ChannelsResolver'
import { excludeVideoService } from '../../server-extension/resolvers/VideosResolver'
import { globalEm } from '../../utils/globalEm'
import {
OFFCHAIN_NOTIFICATION_ID_TAG,
RUNTIME_NOTIFICATION_ID_TAG,
} from '../../utils/notification/helpers'
import { setFeaturedNftsInner } from '../../server-extension/resolvers/AdminResolver'
import { auctionBidMadeInner } from '../../mappings/content/nft'
import { EntityManagerOverlay } from '../../utils/overlay'
import { Store } from '@subsquid/typeorm-store'
import { processMemberRemarkedEvent } from '../../mappings/membership'
import Long from 'long'
import { backwardCompatibleMetaID } from '../../mappings/utils'
import { config as dontenvConfig } from 'dotenv'
import path from 'path'
import { excludeVideoService } from '../../server-extension/resolvers/VideosResolver'
import { AnyEntity, Constructor, EntityManagerOverlay } from '../../utils/overlay'
import { defaultTestBlock, populateDbWithSeedData } from './testUtils'

dontenvConfig({
path: path.resolve(__dirname, './.env'),
Expand All @@ -50,9 +50,19 @@ const metadataToBytes = <T>(metaClass: AnyMetadataClass<T>, obj: T): Uint8Array
return Buffer.from(metaClass.encode(obj).finish())
}

const getNextNotificationId = async (em: EntityManager, onchain: boolean) => {
const getNextNotificationId = async (
store: EntityManager | EntityManagerOverlay,
onchain: boolean
) => {
const tag = onchain ? RUNTIME_NOTIFICATION_ID_TAG : OFFCHAIN_NOTIFICATION_ID_TAG
const row = await em.getRepository(NextEntityId).findOneBy({ entityName: tag })
if (store instanceof EntityManagerOverlay) {
const row = await store
.getRepository(NextEntityId as Constructor<NextEntityId & AnyEntity>)
.getOneBy({ entityName: tag })
const id = parseInt(row?.nextId.toString() || '1')
return id
}
const row = await store.getRepository(NextEntityId).findOneBy({ entityName: tag })
const id = parseInt(row?.nextId.toString() || '1')
return id
}
Expand All @@ -69,6 +79,7 @@ describe('notifications tests', () => {
let em: EntityManager
before(async () => {
em = await globalEm
overlay = await createOverlay()
await populateDbWithSeedData()
})
describe('👉 YPP Verify channel', () => {
Expand Down Expand Up @@ -281,10 +292,9 @@ describe('notifications tests', () => {

before(async () => {
const bidAmount = BigInt(100000)
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre
nft = await em.getRepository(OwnedNft).findOneByOrFail({ videoId })
overlay = await createOverlay()

await auctionBidMadeInner(
overlay,
Expand Down Expand Up @@ -368,8 +378,7 @@ describe('notifications tests', () => {
asV2001: ['3', metadataToBytes(MemberRemarked, metadataMessage), undefined],
} as any
before(async () => {
overlay = await createOverlay()
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString()
})
it('should process video liked and deposit notification', async () => {
Expand All @@ -381,7 +390,7 @@ describe('notifications tests', () => {
event,
})

const nextNotificationId = await getNextNotificationId(em, true)
const nextNotificationId = await getNextNotificationId(overlay, true)
notification = (await overlay
.getRepository(Notification)
.getByIdOrFail(notificationId)) as Notification
Expand Down Expand Up @@ -422,8 +431,7 @@ describe('notifications tests', () => {
asV2001: ['2', metadataToBytes(MemberRemarked, metadataMessage), undefined], // avoid comment author == creator
} as any
before(async () => {
overlay = await createOverlay()
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString()
})
it('should process comment to video and deposit notification', async () => {
Expand All @@ -435,7 +443,7 @@ describe('notifications tests', () => {
event,
})

const nextNotificationId = await getNextNotificationId(em, true)
const nextNotificationId = await getNextNotificationId(overlay, true)
notification = (await overlay
.getRepository(Notification)
.getByIdOrFail(notificationId)) as Notification | null
Expand Down Expand Up @@ -485,7 +493,7 @@ describe('notifications tests', () => {
} as any

before(async () => {
nextNotificationIdPre = await getNextNotificationId(em, true)
nextNotificationIdPre = await getNextNotificationId(overlay, true)
notificationId = RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationIdPre.toString()

await processMemberRemarkedEvent({
Expand All @@ -500,7 +508,7 @@ describe('notifications tests', () => {
describe('should process reply to comment and deposit notification', () => {
let nextNotificationId: number
before(async () => {
nextNotificationId = await getNextNotificationId(em, true)
nextNotificationId = await getNextNotificationId(overlay, true)
notification = (await overlay
.getRepository(Notification)
.getByIdOrFail(notificationId)) as Notification | null
Expand Down
33 changes: 29 additions & 4 deletions src/utils/nextEntityId.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
import { EntityManager } from 'typeorm'
import { NextEntityId } from '../model'
import { AnyEntity, Constructor, EntityManagerOverlay } from './overlay'

// used to retrieve the next id for an entity
export async function getNextIdForEntity(em: EntityManager, entityName: string): Promise<number> {
// used to retrieve the next id for an entity from NextEntityId table using either EntityManager or Overlay
export async function getNextIdForEntity(
store: EntityManager | EntityManagerOverlay,
entityName: string
): Promise<number> {
// Get next entity id from overlay (this will mostly be used in the mappings context)
if (store instanceof EntityManagerOverlay) {
const row = await store
.getRepository(NextEntityId as Constructor<NextEntityId & AnyEntity>)
.getOneBy({ entityName: entityName })

const id = parseInt(row?.nextId.toString() || '1')

// Update the id to be the next one in the overlay
if (row) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where did we increment the id for entity previously?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the addRuntimeNotification function

row.nextId++
} else {
store
.getRepository(NextEntityId as Constructor<NextEntityId & AnyEntity>)
.new({ entityName, nextId: id + 1 })
}

return id
}

// Get next entity id from EntityManager (this will mostly be used in the graphql-server/auth-api context)
let row: NextEntityId | null
if (process.env.TESTING === 'true' || process.env.TESTING === '1') {
row = await em.getRepository(NextEntityId).findOne({
row = await store.getRepository(NextEntityId).findOne({
where: { entityName },
})
} else {
row = await em.getRepository(NextEntityId).findOne({
row = await store.getRepository(NextEntityId).findOne({
where: { entityName },
lock: { mode: 'pessimistic_write' },
})
Expand Down
13 changes: 6 additions & 7 deletions src/utils/notification/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async function addOffChainNotification(
const nextNotificationId = await getNextIdForEntity(em, OFFCHAIN_NOTIFICATION_ID_TAG)

const notification = createNotification(
OFFCHAIN_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(),
`${OFFCHAIN_NOTIFICATION_ID_TAG}-${nextNotificationId}`,
account.id,
recipient,
notificationType,
Expand All @@ -186,21 +186,22 @@ async function addRuntimeNotification(
event: Event,
dispatchBlock?: number
) {
const em = overlay.getEm()
// get notification Id from orion_db in any case
const nextNotificationId = await getNextIdForEntity(em, RUNTIME_NOTIFICATION_ID_TAG)
const nextNotificationId = await getNextIdForEntity(overlay, RUNTIME_NOTIFICATION_ID_TAG)

const runtimeNotificationId = `${RUNTIME_NOTIFICATION_ID_TAG}-${nextNotificationId}`

// check that on-notification is not already present in orion_db in case the processor has been restarted (but not orion_db)
const existingNotification = await overlay
.getRepository(Notification)
.getById(nextNotificationId.toString())
.getById(runtimeNotificationId)

if (existingNotification) {
return
}

const notification = createNotification(
RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(),
runtimeNotificationId,
account.id,
recipient,
notificationType,
Expand All @@ -216,8 +217,6 @@ async function addRuntimeNotification(
await createEmailNotification(overlay, notification)
}

await saveNextNotificationId(em, nextNotificationId + 1, RUNTIME_NOTIFICATION_ID_TAG)

return notification.id
}

Expand Down
4 changes: 1 addition & 3 deletions src/utils/notification/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
export {
addNotification,
defaultNotificationPreferences,
preferencesForNotification,
addNotification,
} from './helpers'
// export * from './notificationTexts'
// export * from './notificationLinks'
8 changes: 4 additions & 4 deletions src/utils/overlay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,16 +345,16 @@ export class EntityManagerOverlay {
constructor(
private em: EntityManager,
private nextEntityIds: NextEntityId[],
private afterDbUpdte: (em: EntityManager) => Promise<void>
private afterDbUpdate: (em: EntityManager) => Promise<void>
) {}

public static async create(store: Store, afterDbUpdte: (em: EntityManager) => Promise<void>) {
public static async create(store: Store, afterDbUpdate: (em: EntityManager) => Promise<void>) {
// FIXME: This is a little hacky, but we really need to access the underlying EntityManager
const em = await (store as unknown as { em: () => Promise<EntityManager> }).em()
// Add "admin" schema to search path in order to be able to access "hidden" entities
await em.query('SET search_path TO admin,public')
const nextEntityIds = await em.find(NextEntityId, {})
return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdte)
return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdate)
}

public totalCacheSize() {
Expand Down Expand Up @@ -400,6 +400,6 @@ export class EntityManagerOverlay {
})
)
await this.em.save(nextIds)
await this.afterDbUpdte(this.em)
await this.afterDbUpdate(this.em)
}
}
Loading