Skip to content

Commit

Permalink
Optimize feed article field lookups with temp tables
Browse files Browse the repository at this point in the history
Scope field article inserts to request with AsyncLocalStorage
  • Loading branch information
synzen committed Dec 9, 2024
1 parent 2745e53 commit 05e70b6
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 104 deletions.
4 changes: 4 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ services:
environment:
- FEED_REQUESTS_START_TARGET=api
- NODE_ENV=local
networks:
- monitorss-default

feed-requests-postgres-migration:
extends:
Expand Down Expand Up @@ -158,6 +160,8 @@ services:
- NODE_ENV=local
- USER_FEEDS_FEED_REQUESTS_API_URL=http://feed-requests-api:5000/v1/feed-requests
- USER_FEEDS_FEED_REQUESTS_GRPC_URL=dns:///feed-requests-service:4999
networks:
- monitorss-default

user-feeds-postgres-migration:
extends:
Expand Down
4 changes: 2 additions & 2 deletions services/user-feeds/src/articles/articles.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ export class ArticlesService {

if (debug) {
logger.datadog(
`Debug feed ${id}: ${newArticles.length} new articles determined`,
`Debug feed ${id}: ${newArticles.length} new articles determined from ID checks`,
{
articles: newArticles.map((a) => ({
id: a.flattened.id,
Expand Down Expand Up @@ -498,7 +498,7 @@ export class ArticlesService {
logger.datadog(
`Debug feed ${id}: ${articlesPostDateCheck.length} articles after date checks`,
{
articles: newArticles.map((a) => ({
articles: articlesPostDateCheck.map((a) => ({
id: a.flattened.id,
title: a.flattened.title,
})),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,63 @@ import { Injectable } from "@nestjs/common";
import dayjs from "dayjs";
import logger from "../shared/utils/logger";
import PartitionedFeedArticleFieldInsert from "./types/pending-feed-article-field-insert.types";
import { AsyncLocalStorage } from "node:async_hooks";

// Per-request buffer of article-field rows awaiting a bulk insert. Each
// startContext() call gets its own store, so concurrent feed events do not
// see (or flush) each other's pending rows.
interface AsyncStore {
  // Rows accumulated by markForPersistence(); written out and cleared by flush().
  toInsert: PartitionedFeedArticleFieldInsert[];
}

// Module-level AsyncLocalStorage binding the active store for the duration of
// the callback passed to startContext().
const asyncLocalStorage = new AsyncLocalStorage<AsyncStore>();

@Injectable()
export class PartitionedFeedArticleFieldStoreService {
connection: Connection;
TABLE_NAME = "feed_article_field_partitioned";
toInsert: PartitionedFeedArticleFieldInsert[] = [];

constructor(private readonly orm: MikroORM) {
this.connection = this.orm.em.getConnection();
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async startContext<T>(cb: () => Promise<T>) {
return asyncLocalStorage.run(
{
toInsert: [],
},
cb
);
}

async markForPersistence(inserts: PartitionedFeedArticleFieldInsert[]) {
if (inserts.length === 0) {
return;
}

this.toInsert.push(...inserts);
const store = asyncLocalStorage.getStore();

if (!store) {
throw new Error(
"No context was started for PartitionedFeedArticleFieldStoreService"
);
}

store.toInsert.push(...inserts);
}

async flush(em: EntityManager<IDatabaseDriver<Connection>>) {
if (this.toInsert.length === 0) {
return;
const store = asyncLocalStorage.getStore();

if (!store) {
throw new Error(
"No context was started for PartitionedFeedArticleFieldStoreService"
);
}

const inserts = this.toInsert;
const { toInsert: inserts } = store;

if (inserts.length === 0) {
return;
}

const connection = em.getConnection();

Expand All @@ -59,7 +91,7 @@ export class PartitionedFeedArticleFieldStoreService {
});
throw err;
} finally {
this.toInsert = [];
store.toInsert = [];
}
}

Expand All @@ -83,7 +115,7 @@ export class PartitionedFeedArticleFieldStoreService {
> {
const oneMonthAgo = dayjs().subtract(1, "month").toISOString();

if (ids.length < 60) {
if (ids.length < 70) {
return this.connection.execute(
`SELECT field_hashed_value` +
` FROM ${this.TABLE_NAME}` +
Expand All @@ -95,19 +127,24 @@ export class PartitionedFeedArticleFieldStoreService {
[oneMonthAgo, feedId, ...ids]
);
} else {
const parenthesized = ids.map(() => `(?)`).join(",");

const result = await this.connection.execute(
`SELECT field_hashed_value` +
` FROM ${this.TABLE_NAME}` +
` INNER JOIN (VALUES ${parenthesized}) vals(v) ON (
field_hashed_value = v)` +
` WHERE ${
olderThanOneMonth ? `created_at <= ?` : `created_at > ?`
} AND feed_id = ? AND field_name = 'id'` +
``,
[...ids, oneMonthAgo, feedId]
);
const temporaryTableName = `current_article_ids_${feedId}`;
const sql =
`CREATE TEMP TABLE ${temporaryTableName} AS` +
` SELECT * FROM (VALUES ${ids.map(() => "(?)").join(", ")}) AS t(id)` +
` SELECT field_hashed_value` +
` FROM ${this.TABLE_NAME}` +
` INNER JOIN ${temporaryTableName} t ON (field_hashed_value = t.id)` +
` WHERE ${
olderThanOneMonth ? `created_at <= ?` : `created_at > ?`
} AND feed_id = ? AND field_name = 'id'`;

const result = await this.connection.execute(sql, [
...ids,
oneMonthAgo,
feedId,
]);

await this.connection.execute(`DROP TABLE ${temporaryTableName}`);

return result;
}
Expand Down
24 changes: 19 additions & 5 deletions services/user-feeds/src/delivery/delivery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,31 @@ export class DeliveryService {

const cacheKey = `delivery:${event.data.feed.id}:${medium.id}:${article.flattened.idHash}`;

const oldVal = await this.cacheStorageService.set({
const priorDeliveryDate = await this.cacheStorageService.set({
key: cacheKey,
getOldValue: true,
expSeconds: 60 * 60,
body: "1",
expSeconds: 60 * 60 * 24,
body: new Date().toISOString(),
});

if (oldVal) {
if (priorDeliveryDate) {
const secondsElapsed =
priorDeliveryDate !== "1"
? Math.round(
(new Date().getTime() - new Date(priorDeliveryDate).getTime()) /
1000
)
: undefined;

logger.warn(
`Article already delivered to feed ${event.data.feed.id}, medium ${medium.id},` +
` article ${formattedArticle.flattened.id}`
` article ${formattedArticle.flattened.id}`,
{
articleIdHash: formattedArticle.flattened.idHash,
secondsElapsed,
feedId: event.data.feed.id,
mediumId: medium.id,
}
);

return [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ export class FeedEventHandlerService {
logger.info(
`User feed event for feed ${event.data.feed.id} is already being processed, ignoring`
);

return;
}

// Required to be a separate method so it can be used with MikroORM's @UseRequestContext() decorator
await this.handleV2EventWithDb(event);
await this.partitionedFeedArticleStoreService.startContext(
async () => await this.handleV2EventWithDb(event)
);
} catch (err) {
logger.error(`Failed to handle feed event`, {
feedId: event.data.feed.id,
Expand Down Expand Up @@ -263,12 +267,6 @@ export class FeedEventHandlerService {
},
} = event;

this.debugLog(
`Debug ${event.data.feed.id}: Fetching feed XML from ${url}`,
{},
event.debug
);

let lastHashSaved: string | null = null;

if (
Expand All @@ -283,6 +281,12 @@ export class FeedEventHandlerService {
ReturnType<typeof FeedFetcherService.prototype.fetch>
> | null = null;

this.debugLog(
`Debug ${event.data.feed.id}: Fetching feed XML from ${url}`,
{},
event.debug
);

try {
response = await this.feedFetcherService.fetch(
event.data.feed.requestLookupDetails?.url || url,
Expand All @@ -307,9 +311,9 @@ export class FeedEventHandlerService {
);

response = null;
} else {
throw err;
}

return;
}

if (!response || !response.body) {
Expand Down Expand Up @@ -454,7 +458,9 @@ export class FeedEventHandlerService {
hash: response.bodyHash,
});

this.logEventFinish(event);
this.logEventFinish(event, {
numberOfArticles: allArticles.length,
});

return deliveryStates;
} catch (err) {
Expand Down Expand Up @@ -565,6 +571,7 @@ export class FeedEventHandlerService {
event: FeedV2Event,
meta?: {
error?: Error;
numberOfArticles?: number;
}
) {
if (event.timestamp) {
Expand All @@ -580,6 +587,7 @@ export class FeedEventHandlerService {
feedId: event.data.feed.id,
feedURL: event.data.feed.url,
error: meta?.error,
numberOfArticles: meta?.numberOfArticles,
}
);
}
Expand Down
Loading

0 comments on commit 05e70b6

Please sign in to comment.