Skip to content

Commit

Permalink
Persisted queue idea
Browse files Browse the repository at this point in the history
  • Loading branch information
WRadoslaw committed Apr 22, 2024
1 parent 4a666fc commit edba6ec
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 20 deletions.
8 changes: 8 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ services:
depends_on:
- orion_db
volumes:
- processor_queue:/app/persistedQueue
- type: bind
source: .
target: /orion
Expand All @@ -68,6 +69,7 @@ services:
depends_on:
- orion_db
volumes:
- graphql_queue:/app/persistedQueue
- type: bind
source: .
target: /orion
Expand All @@ -90,6 +92,7 @@ services:
depends_on:
- orion_db
volumes:
- auth_api_queue:/app/persistedQueue
- type: bind
source: .
target: /orion
Expand All @@ -112,6 +115,7 @@ services:
depends_on:
- orion_db
volumes:
- interactions_api_queue:/app/persistedQueue
- type: bind
source: .
target: /orion
Expand All @@ -123,6 +127,10 @@ services:
volumes:
orion_db_data:
interactions_db_data:
interactions_api_queue:
auth_api_queue:
graphql_queue:
processor_queue:

networks:
default:
Expand Down
2 changes: 1 addition & 1 deletion src/interactions-server/handlers/videoConsumed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { components } from '../generated/api-types'
import { AuthContext } from '../../utils/auth'
import { recommendationServiceManager } from '../../utils/RecommendationServiceManager'
import { singlePurchaseLimiter } from '../interactionsLimiter'
import { PurchaseModel, RatingModel } from '../interactionsEm'
import { PurchaseModel } from '../interactionsEm'

type ReqParams = Record<string, string>
type ResBody =
Expand Down
70 changes: 51 additions & 19 deletions src/utils/RecommendationServiceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
import { createLogger } from '@subsquid/logger'
import { randomUUID } from 'crypto'
import { stringToHex } from '@polkadot/util'
import { PersistentQueue } from './persistingQueue'
import path from 'path'

export type RecommendationItemId = `${string}-${'video' | 'channel'}`

Expand Down Expand Up @@ -41,14 +43,14 @@ const recommendationServiceLogger = createLogger('recommendationsService')
const isDevEnv = process.env.ORION_ENV === 'development'

export class RecommendationServiceManager {
private _queue: ClientRequests.Request[] = []
private _queue: PersistentQueue<ClientRequests.Request>
private client: ApiClient | null = null

// if orion is launched in dev mode, always sync videos
private _enabled = false
private _loopInitialized = false

constructor() {
constructor(queueFilePath: string) {
if (
!process.env.RECOMMENDATION_SERVICE_PRIVATE_KEY ||
!process.env.RECOMMENDATION_SERVICE_DATABASE ||
Expand All @@ -59,6 +61,7 @@ export class RecommendationServiceManager {
)
return
}
this._queue = new PersistentQueue<ClientRequests.Request>(queueFilePath)
this.client = new ApiClient(
process.env.RECOMMENDATION_SERVICE_DATABASE,
process.env.RECOMMENDATION_SERVICE_PRIVATE_KEY,
Expand Down Expand Up @@ -94,12 +97,15 @@ export class RecommendationServiceManager {
const request = new ClientRequests.SetItemValues(`${video.id}-video`, actionObject, {
cascadeCreate: true,
})
this._queue.push(request)
this._queue.addToQueue(request).catch(() => undefined)
}

scheduleVideoDeletion(videoId: string) {
if (!this._enabled) {
return
}
const actionObject = new ClientRequests.DeleteItem(`${videoId}-video`)
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

scheduleChannelUpsert(channel: Channel) {
Expand All @@ -121,12 +127,16 @@ export class RecommendationServiceManager {
const request = new ClientRequests.SetItemValues(`${channel.id}-channel`, actionObject, {
cascadeCreate: true,
})
this._queue.push(request)
this._queue.addToQueue(request).catch(() => undefined)
}

scheduleChannelDeletion(channelId: string) {
if (!this._enabled) {
return
}

const actionObject = new ClientRequests.DeleteItem(`${channelId}-channel`)
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

scheduleUserUpsert(user: RSUser) {
Expand All @@ -141,7 +151,7 @@ export class RecommendationServiceManager {
cascadeCreate: true,
}
)
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

// this interaction has big model value and should we used for
Expand All @@ -156,7 +166,7 @@ export class RecommendationServiceManager {
cascadeCreate: true,
recommId,
})
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

// this interaction should be dispatched when user clicks a video to see it
Expand All @@ -176,7 +186,7 @@ export class RecommendationServiceManager {
recommId,
duration,
})
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

// this interaction is for user engagement level
Expand All @@ -201,7 +211,7 @@ export class RecommendationServiceManager {
recommId,
}
)
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

scheduleItemBookmark(itemId: RecommendationItemId, userId: string, recommId?: string) {
Expand All @@ -214,7 +224,7 @@ export class RecommendationServiceManager {
cascadeCreate: !isDevEnv,
recommId,
})
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

deleteItemBookmark(itemId: RecommendationItemId, userId: string) {
Expand All @@ -223,7 +233,7 @@ export class RecommendationServiceManager {
}

const actionObject = new ClientRequests.DeleteBookmark(this.mapUserId(userId), itemId)
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

scheduleItemRating(
Expand All @@ -244,7 +254,7 @@ export class RecommendationServiceManager {
cascadeCreate: !isDevEnv,
recommId,
})
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

deleteItemRating(itemId: RecommendationItemId, userId: string) {
Expand All @@ -253,7 +263,7 @@ export class RecommendationServiceManager {
}

const actionObject = new ClientRequests.DeleteRating(userId, itemId)
this._queue.push(actionObject)
this._queue.addToQueue(actionObject).catch(() => undefined)
}

enableExport() {
Expand Down Expand Up @@ -298,6 +308,10 @@ export class RecommendationServiceManager {
}

async recommendItemsToUser(userId?: string, opts?: CommonOptions) {
if (!this._enabled) {
return
}

recommendationServiceLogger.info(
`Getting items recommendations to ${userId || 'empty user'}(${this.mapUserId(userId ?? '')})`
)
Expand All @@ -322,6 +336,10 @@ export class RecommendationServiceManager {
}

async recommendNextItems(recommId: string, opts?: CommonOptions) {
if (!this._enabled) {
return
}

const request = new ClientRequests.RecommendNextItems(recommId, opts?.limit ?? 10)
const res = await this.client?.send(request)
if (!res) {
Expand All @@ -332,6 +350,10 @@ export class RecommendationServiceManager {
}

async recommendItemsToItem(itemId: RecommendationItemId, userId?: string, opts?: CommonOptions) {
if (!this._enabled) {
return
}

const request = new ClientRequests.RecommendItemsToItem(
itemId,
userId ? this.mapUserId(userId) : randomUUID(),
Expand All @@ -354,6 +376,10 @@ export class RecommendationServiceManager {
}

async recommendNextVideo(itemId: RecommendationItemId, userId?: string, opts?: CommonOptions) {
if (!this._enabled) {
return
}

const request = new ClientRequests.RecommendItemsToItem(
itemId,
userId ? this.mapUserId(userId) : randomUUID(),
Expand Down Expand Up @@ -414,14 +440,20 @@ export class RecommendationServiceManager {

private async _batchUpdateLoop(intervalMs: number): Promise<void> {
while (true) {
if (this._queue.length) {
const batchArray = [...this._queue]
this._queue.length = 0
await this.sendBatchRequest(batchArray)
const queue = this._queue.getQueue()
if (queue.length) {
await this._queue.lockQueue(async () => {
await this.sendBatchRequest(queue)
})
await this._queue.flushQueue()
}
await new Promise((resolve) => setTimeout(resolve, intervalMs))
}
}
}

export const recommendationServiceManager = new RecommendationServiceManager()
const dirPath = path.resolve('../app/persistedQueue')

export const recommendationServiceManager = new RecommendationServiceManager(
`${dirPath}/interactions`
)
88 changes: 88 additions & 0 deletions src/utils/persistingQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import fs from 'fs'
import AsyncLock from 'async-lock'

export class PersistentQueue<T> {
private queue: T[] = []
private writeStream: fs.WriteStream | null = null
private initialized = false
private asyncLock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER })
filepath = ''

constructor(filepath: string) {
this.filepath = filepath
this.queue = []
this.loadQueue()
this.writeStream = fs.createWriteStream(this.filepath, { flags: 'a' })
this.initialized = true
}

loadQueue() {
try {
if (!fs.existsSync(this.filepath)) {
return
}
const data = fs.readFileSync(this.filepath, { encoding: 'utf-8' })
this.queue = JSON.parse(`[${data.split('\n').slice(0, -1).join(',')}]`)
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
console.error('Failed to load queue:', error)
}
}
}

lockQueue(cb: () => Promise<void>) {
return this.asyncLock.acquire(
this.filepath,
async (done) => {
await cb()
done()
},
{ skipQueue: true }
)
}

flushQueue() {
return this.asyncLock.acquire(
this.filepath,
async (done) => {
this.writeStream?.end()
fs.truncate(this.filepath, 0, (err) => {
if (err) {
done()
throw err
}
this.queue = []
this.writeStream = fs.createWriteStream(this.filepath, { flags: 'a' })
done()
})
},
{ skipQueue: true }
)
}

async addToQueue(item: T) {
if (!this.initialized) {
return
}
await this.writeItemToFile(item)
this.queue.push(item)
}

writeItemToFile(item: T) {
return this.asyncLock.acquire(this.filepath, async (done) => {
const dataToWrite = JSON.stringify(item).trim().replace('\n', '') + '\n'
this.writeStream?.write(dataToWrite, 'utf8', (err) => {
if (err) {
done()
throw err
} else {
done()
}
})
})
}

getQueue() {
return this.queue
}
}

0 comments on commit edba6ec

Please sign in to comment.