From 925e378b15e37318671f88b245969e03bdcacc58 Mon Sep 17 00:00:00 2001 From: Louis Date: Mon, 3 Jun 2024 16:14:35 +0200 Subject: [PATCH] feat: added functionality to resume consumers on the server side --- apps/api/src/app.ts | 31 +++++++++++++- apps/api/src/config/config.ts | 2 +- apps/api/src/types/rabbitmq.ts | 8 ++++ apps/api/src/types/socket.ts | 2 + apps/media-server/src/app.ts | 57 +++++++++++++++++++++++++ apps/media-server/src/types/rabbitmq.ts | 8 ++++ 6 files changed, 105 insertions(+), 3 deletions(-) diff --git a/apps/api/src/app.ts b/apps/api/src/app.ts index 56fbf71..7947d8b 100644 --- a/apps/api/src/app.ts +++ b/apps/api/src/app.ts @@ -205,7 +205,6 @@ io.use(async (socket, next) => { }); }, "send-track-send-res": ({ id, error }, sid) => { - console.log(error); io.to(sid).emit("send-track-send-res", { id: id ?? "", error: error ?? "" @@ -217,10 +216,14 @@ io.use(async (socket, next) => { }); }, "connect-transport-send-res": ({ error }, sid) => { - console.log(error); io.to(sid).emit("connect-transport-send-res", { error: error ?? "" }); + }, + "resume-consumers-done": ({ error }, sid) => { + io.to(sid).emit("resume-consumers-done", { + error: error ?? "" + }); } }); })(); @@ -449,6 +452,30 @@ io.on("connection", (socket) => { } }); + socket.on("resume-consumers", () => { + const { streamId, user } = socket.data; + + if (streamId && user) { + try { + channel.sendToQueue( + config.rabbitmq.queues.media_server_queue, + Buffer.from( + JSON.stringify({ + op: "resume-consumers", + data: { + streamId, + peerId: user.id + }, + sid: socket.id + }) + ) + ); + } catch (error) { + logger.error("Error connect-transport"); + } + } + }); + socket.on("leave_stream", async () => { const { streamId, user } = socket.data; diff --git a/apps/api/src/config/config.ts b/apps/api/src/config/config.ts index d04bcea..afaa1d4 100644 --- a/apps/api/src/config/config.ts +++ b/apps/api/src/config/config.ts @@ -3,7 +3,7 @@ import path from "path"; import Joi from "joi"; dotenv.config({ - path: path.resolve(__dirname, "../../.env") + path: path.resolve(__dirname, "../../../.env") }); const envSchema = Joi.object().keys({ diff --git a/apps/api/src/types/rabbitmq.ts b/apps/api/src/types/rabbitmq.ts index ff2c9f3..f58ced6 100644 --- a/apps/api/src/types/rabbitmq.ts +++ b/apps/api/src/types/rabbitmq.ts @@ -68,6 +68,10 @@ export interface OutgoingMessageDataMap { "end-stream": { streamId: string; }; + "resume-consumers": { + streamId: string; + peerId: string; + }; } export type SendTrackDoneOperationName = @@ -104,6 +108,10 @@ export type HandlerDataMap = { "you-left-stream": { streamId: string; }; + "resume-consumers-done": { + streamId: string; + error?: string; + }; } & { [Key in SendTrackDoneOperationName]: { error?: string; diff --git a/apps/api/src/types/socket.ts b/apps/api/src/types/socket.ts index f73d021..892b3b4 100644 --- a/apps/api/src/types/socket.ts +++ b/apps/api/src/types/socket.ts @@ -50,6 +50,7 @@ export interface ServerToClientEvents { "connect-transport-send-res": (data: { error: string }) => void; "connect-transport-recv-res": (data: { error?: string }) => void; "you-left-stream": () => void; + "resume-consumers-done": (data: { error?: string }) => void; user_joined: (data: { user: SocketUser }) => void; user_leaved: (data: { user: SocketUser }) => void; viewer_count: (data: { viewerCount: number }) => void; @@ -89,6 +90,7 @@ export interface ClientToServerEvents { appData: AppData; }) => void; "get-recv-tracks": (data: { rtpCapabilities: RtpCapabilities }) => void; + "resume-consumers": () => void; join_stream: ( data: { streamId: string }, callback: SocketCallback diff --git a/apps/media-server/src/app.ts b/apps/media-server/src/app.ts index 84d9ade..d34742d 100644 --- a/apps/media-server/src/app.ts +++ b/apps/media-server/src/app.ts @@ -424,6 +424,63 @@ export async function main() { delete streamRooms[streamId]; } + }, + "resume-consumers": async ({ streamId, peerId }, sid, send) => { + const stream = streamRooms[streamId]; + if (!stream) { + logger.error( + `resume-consumers: Stream with id: ${streamId} was not found` + ); + + return; + } + + const { state } = stream; + + if (!state) { + logger.error("resume-consumers: state is undefined"); + return; + } + + const peer = state[peerId]; + + if (!peer) { + logger.error("resume-consumers: peer not found"); + return; + } + + try { + for (const consumer of peer.consumers) { + await consumer.resume(); + } + + send({ + op: "resume-consumers-done", + data: { + streamId + }, + sid + }); + } catch (error) { + const e = error as Error; + send({ + op: "resume-consumers-done", + sid, + data: { + error: e.message, + streamId + } + }); + + send({ + op: "media-server-error", + sid, + data: { + name: e.name, + msg: e.message + } + }); + } } }); } diff --git a/apps/media-server/src/types/rabbitmq.ts b/apps/media-server/src/types/rabbitmq.ts index 54e6220..aa1d93d 100644 --- a/apps/media-server/src/types/rabbitmq.ts +++ b/apps/media-server/src/types/rabbitmq.ts @@ -60,6 +60,10 @@ export interface HandlerDataMap { "end-stream": { streamId: string; }; + "resume-consumers": { + streamId: string; + peerId: string; + }; } export type SendTrackDoneOperationName = @@ -96,6 +100,10 @@ export type OutgoingMessageDataMap = { "you-left-stream": { streamId: string; }; + "resume-consumers-done": { + streamId: string; + error?: string; + }; } & { [Key in SendTrackDoneOperationName]: { error?: string;