Skip to content

Commit

Permalink
fix: avoiding IDs override while creating conturrent runtime notifica…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
zeeshanakram3 committed Jul 16, 2024
1 parent 6167139 commit fedafca
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 27 deletions.
12 changes: 6 additions & 6 deletions src/mappings/token/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ export async function getHolderAccountsForToken(
const holders = await em.getRepository(TokenAccount).findBy({ tokenId })

const holdersMemberIds = holders
.filter((follower) => follower?.memberId)
.map((follower) => follower.memberId as string)
.filter((holder) => holder?.memberId)
.map((holder) => holder.memberId as string)

const limit = pLimit(10) // Limit to 10 concurrent promises
const holdersAccounts: (Account | null)[] = await Promise.all(
Expand All @@ -347,17 +347,17 @@ export async function notifyTokenHolders(
event?: Event,
dispatchBlock?: number
) {
const holdersAccounts = await getHolderAccountsForToken(em, tokenId)
const holderAccounts = await getHolderAccountsForToken(em, tokenId)

const limit = pLimit(10) // Limit to 10 concurrent promises

await Promise.all(
holdersAccounts.map((holdersAccount) =>
holderAccounts.map((holderAccount) =>
limit(() =>
addNotification(
em,
holdersAccount,
new MemberRecipient({ membership: holdersAccount.membershipId }),
holderAccount,
new MemberRecipient({ membership: holderAccount.membershipId }),
notificationType,
event,
dispatchBlock
Expand Down
31 changes: 26 additions & 5 deletions src/utils/nextEntityId.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
import { EntityManager } from 'typeorm'
import { NextEntityId } from '../model'
import { AnyEntity, Constructor, EntityManagerOverlay } from './overlay'

// used to retrieve the next id for an entity
export async function getNextIdForEntity(em: EntityManager, entityName: string): Promise<number> {
// used to retrieve the next id for an entity from NextEntityId table using either EntityManager or Overlay
export async function getNextIdForEntity(
store: EntityManager | EntityManagerOverlay,
entityName: string
): Promise<string | undefined> {
// Get next entity id from overlay (this will mostly be used in the mappings context)
if (store instanceof EntityManagerOverlay) {
const row = await store
.getRepository(NextEntityId as Constructor<NextEntityId & AnyEntity>)
.getOneBy({ entityName: entityName })

const id = row?.nextId.toString()

// Update the id to be the next one in the overlay
if (row) {
row.nextId++
}

return id
}

// Get next entity id from EntityManager (this will mostly be used in the graphql-server/auth-api context)
let row: NextEntityId | null
if (process.env.TESTING === 'true' || process.env.TESTING === '1') {
row = await em.getRepository(NextEntityId).findOne({
row = await store.getRepository(NextEntityId).findOne({
where: { entityName },
})
} else {
row = await em.getRepository(NextEntityId).findOne({
row = await store.getRepository(NextEntityId).findOne({
where: { entityName },
lock: { mode: 'pessimistic_write' },
})
}
const id = parseInt(row?.nextId.toString() || '1')
const id = row?.nextId.toString()
return id
}
22 changes: 13 additions & 9 deletions src/utils/notification/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
Unread,
} from '../../model'
import { uniqueId } from '../crypto'
import { criticalError } from '../misc'
import { getNextIdForEntity } from '../nextEntityId'
import { EntityManagerOverlay } from '../overlay'

Expand Down Expand Up @@ -156,10 +157,10 @@ async function addOffChainNotification(
dispatchBlock?: number
) {
// get notification Id from orion_db in any case
const nextNotificationId = await getNextIdForEntity(em, OFFCHAIN_NOTIFICATION_ID_TAG)
const nextNotificationId = (await getNextIdForEntity(em, OFFCHAIN_NOTIFICATION_ID_TAG)) || '1'

const notification = createNotification(
OFFCHAIN_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(),
`${OFFCHAIN_NOTIFICATION_ID_TAG}-${nextNotificationId}`,
account.id,
recipient,
notificationType,
Expand All @@ -175,7 +176,7 @@ async function addOffChainNotification(
await createEmailNotification(em, notification)
}

await saveNextNotificationId(em, nextNotificationId + 1, OFFCHAIN_NOTIFICATION_ID_TAG)
await saveNextNotificationId(em, parseInt(nextNotificationId) + 1, OFFCHAIN_NOTIFICATION_ID_TAG)
}

async function addRuntimeNotification(
Expand All @@ -186,21 +187,26 @@ async function addRuntimeNotification(
event: Event,
dispatchBlock?: number
) {
const em = overlay.getEm()
// get notification Id from orion_db in any case
const nextNotificationId = await getNextIdForEntity(em, RUNTIME_NOTIFICATION_ID_TAG)
const nextNotificationId = await getNextIdForEntity(overlay, RUNTIME_NOTIFICATION_ID_TAG)

if (!nextNotificationId) {
criticalError(`NextEntityId counter for "RuntimeNotification" tag not found`)
}

const runtimeNotificationId = `${RUNTIME_NOTIFICATION_ID_TAG}-${nextNotificationId}`

// check that on-notification is not already present in orion_db in case the processor has been restarted (but not orion_db)
const existingNotification = await overlay
.getRepository(Notification)
.getById(nextNotificationId.toString())
.getById(runtimeNotificationId)

if (existingNotification) {
return
}

const notification = createNotification(
RUNTIME_NOTIFICATION_ID_TAG + '-' + nextNotificationId.toString(),
runtimeNotificationId,
account.id,
recipient,
notificationType,
Expand All @@ -216,8 +222,6 @@ async function addRuntimeNotification(
await createEmailNotification(overlay, notification)
}

await saveNextNotificationId(em, nextNotificationId + 1, RUNTIME_NOTIFICATION_ID_TAG)

return notification.id
}

Expand Down
4 changes: 1 addition & 3 deletions src/utils/notification/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
export {
addNotification,
defaultNotificationPreferences,
preferencesForNotification,
addNotification,
} from './helpers'
// export * from './notificationTexts'
// export * from './notificationLinks'
8 changes: 4 additions & 4 deletions src/utils/overlay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,16 +345,16 @@ export class EntityManagerOverlay {
constructor(
private em: EntityManager,
private nextEntityIds: NextEntityId[],
private afterDbUpdte: (em: EntityManager) => Promise<void>
private afterDbUpdate: (em: EntityManager) => Promise<void>
) {}

public static async create(store: Store, afterDbUpdte: (em: EntityManager) => Promise<void>) {
public static async create(store: Store, afterDbUpdate: (em: EntityManager) => Promise<void>) {
// FIXME: This is a little hacky, but we really need to access the underlying EntityManager
const em = await (store as unknown as { em: () => Promise<EntityManager> }).em()
// Add "admin" schema to search path in order to be able to access "hidden" entities
await em.query('SET search_path TO admin,public')
const nextEntityIds = await em.find(NextEntityId, {})
return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdte)
return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdate)
}

public totalCacheSize() {
Expand Down Expand Up @@ -400,6 +400,6 @@ export class EntityManagerOverlay {
})
)
await this.em.save(nextIds)
await this.afterDbUpdte(this.em)
await this.afterDbUpdate(this.em)
}
}

0 comments on commit fedafca

Please sign in to comment.