Skip to content

Commit

Permalink
add feature to do futures notifications delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanakram3 committed Mar 2, 2024
1 parent 6394eb1 commit 9f08cec
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 187 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

const { getViewDefinitions } = require('../viewDefinitions')

module.exports = class Views1709289253199 {
name = 'Views1709289253199'
module.exports = class Views1709385679652 {
name = 'Views1709385679652'

async up(db) {
const viewDefinitions = getViewDefinitions(db);
Expand Down
3 changes: 3 additions & 0 deletions schema/notifications.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Notification @entity {
"timestamp"
createdAt: DateTime!

"block after which notification should be dispatched (if null, then it should be dispatched immediately)"
dispatchBlock: Int

"recipient"
recipient: RecipientType!
}
Expand Down
8 changes: 7 additions & 1 deletion src/mail-scheduler/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EntityManager } from 'typeorm'
import { EntityManager, IsNull, LessThanOrEqual } from 'typeorm'
import { EmailDeliveryAttempt, EmailFailure, NotificationEmailDelivery } from '../model'
import { getCurrentBlockHeight } from '../utils/blockHeight'
import { ConfigVariable, config } from '../utils/config'
import { uniqueId } from '../utils/crypto'
import { globalEm } from '../utils/globalEm'
Expand All @@ -11,9 +12,14 @@ export async function getMaxAttempts(em: EntityManager): Promise<number> {
}

export async function mailsToDeliver(em: EntityManager): Promise<NotificationEmailDelivery[]> {
const { lastProcessedBlock } = await getCurrentBlockHeight(em)
const result = await em.getRepository(NotificationEmailDelivery).find({
where: {
discard: false,
notification: [
{ dispatchBlock: IsNull() },
{ dispatchBlock: LessThanOrEqual(lastProcessedBlock) },
],
},
relations: {
notification: { account: true },
Expand Down
74 changes: 48 additions & 26 deletions src/mappings/token/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import {
CreatorTokenRevenueShareEnded,
CreatorTokenRevenueSharePlanned,
CreatorTokenRevenueShareStarted,
CreatorTokenRevenueSplitIssuedEventData,
CreatorTokenSaleMint,
CreatorTokenSaleMintEventData,
CreatorTokenSaleStarted,
CreatorTokenSaleStartedEventData,
Event,
FutureNotificationOrionEvent,
InitialIssuanceVestingSource,
Membership,
RevenueShare,
Expand All @@ -41,8 +41,9 @@ import {
VestedSale,
VestingSchedule,
} from '../../model'
import { getCurrentBlockHeight } from '../../notifications-scheduler/utils'
import { getCurrentBlockHeight } from '../../utils/blockHeight'
import { EventHandlerContext } from '../../utils/events'
import { criticalError } from '../../utils/misc'
import { addNotification } from '../../utils/notification'
import { getChannelOwnerAccount, notifyChannelFollowers, parseChannelTitle } from '../content/utils'
import { deserializeMetadata, genericEventFields } from '../utils'
Expand Down Expand Up @@ -582,6 +583,8 @@ export async function processUpcomingTokenSaleUpdatedEvent({
export async function processRevenueSplitIssuedEvent({
overlay,
block,
extrinsicHash,
indexInBlock,
event: {
asV1000: [tokenId, startBlock, duration, joyAllocation],
},
Expand All @@ -592,11 +595,18 @@ export async function processRevenueSplitIssuedEvent({
.getRepository(TokenChannel)
.getOneByRelationOrFail('tokenId', tokenId.toString())
const channel = await overlay.getRepository(Channel).getByIdOrFail(tokenChannel.channelId)
const { lastProcessedBlock } = await getCurrentBlockHeight(overlay.getEm())
const token = (await overlay
.getRepository(CreatorToken)
.getByIdOrFail(tokenId.toString())) as CreatorToken

const { lastProcessedBlock } = await getCurrentBlockHeight(overlay.getEm())
if (lastProcessedBlock < 0) {
// If within the mappings context we are not able to get the correct block height
// (i.e. height is -ve when this event is emitted), it means something is wrong
// with the processor state and we should panic we should panic
criticalError('Failed to get current block height from "squid_processor"."status" table')
}

const revenueShare = overlay.getRepository(RevenueShare).new({
id,
allocation: joyAllocation,
Expand All @@ -612,46 +622,58 @@ export async function processRevenueSplitIssuedEvent({

token.currentRevenueShareId = id

overlay.getRepository(Event).new({
...genericEventFields(overlay, block, indexInBlock, extrinsicHash),
data: new CreatorTokenRevenueSplitIssuedEventData({
token: tokenId.toString(),
revenueShare: id,
}),
})

// Schedule/Dispatch Notifications

const revenueShareStartedNotification = new CreatorTokenRevenueShareStarted({
revenueShareId: revenueShare.id,
tokenId: tokenId.toString(),
channelTitle: parseChannelTitle(channel),
channelId: channel.id,
tokenSymbol: parseCreatorTokenSymbol(token),
})
// revenue share is planned for future block
if (lastProcessedBlock < startBlock && lastProcessedBlock > 0) {
const plannedNotificationData = new CreatorTokenRevenueSharePlanned({
await notifyTokenHolders(
overlay.getEm(),
tokenId.toString(),
revenueShareStartedNotification,
undefined,
startBlock // schedule for start block
)

// if revenue share is planned for future block then also schedule a notification with immediate delivery
if (lastProcessedBlock < startBlock) {
const revenueSharePlannedNotification = new CreatorTokenRevenueSharePlanned({
revenueShareId: revenueShare.id,
channelTitle: parseChannelTitle(channel),
channelId: channel.id,
plannedAt: startBlock,
tokenSymbol: parseCreatorTokenSymbol(token),
})
overlay.getRepository(FutureNotificationOrionEvent).new({
id: `${revenueShare.id}-${block.height}-rSStart`,
executionBlock: startBlock,
notificationType: revenueShareStartedNotification,
})

await notifyTokenHolders(overlay.getEm(), tokenId.toString(), plannedNotificationData)

// revenue share starts at creation
} else if (lastProcessedBlock > 0) {
await notifyTokenHolders(overlay.getEm(), tokenId.toString(), revenueShareStartedNotification)
await notifyTokenHolders(overlay.getEm(), tokenId.toString(), revenueSharePlannedNotification)
}

overlay.getRepository(FutureNotificationOrionEvent).new({
id: `${revenueShare.id}-${block.height}-rSEnd`,
executionBlock: endsAt,
notificationType: new CreatorTokenRevenueShareEnded({
revenueShareId: revenueShare.id,
channelTitle: parseChannelTitle(channel),
channelId: channel.id,
tokenSymbol: parseCreatorTokenSymbol(token),
tokenId: tokenId.toString(),
}),
const revenueSharedEndedNotification = new CreatorTokenRevenueShareEnded({
revenueShareId: revenueShare.id,
channelTitle: parseChannelTitle(channel),
channelId: channel.id,
tokenSymbol: parseCreatorTokenSymbol(token),
tokenId: tokenId.toString(),
})
await notifyTokenHolders(
overlay.getEm(),
tokenId.toString(),
revenueSharedEndedNotification,
undefined,
endsAt // schedule for end block
)
}

export async function processMemberJoinedWhitelistEvent({
Expand Down
41 changes: 25 additions & 16 deletions src/mappings/token/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { criticalError } from '../../utils/misc'
import { Flat, EntityManagerOverlay } from '../../utils/overlay'
import { ITokenMetadata } from '@joystream/metadata-protobuf'
import { DecodedMetadataObject } from '@joystream/metadata-protobuf/types'
import { isSet } from '@joystream/metadata-protobuf/utils'
import pLimit from 'p-limit'
import { EntityManager } from 'typeorm'
import {
Account,
Benefit,
Expand All @@ -17,13 +20,10 @@ import {
Video,
} from '../../model'
import { Validated, ValidatedPayment, VestingScheduleParams } from '../../types/v1000'
import { isSet } from '@joystream/metadata-protobuf/utils'
import { uniqueId } from '../../utils/crypto'
import { ITokenMetadata } from '@joystream/metadata-protobuf'
import { DecodedMetadataObject } from '@joystream/metadata-protobuf/types'
import { criticalError } from '../../utils/misc'
import { addNotification } from '../../utils/notification'
import pLimit from 'p-limit'
import { EntityManager } from 'typeorm'
import { EntityManagerOverlay, Flat } from '../../utils/overlay'

export const FALLBACK_TOKEN_SYMBOL = '??'

Expand Down Expand Up @@ -344,16 +344,25 @@ export async function notifyTokenHolders(
em: EntityManager,
tokenId: string,
notificationType: NotificationType,
event?: Event
event?: Event,
dispatchBlock?: number
) {
const holdersAccounts = await getHolderAccountsForToken(em, tokenId)
for (const holdersAccount of holdersAccounts) {
await addNotification(
em,
holdersAccount,
new MemberRecipient({ membership: holdersAccount.membershipId }),
notificationType,
event

const limit = pLimit(10) // Limit to 10 concurrent promises

await Promise.all(
holdersAccounts.map((holdersAccount) =>
limit(() =>
addNotification(
em,
holdersAccount,
new MemberRecipient({ membership: holdersAccount.membershipId }),
notificationType,
event,
dispatchBlock
)
)
)
}
)
}
56 changes: 0 additions & 56 deletions src/notifications-scheduler/index.ts

This file was deleted.

76 changes: 0 additions & 76 deletions src/notifications-scheduler/utils.ts

This file was deleted.

Loading

0 comments on commit 9f08cec

Please sign in to comment.