diff --git a/agent/src/adapters/firebase.ts b/agent/src/adapters/firebase.ts index 91a28dc6a..08b9c9ac8 100644 --- a/agent/src/adapters/firebase.ts +++ b/agent/src/adapters/firebase.ts @@ -81,20 +81,27 @@ export class FirebaseAdapter { private firestore: admin.firestore.Firestore, private provider: "twitch", public debugKeepConnected: Set - ) { - } + ) {} - getMessage(key: string) { - return this.firestore.collection("messages").doc(key); + getMessage(channelId: string, key: string) { + return this.firestore + .collection("channels") + .doc(channelId) + .collection("messages") + .doc(key); } getMetadata(key: string) { return this.firestore.collection("metadata").doc(key); } - setIfNotExists(key: string, value: any) { + setIfNotExists(channelId: string, key: string, value: any) { return this.firestore.runTransaction(async (transaction) => { - const ref = this.firestore.collection("messages").doc(key); + const ref = this.firestore + .collection("channels") + .doc(channelId) + .collection("messages") + .doc(key); const doc = await transaction.get(ref); if (doc.exists) { return; diff --git a/agent/src/agents/twitch.ts b/agent/src/agents/twitch.ts index d4522a6f8..7883b9220 100644 --- a/agent/src/agents/twitch.ts +++ b/agent/src/agents/twitch.ts @@ -221,46 +221,49 @@ async function join( const badges = tags["badges"] .split(",") .map((badge) => badge.split("/") as [string, string]); - await firebase.getMessage(`twitch:${msg.id}`).set({ - channelId: `twitch:${msg.channelId}`, - channel, - type: "message", - timestamp: admin.firestore.Timestamp.fromDate(msg.date), - reply: tags["reply-parent-msg-id"] - ? { - messageId: `twitch:${tags["reply-parent-msg-id"]}`, - displayName: tags["reply-parent-display-name"], - userLogin: tags["reply-parent-user-login"], - userId: tags["reply-parent-user-id"], - message: tags["reply-parent-msg-body"], - } - : null, - author: { - userId: tags["user-id"], - displayName: tags["display-name"], - login: tags["username"], - }, - // we have to shim some tags because the frontend still needs some of these. - tags: { - "user-id": tags["user-id"], - "display-name": tags["display-name"], - username: user, - "room-id": tags["room-id"], - color: tags["color"], - "message-type": isAction ? "action" : "chat", - "badges-raw": tags["badges"], - badges: { - vip: badges.find((badge) => badge[0] === "vip") !== null, - moderator: badges.find((badge) => badge[0] === "moderator") !== null, + await firebase + .getMessage(`twitch:${msg.channelId}`, `twitch:${msg.id}`) + .set({ + channelId: `twitch:${msg.channelId}`, + channel, + type: "message", + timestamp: admin.firestore.Timestamp.fromDate(msg.date), + reply: tags["reply-parent-msg-id"] + ? { + messageId: `twitch:${tags["reply-parent-msg-id"]}`, + displayName: tags["reply-parent-display-name"], + userLogin: tags["reply-parent-user-login"], + userId: tags["reply-parent-user-id"], + message: tags["reply-parent-msg-body"], + } + : null, + author: { + userId: tags["user-id"], + displayName: tags["display-name"], + login: tags["username"], }, - "emotes-raw": tags["emotes"], - }, - message, - annotations: { - isFirstTimeChatter: tags["first-msg"] === "1", - isAction, - }, - }); + // we have to shim some tags because the frontend still needs some of these. + tags: { + "user-id": tags["user-id"], + "display-name": tags["display-name"], + username: user, + "room-id": tags["room-id"], + color: tags["color"], + "message-type": isAction ? "action" : "chat", + "badges-raw": tags["badges"], + badges: { + vip: badges.find((badge) => badge[0] === "vip") !== null, + moderator: + badges.find((badge) => badge[0] === "moderator") !== null, + }, + "emotes-raw": tags["emotes"], + }, + message, + annotations: { + isFirstTimeChatter: tags["first-msg"] === "1", + isAction, + }, + }); }); chat.onAnnouncement(async (channel, user, announcement, msg) => { @@ -283,57 +286,56 @@ async function join( const badges = tags["badges"] .split(",") .map((badge) => badge.split("/") as [string, string]); - await firebase.getMessage(`twitch:${msg.id}`).set({ - channelId: `twitch:${msg.channelId}`, - channel, - type: "message", - timestamp: admin.firestore.Timestamp.fromDate(msg.date), - reply: tags["reply-parent-msg-id"] - ? { - messageId: `twitch:${tags["reply-parent-msg-id"]}`, - displayName: tags["reply-parent-display-name"], - userLogin: tags["reply-parent-user-login"], - userId: tags["reply-parent-user-id"], - message: tags["reply-parent-msg-body"], - } - : null, - author: { - userId: tags["user-id"], - displayName: tags["display-name"], - login: tags["username"], - }, - // we have to shim some tags because the frontend still needs some of these. - tags: { - "user-id": tags["user-id"], - "display-name": tags["display-name"], - username: user, - "room-id": tags["room-id"], - color: tags["color"], - "message-type": "chat", - "badges-raw": tags["badges"], - badges: { - vip: badges.find((badge) => badge[0] === "vip") !== null, - moderator: badges.find((badge) => badge[0] === "moderator") !== null, + await firebase + .getMessage(`twitch:${msg.channelId}`, `twitch:${msg.id}`) + .set({ + channelId: `twitch:${msg.channelId}`, + channel, + type: "message", + timestamp: admin.firestore.Timestamp.fromDate(msg.date), + reply: tags["reply-parent-msg-id"] + ? { + messageId: `twitch:${tags["reply-parent-msg-id"]}`, + displayName: tags["reply-parent-display-name"], + userLogin: tags["reply-parent-user-login"], + userId: tags["reply-parent-user-id"], + message: tags["reply-parent-msg-body"], + } + : null, + author: { + userId: tags["user-id"], + displayName: tags["display-name"], + login: tags["username"], }, - "emotes-raw": tags["emotes"], - }, - message, - annotations: { - announcement: { color: announcement.color }, - isFirstTimeChatter: tags["first-msg"] === "1", - }, - }); + // we have to shim some tags because the frontend still needs some of these. + tags: { + "user-id": tags["user-id"], + "display-name": tags["display-name"], + username: user, + "room-id": tags["room-id"], + color: tags["color"], + "message-type": "chat", + "badges-raw": tags["badges"], + badges: { + vip: badges.find((badge) => badge[0] === "vip") !== null, + moderator: + badges.find((badge) => badge[0] === "moderator") !== null, + }, + "emotes-raw": tags["emotes"], + }, + message, + annotations: { + announcement: { color: announcement.color }, + isFirstTimeChatter: tags["first-msg"] === "1", + }, + }); }); chat.onMessageRemove(async (channel, messageId, msg) => { - const original = await firebase.getMessage(`twitch:${messageId}`).get(); - if (!original.exists) { - log.error({ messageId, timestamp: msg.date }, "no message to delete"); - return; - } - await firebase.getMessage(`twitch:x-${messageId}`).set({ + const channelId = `twitch:${await getTwitchUserId(channel)}`; + await firebase.getMessage(channelId, `twitch:x-${messageId}`).set({ channel, - channelId: original.get("channelId"), + channelId, type: "messagedeleted", timestamp: admin.firestore.Timestamp.fromDate(msg.date), messageId: `twitch:${messageId}`, @@ -341,26 +343,34 @@ async function join( }); chat.onChatClear(async (channel, msg) => { - await firebase.getMessage(`twitch:clear-${msg.date.toISOString()}`).set({ - channel, - channelId: `twitch:${msg.channelId}`, - timestamp: admin.firestore.Timestamp.fromDate(msg.date), - type: "clear", - }); + await firebase + .getMessage( + `twitch:${msg.channelId}`, + `twitch:clear-${msg.date.toISOString()}` + ) + .set({ + channel, + channelId: `twitch:${msg.channelId}`, + timestamp: admin.firestore.Timestamp.fromDate(msg.date), + type: "clear", + }); }); chat.onHosted(async (channel, hosterChannel, auto, viewers) => { // host messages don't have an associated timestamp so the best we can do is use the current date stamp. const timestamp = new Date(); - await firebase.getMessage(`twitch:host-${timestamp.toISOString()}`).set({ - channel: `#${channel}`, - channelId: `twitch:${await getTwitchUserId(channel)}`, - type: "host", - displayName: hosterChannel, - hosterChannelId: `twitch:${await getTwitchUserId(hosterChannel)}`, - timestamp: admin.firestore.FieldValue.serverTimestamp(), - viewers: viewers || 0, // includes the original I guess. - }); + const channelId = `twitch:${await getTwitchUserId(channel)}`; + await firebase + .getMessage(channelId, `twitch:host-${timestamp.toISOString()}`) + .set({ + channel: `#${channel}`, + channelId, + type: "host", + displayName: hosterChannel, + hosterChannelId: `twitch:${await getTwitchUserId(hosterChannel)}`, + timestamp: admin.firestore.FieldValue.serverTimestamp(), + viewers: viewers || 0, // includes the original I guess. + }); }); chat.onR9k(async (channel, enabled) => { @@ -464,6 +474,7 @@ async function join( raidListener = await pubsub.onCustomTopic("raid", async (message) => { const data = message.data as any; await firebase.setIfNotExists( + `twitch:${data["raid"]["source_id"]}`, `twitch:${data["type"]}-${data["raid"]["id"]}`, { channel, diff --git a/auxiliary/src/streamlabs.ts b/auxiliary/src/streamlabs.ts index 857047f80..d5f9d353b 100644 --- a/auxiliary/src/streamlabs.ts +++ b/auxiliary/src/streamlabs.ts @@ -148,6 +148,8 @@ function listenForSingleUserId(userId: string) { mergeMap((donation) => { return admin .firestore() + .collection("channels") + .doc(donation.channelId) .collection("messages") .doc(`streamlabs-${donation.channelId}-${donation.id}`) .set({ diff --git a/firestore.rules b/firestore.rules index 973896f08..e45188edf 100644 --- a/firestore.rules +++ b/firestore.rules @@ -26,6 +26,10 @@ service cloud.firestore { allow read: if true; allow write: if false; } + match /channels/{channelId}/messages/{document=**} { + allow read: if true; + allow write: if false; + } match /chat-status/{document=**} { allow list: if request.query.limit == 1; allow get: if true; diff --git a/functions/src/alchemy_webhook.ts b/functions/src/alchemy_webhook.ts index 7d062df44..63c3a7cb3 100644 --- a/functions/src/alchemy_webhook.ts +++ b/functions/src/alchemy_webhook.ts @@ -184,18 +184,23 @@ async function storeDonation( functions.logger.info("ChannelId obtained", { channelId: channelId }); // storing donation respoonses in realtimecash collection - await admin.firestore().collection("messages").add({ - channelId: channelId, - webhookId: notification.webhookId, - id: notification.id, - createdAt: notification.createdAt, - type: "realtimecash.donation", - notificationType: notification.type, - activity: activity, - donor: donor, - message: message, - timestamp: new Date(), - }); + await admin + .firestore() + .collection("channels") + .doc(channelId) + .collection("messages") + .add({ + channelId: channelId, + webhookId: notification.webhookId, + id: notification.id, + createdAt: notification.createdAt, + type: "realtimecash.donation", + notificationType: notification.type, + activity: activity, + donor: donor, + message: message, + timestamp: new Date(), + }); functions.logger.info("Payload is stored in messages collection"); } diff --git a/functions/src/eventsub.ts b/functions/src/eventsub.ts index 45a0b4f32..7e47a76a3 100644 --- a/functions/src/eventsub.ts +++ b/functions/src/eventsub.ts @@ -119,16 +119,18 @@ export const eventsub = functions.https.onRequest(async (req, res) => { } else if (status === "enabled") { const type = req.body?.subscription?.type as EventsubType; - const messageRef = admin - .firestore() - .collection("messages") - .doc(`twitch:${messageId}`); - const channelId = `twitch:${ req.body.event["broadcaster_user_id"] ?? req.body.event["to_broadcaster_user_id"] }`; + const messageRef = admin + .firestore() + .collection("channels") + .doc(channelId) + .collection("messages") + .doc(`twitch:${messageId}`); + await messageRef.set({ channelId, type, diff --git a/functions/src/index.ts b/functions/src/index.ts index 70d5f66be..726f9463d 100644 --- a/functions/src/index.ts +++ b/functions/src/index.ts @@ -6,7 +6,7 @@ import { getUserEmotes, getEmotes } from "./emotes"; import { eventsub } from "./eventsub"; import { getAppAccessToken, TWITCH_CLIENT_ID } from "./oauth"; import { search } from "./search"; -import { cleanup, subscribe, unsubscribe } from "./subscriptions"; +import { migrate, subscribe, unsubscribe } from "./subscriptions"; import { synthesize, getVoices } from "./tts"; import { getTwitchLogin, getChannelId } from "./twitch"; import { updateChatStatus } from "./chat-status"; @@ -472,7 +472,7 @@ export { search, getUserEmotes, getEmotes, - cleanup, + migrate, synthesize, getVoices, updateChatStatus, diff --git a/functions/src/subscriptions.ts b/functions/src/subscriptions.ts index 4fff69fe7..c40e96045 100644 --- a/functions/src/subscriptions.ts +++ b/functions/src/subscriptions.ts @@ -120,29 +120,15 @@ export const unsubscribe = functions.pubsub } }); -export const cleanup = functions.pubsub - .schedule("*/6 * * * *") // every ten minutes - .onRun(async (context) => { - // delete up to 19200 records a day. - const batch = admin.firestore().batch(); - const snapshot = await admin +export const migrate = functions.firestore + .document("channels/{channelId}/messages/{messageId}") + .onWrite((change, context) => { + return admin .firestore() .collection("messages") - .where("timestamp", "<", new Date(Date.now() - 7 * 86400 * 1000)) - .orderBy("timestamp", "asc") - .limit(80) - .get(); - snapshot.forEach((doc) => batch.delete(doc.ref)); - await batch.commit(); - functions.logger.info("deleted", snapshot.size, "messages"); - - const claimRef = admin.database().ref("agents").child("twitch"); - - const unclaimed = await claimRef.orderByValue().equalTo("").get(); - // log an error for any unclaimed agents. we don't want to delete them - // because this might be a race condition/false positive but logging an - // error will get reported. - for (const channel of Object.keys(unclaimed.val() || {})) { - functions.logger.error("unclaimed channel detected", channel); - } + .doc(context.params.messageId) + .set({ + channelId: context.params.channelId, + ...change.after.data(), + }); }); diff --git a/lib/models/adapters/messages.dart b/lib/models/adapters/messages.dart index a912177a3..fe5898998 100644 --- a/lib/models/adapters/messages.dart +++ b/lib/models/adapters/messages.dart @@ -325,8 +325,9 @@ class MessagesAdapter { final emotes = await getEmotes(channel); final results = await db + .collection("channels") + .doc(channel.toString()) .collection("messages") - .where("channelId", isEqualTo: channel.toString()) .where("timestamp", isLessThan: from) .orderBy("timestamp") .limitToLast(250) @@ -345,8 +346,9 @@ class MessagesAdapter { var lastAdMessageCount = 0; var isInitialSnapshot = true; return db + .collection("channels") + .doc(channel.toString()) .collection("messages") - .where("channelId", isEqualTo: channel.toString()) .orderBy("timestamp") .limitToLast(250) .snapshots() @@ -391,8 +393,9 @@ class MessagesAdapter { /// null indicates that the stream is offline. Stream forChannelUptime(Channel channel) { return db + .collection("channels") + .doc(channel.toString()) .collection("messages") - .where("channelId", isEqualTo: channel.toString()) .where("type", whereIn: ["stream.online", "stream.offline"]) .orderBy("timestamp") .limitToLast(1) @@ -412,6 +415,8 @@ class MessagesAdapter { Future hasMessages(Channel channel) async { return await db + .collection("channels") + .doc(channel.toString()) .collection("messages") .where("channelId", isEqualTo: channel.toString()) .limit(1) diff --git a/lib/models/adapters/metadata.dart b/lib/models/adapters/metadata.dart new file mode 100644 index 000000000..875609d16 --- /dev/null +++ b/lib/models/adapters/metadata.dart @@ -0,0 +1,39 @@ +import 'package:cloud_firestore/cloud_firestore.dart'; + +class MetadataAdapter { + final FirebaseFirestore db; + + MetadataAdapter._({required this.db}); + + static MetadataAdapter get instance => + _instance ??= MetadataAdapter._(db: FirebaseFirestore.instance); + static MetadataAdapter? _instance; + + Stream getThirdPartyMetadataValue( + {required String channelId, required String name, required String key}) { + return db + .collection("metadata") + .doc(channelId) + .collection("third-party") + .where("name", isEqualTo: name) + .where("key", isEqualTo: key) + .orderBy("createdAt", descending: true) + .limit(1) + .snapshots() + .map((event) { + if (event.docs.isEmpty) { + return null; + } + return event.docs.first.get("value"); + }); + } + + Stream> getAvailableThirdPartyProviders( + {required String channelId}) { + return db + .collection("metadata") + .doc(channelId) + .snapshots() + .map((doc) => (doc.get("thirdParty") ?? {}).keys.toList()); + } +} diff --git a/lib/models/adapters/profiles.dart b/lib/models/adapters/profiles.dart index 71bb29095..c9deddf69 100644 --- a/lib/models/adapters/profiles.dart +++ b/lib/models/adapters/profiles.dart @@ -27,8 +27,9 @@ class ProfilesAdapter { Stream getIsOnline({required String channelId}) { return db + .collection("channels") + .doc(channelId) .collection("messages") - .where("channelId", isEqualTo: channelId) .where("type", whereIn: ["stream.online", "stream.offline"]) .orderBy("timestamp") .limitToLast(1)