Skip to content

Commit

Permalink
Remove channel from sync
Browse files Browse the repository at this point in the history
  • Loading branch information
WRadoslaw committed Feb 7, 2024
1 parent 58f64fe commit a44c381
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 55 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ codegen:
@npm run generate:schema || true
@npx squid-typeorm-codegen

setup-recommendations:
@npm run recommendations:setup

typegen:
@npx squid-substrate-typegen typegen.json

prepare: install codegen build
prepare: install codegen build setup-recommendations

up-squid:
@docker network create joystream_default || true
Expand Down
14 changes: 14 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"processor-start": "node -r dotenv-expand/config lib/processor.js",
"query-node-start": "NODE_ENV=production patch-package --patch-dir assets/patches && squid-graphql-server --subscriptions",
"auth-server-start": "node lib/auth-server/index.js",
"recommendations:setup": "node lib/scripts/setupRecombee.js",
"interactions-server-start": "node lib/interactions-server/index.js",
"postinstall": "patch-package --patch-dir assets/patches",
"mail-scheduler": "npx ts-node ./src/mail-scheduler/index.ts",
Expand Down Expand Up @@ -69,6 +70,7 @@
"graphql-tools": "^8.3.11",
"handlebars": "^4.7.7",
"haversine-distance": "^1.2.1",
"languagedetect": "^2.0.0",
"lodash": "^4.17.21",
"node-cache": "^5.1.2",
"node-schedule": "^2.1.1",
Expand Down
2 changes: 0 additions & 2 deletions src/mappings/content/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ export async function processChannelCreatedEvent({
await processChannelMetadata(overlay, block, channel, metadata, dataObjects)
}
}
recommendationServiceManager.scheduleChannelUpsert(channel as Channel)

if (ownerMember) {
ownerMember.totalChannelsCreated += 1
Expand Down Expand Up @@ -163,7 +162,6 @@ export async function processChannelUpdatedEvent({
} else {
channelMetadataUpdate = deserializeMetadata(ChannelMetadata, channelUpdateParameters.newMeta)
}
recommendationServiceManager.scheduleChannelUpsert(channel as Channel)
await processChannelMetadata(
overlay,
block,
Expand Down
11 changes: 9 additions & 2 deletions src/mappings/content/video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export async function processVideoCreatedEvent({
channelId: channel.id,
})
await notifyChannelFollowers(overlay, channel.id, notificationData, eventEntity)
recommendationServiceManager.scheduleVideoUpsert(video as Video)
recommendationServiceManager.scheduleVideoUpsert(video as Video, channel as Channel)
if (autoIssueNft) {
await processNft(overlay, block, indexInBlock, extrinsicHash, video, contentActor, autoIssueNft)
}
Expand All @@ -150,6 +150,10 @@ export async function processVideoUpdatedEvent({
}: EventHandlerContext<'Content.VideoUpdated'>): Promise<void> {
const { newMeta, autoIssueNft } = contentUpdateParameters
const video = await overlay.getRepository(Video).getByIdOrFail(contentId.toString())
let channel
if (video.channelId) {
channel = await overlay.getRepository(Channel).getByIdOrFail(video.channelId)
}

const appAction = newMeta && deserializeMetadata(AppAction, newMeta, { skipWarning: true })

Expand Down Expand Up @@ -180,7 +184,10 @@ export async function processVideoUpdatedEvent({
)
}

recommendationServiceManager.scheduleVideoUpsert(video as Video)
if (channel && video) {
recommendationServiceManager.scheduleVideoUpsert(video as Video, channel as Channel)
}

if (autoIssueNft) {
await processNft(overlay, block, indexInBlock, extrinsicHash, video, contentActor, autoIssueNft)
}
Expand Down
22 changes: 10 additions & 12 deletions src/scripts/setupRecombee.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ApiClient, requests } from 'recombee-api-client'
import { RSChannel, RSVideo } from '../utils/RecommendationServiceManager'
import { RSVideo } from '../utils/RecommendationServiceManager'
import dotenv from 'dotenv'

type RecombeePropTypes =
Expand All @@ -12,16 +12,14 @@ type RecombeePropTypes =
| 'image'
| 'imageList'

type JoinedItemProps = keyof RSChannel | keyof RSVideo
type JoinedItemProps = keyof RSVideo

const itemPropToType: Record<JoinedItemProps, RecombeePropTypes> = {
follows_num: 'int',
total_videos_created: 'int',
title: 'string',
channel_title: 'string',
timestamp: 'timestamp',
language: 'string',
description: 'string',
type: 'string',
channel_description: 'string',
reactions_count: 'int',
category_id: 'string',
comments_count: 'int',
Expand Down Expand Up @@ -54,13 +52,13 @@ async function main() {
)

// setup segments
const segmentsReqs = [
new requests.CreateManualReqlSegmentation('item-categories', 'items'),
new requests.AddManualReqlSegment('item-categories', 'video', '\'itemId\' ~ ".*-video$"'),
new requests.AddManualReqlSegment('item-categories', 'channel', '\'itemId\' ~ ".*-channel"'),
]
const segmentsReqs = new requests.CreatePropertyBasedSegmentation(
'channel-segmentation',
'items',
'channel_id'
)

const res = await client.send(new requests.Batch([...itemPropertiesReqs, ...segmentsReqs]))
const res = await client.send(new requests.Batch([...itemPropertiesReqs, segmentsReqs]))
const failedRequests = res.filter((req) => req.code === 400)

if (failedRequests.length) {
Expand Down
7 changes: 7 additions & 0 deletions src/server-extension/resolvers/ChannelsResolver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ export class ChannelsResolver {
// Set by depenency injection
constructor(private em: () => Promise<EntityManager>) {}

// @Query()
// async searchChannels(
// @Args() args: ExtendedChannelsArgs,
// @Info() info: GraphQLResolveInfo,
// @Ctx() ctx: Context
// ) {}

@Query(() => [ExtendedChannel])
async extendedChannels(
@Args() args: ExtendedChannelsArgs,
Expand Down
8 changes: 8 additions & 0 deletions src/server-extension/resolvers/baseTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ export class RecommendedVideosQuery {
@Field(() => String, { nullable: false }) recommId!: string
@Field(() => [Video]) video: Video[]
}

@ObjectType()
export class RecommendedChannelsQuery {
@Field(() => Int, { nullable: true }) numberNextRecommsCalls?: number
@Field(() => String, { nullable: false }) recommId!: string
@Field(() => [Channel]) video: Channel[]
}

export const OwnedNftWhereInput = new GraphQLScalarType({
name: 'OwnedNftWhereInput',
})
42 changes: 6 additions & 36 deletions src/utils/RecommendationServiceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,10 @@ export type RSVideo = {
reactions_count: number
views_num: number
channel_id?: string
type: 'video'
channel_title?: string
channel_description?: string
} & Required<Pick<Video, 'duration' | 'language' | 'title'>>

export type RSChannel = {
timestamp: Date
follows_num: number
total_videos_created: number
type: 'channel'
} & Pick<Channel, 'title' | 'description' | 'language'>

type RSUser = Pick<User, 'id'>

type CommonOptions = {
Expand Down Expand Up @@ -68,7 +62,7 @@ export class RecommendationServiceManager {
)
}

scheduleVideoUpsert(video: Video) {
scheduleVideoUpsert(video: Video, channel: Channel) {
// for dev env sync only up to 20k videos
if (!this._enabled || (isDevEnv && Number(video.id) > 20_000)) {
return
Expand All @@ -80,14 +74,13 @@ export class RecommendationServiceManager {
channel_id: video.channelId ?? undefined,
comments_count: video.commentsCount,
duration: video.duration,
language: languageText ? predictLanguage(languageText) : video.language,
language: predictLanguage(languageText) ?? video.language,
reactions_count: video.reactionsCount,
timestamp: new Date(video.createdAt),
title: video.title,
views_num: video.viewsNum,
// recombee only offers single items table, so it would be good to have type
// in case we decide to add new type in the future, like NFT
type: 'video',
channel_description: channel.description ?? undefined,
channel_title: channel.title ?? undefined,
}

const request = new ClientRequests.SetItemValues(`${video.id}-video`, actionObject, {
Expand All @@ -101,29 +94,6 @@ export class RecommendationServiceManager {
this._queue.push(actionObject)
}

scheduleChannelUpsert(channel: Channel) {
// for dev env sync only up to 20k channels
if (!this._enabled || (isDevEnv && Number(channel.id) > 20_000)) {
return
}
const languageText = (channel.title ?? '') + (channel.description ?? '')

const actionObject: RSChannel = {
language: languageText ? predictLanguage(languageText) : channel.language,
description: channel.description,
title: channel.title,
timestamp: channel.createdAt,
total_videos_created: channel.totalVideosCreated,
follows_num: channel.followsNum,
type: 'channel',
}

const request = new ClientRequests.SetItemValues(`${channel.id}-channel`, actionObject, {
cascadeCreate: true,
})
this._queue.push(request)
}

scheduleChannelDeletion(channelId: string) {
const actionObject = new ClientRequests.DeleteItem(`${channelId}-channel`)
this._queue.push(actionObject)
Expand Down
4 changes: 2 additions & 2 deletions src/utils/language.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const lngDetector = new LanguageDetect()
lngDetector.setLanguageType('iso2')

// Example usage
export const predictLanguage = (text: string) => {
export const predictLanguage = (text: string): string | undefined => {
const cleanedText = cleanString(text)
return lngDetector.detect(cleanedText, 1)[0][0]
return lngDetector.detect(cleanedText, 1)[0]?.[0]
}

0 comments on commit a44c381

Please sign in to comment.