Skip to content

Commit

Permalink
Merge pull request #91 from lyve-app/feat/resume-consumers
Browse files Browse the repository at this point in the history
feat: added functionality to resume consumers on the server side
  • Loading branch information
Louis3797 authored Jun 3, 2024
2 parents 36201d9 + 925e378 commit 03bc181
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 3 deletions.
31 changes: 29 additions & 2 deletions apps/api/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ io.use(async (socket, next) => {
});
},
"send-track-send-res": ({ id, error }, sid) => {
console.log(error);
io.to(sid).emit("send-track-send-res", {
id: id ?? "",
error: error ?? ""
Expand All @@ -217,10 +216,14 @@ io.use(async (socket, next) => {
});
},
"connect-transport-send-res": ({ error }, sid) => {
console.log(error);
io.to(sid).emit("connect-transport-send-res", {
error: error ?? ""
});
},
"resume-consumers-done": ({ error }, sid) => {
io.to(sid).emit("resume-consumers-done", {
error: error ?? ""
});
}
});
})();
Expand Down Expand Up @@ -449,6 +452,30 @@ io.on("connection", (socket) => {
}
});

socket.on("resume-consumers", () => {
const { streamId, user } = socket.data;

if (streamId && user) {
try {
channel.sendToQueue(
config.rabbitmq.queues.media_server_queue,
Buffer.from(
JSON.stringify({
op: "resume-consumers",
data: {
streamId,
peerId: user.id
},
sid: socket.id
})
)
);
} catch (error) {
logger.error("Error connect-transport");
}
}
});

socket.on("leave_stream", async () => {
const { streamId, user } = socket.data;

Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import path from "path";
import Joi from "joi";

dotenv.config({
path: path.resolve(__dirname, "../../.env")
path: path.resolve(__dirname, "../../../.env")
});

const envSchema = Joi.object().keys({
Expand Down
8 changes: 8 additions & 0 deletions apps/api/src/types/rabbitmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export interface OutgoingMessageDataMap {
"end-stream": {
streamId: string;
};
"resume-consumers": {
streamId: string;
peerId: string;
};
}

export type SendTrackDoneOperationName =
Expand Down Expand Up @@ -104,6 +108,10 @@ export type HandlerDataMap = {
"you-left-stream": {
streamId: string;
};
"resume-consumers-done": {
streamId: string;
error?: string;
};
} & {
[Key in SendTrackDoneOperationName]: {
error?: string;
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/types/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export interface ServerToClientEvents {
"connect-transport-send-res": (data: { error: string }) => void;
"connect-transport-recv-res": (data: { error?: string }) => void;
"you-left-stream": () => void;
"resume-consumers-done": (data: { error?: string }) => void;
user_joined: (data: { user: SocketUser }) => void;
user_leaved: (data: { user: SocketUser }) => void;
viewer_count: (data: { viewerCount: number }) => void;
Expand Down Expand Up @@ -89,6 +90,7 @@ export interface ClientToServerEvents {
appData: AppData;
}) => void;
"get-recv-tracks": (data: { rtpCapabilities: RtpCapabilities }) => void;
"resume-consumers": () => void;
join_stream: (
data: { streamId: string },
callback: SocketCallback<null>
Expand Down
57 changes: 57 additions & 0 deletions apps/media-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,63 @@ export async function main() {

delete streamRooms[streamId];
}
},
"resume-consumers": async ({ streamId, peerId }, sid, send) => {
const stream = streamRooms[streamId];
if (!stream) {
logger.error(
`resume-consumers: Stream with id: ${streamId} was not found`
);

return;
}

const { state } = stream;

if (!state) {
logger.error("resume-consumers: state is undefined");
return;
}

const peer = state[peerId];

if (!peer) {
logger.error("resume-consumers: peer not found");
return;
}

try {
for (const consumer of peer.consumers) {
await consumer.resume();
}

send({
op: "resume-consumers-done",
data: {
streamId
},
sid
});
} catch (error) {
const e = error as Error;
send({
op: "resume-consumers-done",
sid,
data: {
error: e.message,
streamId
}
});

send({
op: "media-server-error",
sid,
data: {
name: e.name,
msg: e.message
}
});
}
}
});
}
8 changes: 8 additions & 0 deletions apps/media-server/src/types/rabbitmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ export interface HandlerDataMap {
"end-stream": {
streamId: string;
};
"resume-consumers": {
streamId: string;
peerId: string;
};
}

export type SendTrackDoneOperationName =
Expand Down Expand Up @@ -96,6 +100,10 @@ export type OutgoingMessageDataMap = {
"you-left-stream": {
streamId: string;
};
"resume-consumers-done": {
streamId: string;
error?: string;
};
} & {
[Key in SendTrackDoneOperationName]: {
error?: string;
Expand Down

0 comments on commit 03bc181

Please sign in to comment.