Skip to content

Commit

Permalink
Adds logic to perpetually keep a send list up to date (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushchris authored Oct 20, 2024
1 parent 4528f29 commit b82f485
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 16 deletions.
22 changes: 22 additions & 0 deletions apps/platform/src/campaigns/Campaign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import Model, { ModelParams } from '../core/Model'
import List from '../lists/List'
import Template from '../render/Template'
import Subscription from '../subscriptions/Subscription'
import { crossTimezoneCopy } from '../utilities'
import Project from '../projects/Project'
import { User } from '../users/User'

export type CampaignState = 'draft' | 'scheduled' | 'pending' | 'running' | 'finished' | 'aborted'
export interface CampaignDelivery {
Expand Down Expand Up @@ -68,6 +71,25 @@ export class CampaignSend extends Model {
get hasCompleted() {
return ['aborted', 'sent', 'failed', 'bounced'].includes(this.state)
}

static create(
campaign: SentCampaign,
project: Pick<Project, 'timezone'>,
user: Pick<User, 'id' | 'timezone'>,
): CampaignSendParams {
return {
user_id: user.id,
campaign_id: campaign.id,
state: 'pending',
send_at: campaign.send_in_user_timezone
? crossTimezoneCopy(
campaign.send_at,
project.timezone,
user.timezone ?? project.timezone,
)
: campaign.send_at,
}
}
}

export type CampaignSendParams = Pick<CampaignSend, 'campaign_id' | 'user_id' | 'state' | 'send_at'>
Expand Down
60 changes: 47 additions & 13 deletions apps/platform/src/campaigns/CampaignService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { PageParams } from '../core/searchParams'
import { allLists } from '../lists/ListService'
import { allTemplates, duplicateTemplate, screenshotHtml, templateInUserLocale, validateTemplates } from '../render/TemplateService'
import { getSubscription, getUserSubscriptionState } from '../subscriptions/SubscriptionService'
import { chunk, crossTimezoneCopy, pick, shallowEqual } from '../utilities'
import { chunk, pick, shallowEqual } from '../utilities'
import { getProvider } from '../providers/ProviderRepository'
import { createTagSubquery, getTags, setTags } from '../tags/TagService'
import { getProject } from '../projects/ProjectService'
Expand All @@ -21,6 +21,7 @@ import CampaignGenerateListJob from './CampaignGenerateListJob'
import Project from '../projects/Project'
import Template from '../render/Template'
import { subDays } from 'date-fns'
import { raw } from '../core/Model'

export const pagedCampaigns = async (params: PageParams, projectId: number) => {
const result = await Campaign.search(
Expand Down Expand Up @@ -297,18 +298,9 @@ export const generateSendList = async (campaign: SentCampaign) => {
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.merge(['state', 'send_at'])
}, ({ user_id, timezone }: { user_id: number, timezone: string }) => ({
user_id,
campaign_id: campaign.id,
state: 'pending',
send_at: campaign.send_in_user_timezone
? crossTimezoneCopy(
campaign.send_at,
project.timezone,
timezone ?? project.timezone,
)
: campaign.send_at,
}))
}, ({ user_id, timezone }: { user_id: number, timezone: string }) =>
CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })),
)

await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
}
Expand Down Expand Up @@ -467,3 +459,45 @@ export const canSendCampaignToUser = async (campaign: Campaign, user: Pick<User,
if (campaign.channel === 'push' && !user.devices) return false
return true
}

export const updateCampaignSendEnrollment = async (user: User) => {
const campaigns = await Campaign.query()
.leftJoin('campaign_sends', (qb) =>
qb.on('campaign_sends.campaign_id', 'campaigns.id')
.andOn('campaign_sends.user_id', raw(user.id)),
)
.leftJoin('projects', 'projects.id', 'campaigns.project_id')
.where('campaigns.project_id', user.project_id)
.where('campaigns.state', 'scheduled')
.select('campaigns.*', 'campaign_sends.id AS send_id', 'campaign_sends.state AS send_state', 'projects.timezone') as Array<SentCampaign & { send_id: number, send_state: CampaignSendState, timezone: string }>

const join = []
const leave = []
for (const campaign of campaigns) {
const match = await recipientQuery(campaign)
.where('users.id', user.id)
.first()

// If user matches recipient query and they aren't already in the
// list, add to send list
if (match && !campaign.send_id) {
join.push(CampaignSend.create(campaign, Project.fromJson({ timezone: campaign.timezone }), user))
}

// If user is not in recipient list but we have a record, remove from
// send list
if (!match && campaign.send_id && campaign.send_state === 'pending') {
leave.push(campaign.send_id)
}
}

if (leave.length) {
await CampaignSend.query().whereIn('id', leave).delete()
}
if (join.length) {
await CampaignSend.query()
.insert(join)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.merge(['state', 'send_at'])
}
}
75 changes: 73 additions & 2 deletions apps/platform/src/campaigns/__tests__/CampaignService.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import App from '../../app'
import EmailJob from '../../providers/email/EmailJob'
import { RequestError } from '../../core/errors'
import { addUserToList, createList } from '../../lists/ListService'
import { createSubscription, subscribe } from '../../subscriptions/SubscriptionService'
import { createSubscription, subscribe, subscribeAll } from '../../subscriptions/SubscriptionService'
import { User } from '../../users/User'
import { uuid } from '../../utilities'
import Campaign, { CampaignSend, SentCampaign } from '../Campaign'
import { allCampaigns, createCampaign, getCampaign, sendCampaign, generateSendList, estimatedSendSize } from '../CampaignService'
import { allCampaigns, createCampaign, getCampaign, sendCampaign, generateSendList, estimatedSendSize, updateCampaignSendEnrollment } from '../CampaignService'
import { createProvider } from '../../providers/ProviderRepository'
import { createTestProject } from '../../projects/__tests__/ProjectTestHelpers'
import ListStatsJob from '../../lists/ListStatsJob'
Expand Down Expand Up @@ -274,4 +274,75 @@ describe('CampaignService', () => {
expect(sendSize).toEqual(40)
})
})

describe('updateCampaignSendEnrollment', () => {
test('join a user to a scheduled campaign', async () => {
const params = await createCampaignDependencies()
const list = await createList(params.project_id, {
name: uuid(),
type: 'static',
is_visible: true,
})
const campaign = await createTestCampaign(params, {
list_ids: [list.id],
}) as SentCampaign
await Campaign.updateAndFetch(campaign.id, { state: 'scheduled' })

const user = await createUser(campaign.project_id)
await subscribeAll(user)
await addUserToList(user, list)

await updateCampaignSendEnrollment(user)

const updated = await CampaignSend.first(qb => qb.where('campaign_id', campaign.id).where('user_id', user.id))

expect(updated).not.toBeUndefined()
})

test('do not join a user to a draft campaign', async () => {
const params = await createCampaignDependencies()
const list = await createList(params.project_id, {
name: uuid(),
type: 'static',
is_visible: true,
})
const campaign = await createTestCampaign(params, {
list_ids: [list.id],
}) as SentCampaign

const user = await createUser(campaign.project_id)
await subscribeAll(user)
await addUserToList(user, list)

await updateCampaignSendEnrollment(user)

const updated = await CampaignSend.first(qb => qb.where('campaign_id', campaign.id).where('user_id', user.id))

expect(updated).toBeUndefined()
})

test('remove a user who no longer matches list', async () => {
const params = await createCampaignDependencies()
const list = await createList(params.project_id, {
name: uuid(),
type: 'static',
is_visible: true,
})
const campaign = await createTestCampaign(params, {
list_ids: [list.id],
}) as SentCampaign
await Campaign.updateAndFetch(campaign.id, { state: 'scheduled' })

const user = await createUser(campaign.project_id)
await subscribeAll(user)

await CampaignSend.insert({ campaign_id: campaign.id, user_id: user.id, state: 'pending' })

await updateCampaignSendEnrollment(user)

const updated = await CampaignSend.first(qb => qb.where('campaign_id', campaign.id).where('user_id', user.id))

expect(updated).toBeUndefined()
})
})
})
10 changes: 10 additions & 0 deletions apps/platform/src/lists/ListService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { createTagSubquery, getTags, setTags } from '../tags/TagService'
import { Chunker, visit } from '../utilities'
import { getUserEventsForRules } from '../users/UserRepository'
import { RuleResults, RuleWithEvaluationResult, checkRules, decompileRule, fetchAndCompileRule, mergeInsertRules } from '../rules/RuleService'
import { updateCampaignSendEnrollment } from '../campaigns/CampaignService'
import { cacheDecr, cacheIncr } from '../config/redis'
import App from '../app'

Expand Down Expand Up @@ -297,19 +298,28 @@ export const isUserInList = async (user_id: number, list_id: number) => {

export const updateUsersLists = async (user: User, results: RuleResults, event?: UserEvent) => {

const dirtyLists = new Set<number>()
if (results.success.length) {
const successLists = await listsForRule(results.success, user.project_id)
for (const list of successLists) {
await addUserToList(user, list, event)
dirtyLists.add(list.id)
}
}

if (results.failure.length) {
const failureLists = await listsForRule(results.failure, user.project_id)
for (const list of failureLists) {
await removeUserFromList(user, list)
dirtyLists.add(list.id)
}
}

// If any lists were updated for the user, check associated campaigns
// to see if send list needs to be updated
if (dirtyLists.size > 0) {
await updateCampaignSendEnrollment(user)
}
}

export const listsForRule = async (ruleUuids: string[], projectId: number): Promise<DynamicList[]> => {
Expand Down
2 changes: 1 addition & 1 deletion apps/platform/src/providers/Provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export default class Provider extends Model {

export type ProviderMap<T extends Provider> = (record: any) => T

export type ProviderParams = Omit<Provider, ModelParams | 'setup' | 'loadSetup'>
export type ProviderParams = Omit<Provider, ModelParams | 'setup' | 'loadSetup' | 'interval'>

export type ExternalProviderParams = Omit<ProviderParams, 'group'>

Expand Down

0 comments on commit b82f485

Please sign in to comment.