Skip to content

Commit

Permalink
db: cache objects with ttl (#1997)
Browse files Browse the repository at this point in the history
* db: cache objects with ttl

* api: Make sure cache objects are not shared

* api/cache: Allow custom ttl for cache

Also remove maxKeys which is bad (it throws an
error, no LRU etc).

* api/playback: Add cache to playback

* api: Make cache gets built-in on db

* api/playback: Cache object store for assets

* api/playback: Fetch streams before assets

We got huge traffic now, lets just switch
their order. Later we can think of optimizing
this better idk

* api: Fix GetOptions default values

useReplica was defaulting to false when the cache: true
option was set.

* api: Fix tests

* api: make the cache into a class

* api: Make sure to cache objectstore queries

* api/db: Improve caching logic

- Simplify getOrSet on read
- Always write to cache even if not reading from cache
- Update cache on writes (helps tests more than prod)
- Make copies of objects when reading and writing to cache

* api: Remove unnecessary cache flushes on tests

* api/acl: Make sure all async flows have cache

* api: Remove unused import

* api/cache: Make cache TTL configurable

---------

Co-authored-by: Victor Elias <victorgelias@gmail.com>
  • Loading branch information
gioelecerati and victorges authored Dec 21, 2023
1 parent 033cc2c commit 70bdda2
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 57 deletions.
5 changes: 5 additions & 0 deletions packages/api/src/app-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { setupTus, setupTestTus } from "./controllers/asset";
import * as fcl from "@onflow/fcl";
import createFrontend from "@livepeer.studio/www";
import { NotFoundError } from "./store/errors";
import { cache } from "./store/cache";

enum OrchestratorSource {
hardcoded = "hardcoded",
Expand Down Expand Up @@ -57,6 +58,7 @@ export default async function makeApp(params: CliArgs) {
httpPrefix,
postgresUrl,
postgresReplicaUrl,
defaultCacheTtl,
frontendDomain,
supportAddr,
sendgridTemplateId,
Expand Down Expand Up @@ -98,6 +100,9 @@ export default async function makeApp(params: CliArgs) {
postgresReplicaUrl,
appName: ownRegion ? `${ownRegion}-api` : "api",
});
if (defaultCacheTtl > 0) {
cache.init({ stdTTL: defaultCacheTtl });
}

// RabbitMQ
const queue: Queue = amqpUrl
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/controllers/access-control.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ describe("controllers/signing-key", () => {
});
expect(res.status).toBe(204);
await db.user.update(gatedAsset.userId, { suspended: true });

const res2 = await client.post("/access-control/gate", {
stream: `video+${gatedAsset.playbackId}`,
type: "jwt",
Expand Down
41 changes: 27 additions & 14 deletions packages/api/src/controllers/access-control.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import { fetchWithTimeoutAndRedirects } from "../util";
import fetch from "node-fetch";
import { WithID } from "../store/types";
import { DBStream } from "../store/stream-table";
import { getViewers } from "./usage";
import { HACKER_DISABLE_CUTOFF_DATE } from "./utils/notification";
import { isFreeTierUser } from "./helpers";
import { cache } from "../store/cache";

const WEBHOOK_TIMEOUT = 30 * 1000;
const MAX_ALLOWED_VIEWERS_FOR_FREE_TIER = 5;
Expand Down Expand Up @@ -102,9 +102,16 @@ app.post(
validatePost("access-control-gate-payload"),
async (req, res) => {
const playbackId = req.body.stream.replace(/^\w+\+/, "");
const content =
(await db.stream.getByPlaybackId(playbackId)) ||
(await db.asset.getByPlaybackId(playbackId));

let content = await cache.getOrSet(
`acl-content-${playbackId}`,
async () => {
return (
(await db.stream.getByPlaybackId(playbackId)) ||
(await db.asset.getByPlaybackId(playbackId))
);
}
);

res.set("Cache-Control", "max-age=120,stale-while-revalidate=600");

Expand All @@ -116,7 +123,7 @@ app.post(
throw new NotFoundError("Content not found");
}

const user = await db.user.get(content.userId);
let user = await db.user.get(content.userId, { useCache: true });

if (user.suspended || ("suspended" in content && content.suspended)) {
const contentLog = JSON.stringify(JSON.stringify(content));
Expand All @@ -129,7 +136,7 @@ app.post(
const playbackPolicyType = content.playbackPolicy?.type ?? "public";

if (user.createdAt < HACKER_DISABLE_CUTOFF_DATE) {
let limitReached = await freeTierLimitReached(content, user, req);
let limitReached = freeTierLimitReached(content, user, req);
if (limitReached) {
throw new ForbiddenError("Free tier user reached viewership limit");
}
Expand Down Expand Up @@ -159,11 +166,15 @@ app.post(
);
}

const query = [];
query.push(sql`signing_key.data->>'publicKey' = ${req.body.pub}`);
const [signingKeyOutput] = await db.signingKey.find(query, {
limit: 2,
});
const [signingKeyOutput] = await cache.getOrSet(
`acl-signing-key-pub-${req.body.pub}`,
() => {
const query = [
sql`signing_key.data->>'publicKey' = ${req.body.pub}`,
];
return db.signingKey.find(query, { limit: 2 });
}
);

if (signingKeyOutput.length == 0) {
console.log(`
Expand Down Expand Up @@ -210,7 +221,9 @@ app.post(
"Content is gated and requires an access key"
);
}
const webhook = await db.webhook.get(content.playbackPolicy.webhookId);
const webhook = await db.webhook.get(content.playbackPolicy.webhookId, {
useCache: true,
});
if (!webhook) {
console.log(`
access-control: gate: content with playbackId=${playbackId} is gated but corresponding webhook not found for webhookId=${content.playbackPolicy.webhookId}, disallowing playback
Expand Down Expand Up @@ -280,11 +293,11 @@ app.get("/public-key", async (req, res) => {
});
});

async function freeTierLimitReached(
function freeTierLimitReached(
content: DBStream | WithID<Asset>,
user: User,
req: Request
): Promise<boolean> {
): boolean {
if (!isFreeTierUser(user)) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/controllers/asset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async function validateAssetPlaybackPolicy(
}

async function getActiveObjectStore(id: string) {
const os = await db.objectStore.get(id);
const os = await db.objectStore.get(id, { useCache: true });
if (!os || os.deleted || os.disabled) {
throw new Error("Object store not found or disabled");
}
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/controllers/playback.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { WithID } from "../store/types";
import { db } from "../store";
import { DBStream } from "../store/stream-table";
import { DBSession } from "../store/session-table";
import { cache } from "../store/cache";

const EXPECTED_CROSS_USER_ASSETS_CUTOFF_DATE = Date.parse(
"2023-06-06T00:00:00.000Z"
Expand Down Expand Up @@ -525,6 +526,7 @@ describe("controllers/playback", () => {
await db.asset.update(asset2.id, {
createdAt: EXPECTED_CROSS_USER_ASSETS_CUTOFF_DATE - 1000,
});
cache.flush();
});

it("should return playback URL asset of user from CID", async () => {
Expand Down
73 changes: 43 additions & 30 deletions packages/api/src/controllers/playback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { NotFoundError, UnprocessableEntityError } from "../store/errors";
import { isExperimentSubject } from "../store/experiment-table";
import logger from "../logger";
import { getRunningRecording } from "./session";
import { cache } from "../store/cache";

/**
* CROSS_USER_ASSETS_CUTOFF_DATE represents the cut-off date for cross-account
Expand Down Expand Up @@ -115,7 +116,7 @@ const getAssetPlaybackInfo = async (
ingest: string,
asset: WithID<Asset>
) => {
const os = await db.objectStore.get(asset.objectStoreId);
const os = await db.objectStore.get(asset.objectStoreId, { useCache: true });
if (!os || os.deleted || os.disabled) {
return null;
}
Expand All @@ -136,32 +137,17 @@ const getAssetPlaybackInfo = async (
);
};

type PlaybackResource = {
stream?: DBStream;
session?: DBSession;
asset?: WithID<Asset>;
};

export async function getResourceByPlaybackId(
id: string,
user?: User,
cutoffDate?: number,
origin?: string
): Promise<{ stream?: DBStream; session?: DBSession; asset?: WithID<Asset> }> {
let asset =
(await db.asset.getByPlaybackId(id)) ??
(await db.asset.getByIpfsCid(id, user, cutoffDate)) ??
(await db.asset.getBySourceURL("ipfs://" + id, user, cutoffDate)) ??
(await db.asset.getBySourceURL("ar://" + id, user, cutoffDate));

if (asset && !asset.deleted) {
if (asset.status.phase !== "ready" && !asset.sourcePlaybackReady) {
throw new UnprocessableEntityError("asset is not ready for playback");
}
if (asset.userId !== user?.id && cutoffDate) {
console.log(
`Returning cross-user asset for playback. ` +
`userId=${user?.id} userEmail=${user?.email} origin=${origin} ` +
`assetId=${asset.id} assetUserId=${asset.userId} playbackId=${asset.playbackId}`
);
}
return { asset };
}

cutoffDate?: number
): Promise<PlaybackResource> {
let stream = await db.stream.getByPlaybackId(id);
if (!stream) {
const streamById = await db.stream.get(id);
Expand All @@ -174,13 +160,24 @@ export async function getResourceByPlaybackId(
return { stream };
}

const asset =
(await db.asset.getByPlaybackId(id)) ??
(await db.asset.getByIpfsCid(id, user, cutoffDate)) ??
(await db.asset.getBySourceURL("ipfs://" + id, user, cutoffDate)) ??
(await db.asset.getBySourceURL("ar://" + id, user, cutoffDate));

if (asset && !asset.deleted) {
return { asset };
}

const session = await db.session.get(id);
if (session && !session.deleted) {
return { session };
}

return {};
}

async function getAttestationPlaybackInfo(
config: CliArgs,
ingest: string,
Expand Down Expand Up @@ -224,14 +221,30 @@ async function getPlaybackInfo(
withRecordings?: boolean
): Promise<PlaybackInfo> {
const cutoffDate = isCrossUserQuery ? null : CROSS_USER_ASSETS_CUTOFF_DATE;
let { stream, asset, session } = await getResourceByPlaybackId(
id,
req.user,
cutoffDate,
origin
);
const cacheKey = `playbackInfo-${id}-user-${req.user?.id}-cutoff-${cutoffDate}`;
let resource = cache.get<PlaybackResource>(cacheKey);
if (!resource) {
resource = await getResourceByPlaybackId(id, req.user, cutoffDate);

const ttl =
resource.asset && resource.asset.status.phase !== "ready" ? 5 : 120;
cache.set(cacheKey, resource, ttl);
}

let { stream, asset, session } = resource;

if (asset) {
if (asset.status.phase !== "ready" && !asset.sourcePlaybackReady) {
throw new UnprocessableEntityError("asset is not ready for playback");
}
if (asset.userId !== req.user?.id && cutoffDate) {
console.log(
`Returning cross-user asset for playback. ` +
`userId=${req.user?.id} userEmail=${req.user?.email} origin=${origin} ` +
`assetId=${asset.id} assetUserId=${asset.userId} playbackId=${asset.playbackId}`
);
}

return await getAssetPlaybackInfo(req.config, ingest, asset);
}

Expand Down
8 changes: 6 additions & 2 deletions packages/api/src/controllers/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ export async function buildRecordingUrl(
recordCatalystObjectStoreId: string,
secondaryRecordObjectStoreId: string
) {
const os = await db.objectStore.get(recordCatalystObjectStoreId);
const os = await db.objectStore.get(recordCatalystObjectStoreId, {
useCache: true,
});

let urlPrefix = pathJoin(os.publicUrl, session.playbackId, session.id);
let manifestUrl = pathJoin(urlPrefix, "output.m3u8");
Expand All @@ -265,7 +267,9 @@ export async function buildRecordingUrl(
};
}

const secondaryOs = await db.objectStore.get(secondaryRecordObjectStoreId);
const secondaryOs = await db.objectStore.get(secondaryRecordObjectStoreId, {
useCache: true,
});
urlPrefix = pathJoin(secondaryOs.publicUrl, session.playbackId, session.id);
manifestUrl = pathJoin(urlPrefix, "output.m3u8");

Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ export async function getRecordingPlaybackUrl(
return null;
}

const os = await db.objectStore.get(objectStoreId);
const os = await db.objectStore.get(objectStoreId, { useCache: true });
url = pathJoin(os.publicUrl, session.playbackId, session.id, "output.m3u8");
} catch (e) {
console.log(`
Expand Down
8 changes: 5 additions & 3 deletions packages/api/src/middleware/auth.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ describe("auth middleware", () => {
let nonAdminApiKey: string;
let client: TestClient;

const setAccess = (token: string, rules?: ApiToken["access"]["rules"]) =>
db.apiToken.update(token, { access: { rules } });
const setAccess = async (
token: string,
rules?: ApiToken["access"]["rules"]
) => db.apiToken.update(token, { access: { rules } });

const fetchStatus = async (method: string, path: string) => {
const res = await client.fetch(path, { method });
Expand Down Expand Up @@ -375,7 +377,7 @@ describe("auth middleware", () => {
} = await setupUsers(server, mockAdminUserInput, mockNonAdminUserInput));
});

const setAccess = (token: string, access?: ApiToken["access"]) =>
const setAccess = async (token: string, access?: ApiToken["access"]) =>
db.apiToken.update(token, { access });

const expectResponse = (res: Response) =>
Expand Down
9 changes: 6 additions & 3 deletions packages/api/src/middleware/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function authenticator(): RequestHandler {
if (!tokenId) {
throw new UnauthorizedError(`no authorization token provided`);
}
tokenObject = await db.apiToken.get(tokenId);
tokenObject = await db.apiToken.get(tokenId, { useCache: true });
const matchesBasicUser = tokenObject?.userId === basicUser?.name;
if (!tokenObject || (isBasic && !matchesBasicUser)) {
throw new UnauthorizedError(`no token ${tokenId} found`);
Expand All @@ -108,7 +108,8 @@ function authenticator(): RequestHandler {
);
}

user = await db.user.get(userId);
user = await db.user.get(userId, { useCache: true });

if (!user) {
throw new UnauthorizedError(
`no user found from authorization header: ${authHeader}`
Expand All @@ -118,10 +119,12 @@ function authenticator(): RequestHandler {
throw new ForbiddenError(`user is suspended`);
}

req.token = tokenObject;
req.user = user;

// UI admins must have a JWT
req.isUIAdmin = user.admin && authScheme === "jwt";
req.token = tokenObject;

return next();
};
}
Expand Down
5 changes: 5 additions & 0 deletions packages/api/src/parse-cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ export default function parseCli(argv?: string | readonly string[]) {
describe: "url of a postgres read replica database",
type: "string",
},
"default-cache-ttl": {
describe: "default TTL for entries cached in memory, in seconds",
type: "number",
default: 120,
},
"amqp-url": {
describe: "the RabbitMQ Url",
type: "string",
Expand Down
Loading

0 comments on commit 70bdda2

Please sign in to comment.