From 0b51174b6564eff2003fe2555f2c2aa03f3cc7a4 Mon Sep 17 00:00:00 2001 From: Pablo Romeo Date: Wed, 13 Sep 2023 10:22:29 -0300 Subject: [PATCH] Experimental aproach to customizable extra dependencies and custom ENV overrides for transcoder --- docker-compose.yaml | 177 ++++----- worker/app/worker.js | 350 +++++++++--------- .../etc/cont-init.d/92-install-dependencies | 6 + worker/extended-image/Dockerfile-development | 4 + 4 files changed, 283 insertions(+), 254 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 01af9aa..678d54e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,91 +1,96 @@ -version: '3.8' +version: "3.8" services: - plex: - container_name: plex - build: - context: ./pms - dockerfile: ./extended-image/Dockerfile-development - environment: - VERSION: docker - PUID: ${PUID} - PGID: ${PGID} - TZ: ${TZ} - ORCHESTRATOR_URL: http://plex-orchestrator:3500 - PMS_SERVICE: plex # This service. If you disable Local Relay then you must use PMS_IP instead - PMS_PORT: "32400" - TRANSCODE_OPERATING_MODE: both #(local|remote|both) - TRANSCODER_VERBOSE: "1" # 1=verbose, 0=silent - LOCAL_RELAY_ENABLED: "1" - LOCAL_RELAY_PORT: "32499" - healthcheck: - test: curl -fsS http://localhost:32400/identity > /dev/null || exit 1 - interval: 15s - timeout: 15s - retries: 5 - start_period: 30s - volumes: - - plex-config:/config - - transcode-volume:/transcode - - ./sample-content/tv:/data/tv - - ./sample-content/movies:/data/movies - ports: - - 32499:32499 # LOCAL_RELAY_PORT - - 32400:32400 - - 3005:3005 - - 8324:8324 - - 1900:1900/udp - - 32410:32410/udp - - 32412:32412/udp - - 32413:32413/udp - - 32414:32414/udp + plex: + container_name: plex + build: + context: ./pms + dockerfile: ./extended-image/Dockerfile-development + environment: + VERSION: docker + PUID: ${PUID} + PGID: ${PGID} + TZ: ${TZ} + ORCHESTRATOR_URL: http://plex-orchestrator:3500 + PMS_SERVICE: plex # This service. If you disable Local Relay then you must use PMS_IP instead + PMS_PORT: "32400" + TRANSCODE_OPERATING_MODE: both #(local|remote|both) + TRANSCODER_VERBOSE: "1" # 1=verbose, 0=silent + LOCAL_RELAY_ENABLED: "1" + LOCAL_RELAY_PORT: "32499" + healthcheck: + test: curl -fsS http://localhost:32400/identity > /dev/null || exit 1 + interval: 15s + timeout: 15s + retries: 5 + start_period: 30s + volumes: + - plex-config:/config + - transcode-volume:/transcode + - ./sample-content/tv:/data/tv + - ./sample-content/movies:/data/movies + ports: + - 32499:32499 # LOCAL_RELAY_PORT + - 32400:32400 + - 3005:3005 + - 8324:8324 + - 1900:1900/udp + - 32410:32410/udp + - 32412:32412/udp + - 32413:32413/udp + - 32414:32414/udp - plex-orchestrator: - container_name: plex-orchestrator - build: ./orchestrator - healthcheck: - test: curl -fsS http://localhost:3500/health > /dev/null || exit 1 - interval: 15s - timeout: 15s - retries: 5 - start_period: 30s - environment: - TZ: ${TZ} - LISTENING_PORT: 3500 - WORKER_SELECTION_STRATEGY: "LOAD_RANK" # RR | LOAD_CPU | LOAD_TASKS | LOAD_RANK (default) - volumes: - - /etc/localtime:/etc/localtime:ro - ports: - - 3500:3500 + plex-orchestrator: + container_name: plex-orchestrator + build: ./orchestrator + healthcheck: + test: curl -fsS http://localhost:3500/health > /dev/null || exit 1 + interval: 15s + timeout: 15s + retries: 5 + start_period: 30s + environment: + TZ: ${TZ} + LISTENING_PORT: 3500 + WORKER_SELECTION_STRATEGY: "LOAD_RANK" # RR | LOAD_CPU | LOAD_TASKS | LOAD_RANK (default) + volumes: + - /etc/localtime:/etc/localtime:ro + ports: + - 3500:3500 - plex-worker: - build: - context: ./worker - dockerfile: ./extended-image/Dockerfile-development - deploy: - mode: replicated - replicas: 1 - environment: - VERSION: docker - PUID: 1000 - PGID: 1000 - TZ: ${TZ} - LISTENING_PORT: 3501 # used by the healthcheck - STAT_CPU_INTERVAL: 2000 # interval for reporting worker load metrics - ORCHESTRATOR_URL: http://plex-orchestrator:3500 - EAE_SUPPORT: "1" - healthcheck: - test: curl -fsS http://localhost:3501/health > /dev/null || exit 1 - interval: 15s - timeout: 15s - retries: 5 - start_period: 240s - volumes: - - codecs:/codecs - - ./sample-content/tv:/data/tv - - ./sample-content/movies:/data/movies - - transcode-volume:/transcode + plex-worker: + build: + context: ./worker + dockerfile: ./extended-image/Dockerfile-development + args: + EXTRA_APT_INSTALLS: ${EXTRA_APT_INSTALLS} + cap_add: + - CAP_PERFMON + deploy: + mode: replicated + replicas: 1 + environment: + VERSION: docker + PUID: 1000 + PGID: 1000 + TZ: ${TZ} + LISTENING_PORT: 3501 # used by the healthcheck + STAT_CPU_INTERVAL: 2000 # interval for reporting worker load metrics + ORCHESTRATOR_URL: http://plex-orchestrator:3500 + EAE_SUPPORT: "1" + TRANSCODER_ENV_OVERRIDES: "LIBVA_DRIVERS_PATH=/usr/lib/x86_64-linux-gnu/dri" + healthcheck: + test: curl -fsS http://localhost:3501/health > /dev/null || exit 1 + interval: 15s + timeout: 15s + retries: 5 + start_period: 240s + volumes: + - codecs:/codecs + - ./sample-content/tv:/data/tv + - ./sample-content/movies:/data/movies + - transcode-volume:/transcode volumes: - plex-config: - transcode-volume: - codecs: + plex-config: + transcode-volume: + codecs: diff --git a/worker/app/worker.js b/worker/app/worker.js index 83a820c..3bd40f9 100644 --- a/worker/app/worker.js +++ b/worker/app/worker.js @@ -2,9 +2,9 @@ const LISTENING_PORT = process.env.LISTENING_PORT || 3501; const STAT_CPU_INTERVAL = process.env.STAT_CPU_INTERVAL || 2000; const STAT_CPU_OPS_DURATION = process.env.STAT_CPU_OPS_DURATION || 1000; const ORCHESTRATOR_URL = - process.env.ORCHESTRATOR_URL || "http://localhost:3500"; + process.env.ORCHESTRATOR_URL || "http://localhost:3500"; const TRANSCODER_PATH = - process.env.TRANSCODER_PATH || "/usr/lib/plexmediaserver/"; + process.env.TRANSCODER_PATH || "/usr/lib/plexmediaserver/"; const TRANSCODER_NAME = process.env.TRANSCODER_NAME || "Plex Transcoder"; const EAE_SUPPORT = process.env.EAE_SUPPORT || "1"; const EAE_EXECUTABLE = process.env.EAE_EXECUTABLE || ""; @@ -36,30 +36,30 @@ console.log(`Computed CPU ops => ${ops}`); // healthcheck endpoint app.get("/health", (req, res) => { - res.send("Healthy"); + res.send("Healthy"); }); server.listen(LISTENING_PORT, () => { - console.log(`Worker listening on port ${LISTENING_PORT}`); + console.log(`Worker listening on port ${LISTENING_PORT}`); }); // calculate cpu usage every 2 seconds setInterval(() => { - cpuStat.usagePercent( - { sampleMs: STAT_CPU_INTERVAL }, - (err, percent, seconds) => { - if (!err) { - cpuUsage = percent.toFixed(2); - if (socket.connected) { - socket.emit("worker.stats", { - cpu: cpuUsage, - tasks: taskMap.size, - ops: ops, - }); - } - } - } - ); + cpuStat.usagePercent( + { sampleMs: STAT_CPU_INTERVAL }, + (err, percent, seconds) => { + if (!err) { + cpuUsage = percent.toFixed(2); + if (socket.connected) { + socket.emit("worker.stats", { + cpu: cpuUsage, + tasks: taskMap.size, + ops: ops, + }); + } + } + } + ); }, STAT_CPU_INTERVAL); let workerId = uuid(); @@ -68,168 +68,182 @@ let taskMap = new Map(); console.debug(`Initializing Worker ${workerId}|${process.env.HOSTNAME}`); socket.on("connect", () => { - console.log(`Worker connected on socket ${socket.id}`); - socket.emit("worker.announce", { - workerId: workerId, - host: process.env.HOSTNAME, - }); + console.log(`Worker connected on socket ${socket.id}`); + socket.emit("worker.announce", { + workerId: workerId, + host: process.env.HOSTNAME, + }); }); function processEnv(env) { - // overwrite environment settings coming from the original plex instance tied to architecture - newEnv = JSON.parse(JSON.stringify(env)); - newEnv.PLEX_ARCH = process.env.PLEX_ARCH; - newEnv.PLEX_MEDIA_SERVER_INFO_MODEL = - process.env.PLEX_MEDIA_SERVER_INFO_MODEL; - newEnv.FFMPEG_EXTERNAL_LIBS = process.env.FFMPEG_EXTERNAL_LIBS; - return newEnv; + // overwrite environment settings coming from the original plex instance tied to architecture + newEnv = JSON.parse(JSON.stringify(env)); + newEnv.PLEX_ARCH = process.env.PLEX_ARCH; + newEnv.PLEX_MEDIA_SERVER_INFO_MODEL = + process.env.PLEX_MEDIA_SERVER_INFO_MODEL; + newEnv.FFMPEG_EXTERNAL_LIBS = process.env.FFMPEG_EXTERNAL_LIBS; + + // TRANSCODER_ENV_OVERRIDES is a comma-separated list of environment variables to override + // parse it and overwrite each one in newEnv + if (process.env.TRANSCODER_ENV_OVERRIDES) { + let overrides = process.env.TRANSCODER_ENV_OVERRIDES.split(","); + overrides.forEach((override) => { + let [key, value] = override.split("="); + newEnv[key] = value; + }); + } + return newEnv; } socket.on("worker.task.request", (taskRequest) => { - console.log("Received task request"); - - socket.emit("worker.task.update", { - taskId: taskRequest.taskId, - status: "received", - }); - - var processedEnvironmentVariables = processEnv(taskRequest.payload.env); - - var child, childEAE; - if (taskRequest.payload.args[0] === "testpayload") { - console.log(`args => ${JSON.stringify(taskRequest.payload.args)}`); - console.log(`env => ${JSON.stringify(processedEnvironmentVariables)}`); - console.log("Starting test of waiting for 5 seconds"); - child = exec("sleep 5"); - } else { - if (FFMPEG_HWACCEL != false) { - console.log(`Setting hwaccel to ${FFMPEG_HWACCEL}`); - let i = taskRequest.payload.args.indexOf("-hwaccel"); - if (i > 0) { - taskRequest.payload.args[i + 1] = FFMPEG_HWACCEL; - } else { - taskRequest.payload.args.unshift("-hwaccel", FFMPEG_HWACCEL); - } - } - - console.log(`EAE_ROOT => "${processedEnvironmentVariables.EAE_ROOT}"`); - if ( - (EAE_SUPPORT == "1" || EAE_SUPPORT == "true") && - EAE_EXECUTABLE != "" && - processedEnvironmentVariables.EAE_ROOT?.length > 0 - ) { - if (!fs.existsSync(processedEnvironmentVariables.EAE_ROOT)) { - console.log( - `EAE Support - Creating EAE_ROOT destination => ${processedEnvironmentVariables.EAE_ROOT}` - ); - fs.mkdirSync(processedEnvironmentVariables.EAE_ROOT, { - recursive: true, - }); - } - - console.log( - `EAE Support - Spawning EasyAudioEncoder from "${EAE_EXECUTABLE}", cwd => ${processedEnvironmentVariables.EAE_ROOT}` - ); - childEAE = spawn(EAE_EXECUTABLE, [], { - cwd: processedEnvironmentVariables.EAE_ROOT, - env: processedEnvironmentVariables, - }); - childEAE.stdout.pipe(process.stdout); - childEAE.stderr.pipe(process.stderr); - childEAE.on("error", (err) => { - console.error("EAE Support - EAE failed:"); - console.error(err); - }); - childEAE.on("close", () => { - console.log("EAE Support - Closing"); - }); - childEAE.on("exit", () => { - console.log("EAE Support - Exiting"); - }); - } else { - childEAE = null; - } - - if (!fs.existsSync(taskRequest.payload.cwd)) { - console.error( - `CWD path doesn't seem to exist. Plex should have created this path before-hand, so you may have an issue with your shares => "${taskRequest.payload.cwd}"` - ); - } - - child = spawn(TRANSCODER_PATH + TRANSCODER_NAME, taskRequest.payload.args, { - cwd: taskRequest.payload.cwd, - env: processedEnvironmentVariables, - }); - } - - taskMap.set(taskRequest.taskId, { - transcodeProcess: child, - eaeProcess: childEAE, - }); - - child.stdout.pipe(process.stdout); - child.stderr.pipe(process.stderr); - - let notified = false; - const completionHandler = (code) => { - if (!notified) { - console.log("Completed transcode"); - socket.emit("worker.task.update", { - taskId: taskRequest.taskId, - status: "done", - result: code === 0, - exitCode: code, - }); - notified = true; - console.log("Removing process from taskMap"); - taskMap.delete(taskRequest.taskId); - } - }; - - child.on("error", (err) => { - console.error("Transcoding failed:"); - console.error(err); - notified = true; - socket.emit("worker.task.update", { - taskId: taskRequest.taskId, - status: "done", - result: false, - error: err.message, - }); - console.log("Orchestrator notified"); - - console.log("Removing process from taskMap"); - taskMap.delete(taskRequest.taskId); - }); - - child.on("close", completionHandler); - child.on("exit", completionHandler); - - socket.emit("worker.task.update", { - taskId: taskRequest.taskId, - status: "inprogress", - }); + console.log("Received task request"); + + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "received", + }); + + var processedEnvironmentVariables = processEnv(taskRequest.payload.env); + + var child, childEAE; + if (taskRequest.payload.args[0] === "testpayload") { + console.log(`args => ${JSON.stringify(taskRequest.payload.args)}`); + console.log(`env => ${JSON.stringify(processedEnvironmentVariables)}`); + console.log("Starting test of waiting for 5 seconds"); + child = exec("sleep 5"); + } else { + if (FFMPEG_HWACCEL != false) { + console.log(`Setting hwaccel to ${FFMPEG_HWACCEL}`); + let i = taskRequest.payload.args.indexOf("-hwaccel"); + if (i > 0) { + taskRequest.payload.args[i + 1] = FFMPEG_HWACCEL; + } else { + taskRequest.payload.args.unshift("-hwaccel", FFMPEG_HWACCEL); + } + } + + console.log(`EAE_ROOT => "${processedEnvironmentVariables.EAE_ROOT}"`); + if ( + (EAE_SUPPORT == "1" || EAE_SUPPORT == "true") && + EAE_EXECUTABLE != "" && + processedEnvironmentVariables.EAE_ROOT?.length > 0 + ) { + if (!fs.existsSync(processedEnvironmentVariables.EAE_ROOT)) { + console.log( + `EAE Support - Creating EAE_ROOT destination => ${processedEnvironmentVariables.EAE_ROOT}` + ); + fs.mkdirSync(processedEnvironmentVariables.EAE_ROOT, { + recursive: true, + }); + } + + console.log( + `EAE Support - Spawning EasyAudioEncoder from "${EAE_EXECUTABLE}", cwd => ${processedEnvironmentVariables.EAE_ROOT}` + ); + childEAE = spawn(EAE_EXECUTABLE, [], { + cwd: processedEnvironmentVariables.EAE_ROOT, + env: processedEnvironmentVariables, + }); + childEAE.stdout.pipe(process.stdout); + childEAE.stderr.pipe(process.stderr); + childEAE.on("error", (err) => { + console.error("EAE Support - EAE failed:"); + console.error(err); + }); + childEAE.on("close", () => { + console.log("EAE Support - Closing"); + }); + childEAE.on("exit", () => { + console.log("EAE Support - Exiting"); + }); + } else { + childEAE = null; + } + + if (!fs.existsSync(taskRequest.payload.cwd)) { + console.error( + `CWD path doesn't seem to exist. Plex should have created this path before-hand, so you may have an issue with your shares => "${taskRequest.payload.cwd}"` + ); + } + + child = spawn( + TRANSCODER_PATH + TRANSCODER_NAME, + taskRequest.payload.args, + { + cwd: taskRequest.payload.cwd, + env: processedEnvironmentVariables, + } + ); + } + + taskMap.set(taskRequest.taskId, { + transcodeProcess: child, + eaeProcess: childEAE, + }); + + child.stdout.pipe(process.stdout); + child.stderr.pipe(process.stderr); + + let notified = false; + const completionHandler = (code) => { + if (!notified) { + console.log("Completed transcode"); + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "done", + result: code === 0, + exitCode: code, + }); + notified = true; + console.log("Removing process from taskMap"); + taskMap.delete(taskRequest.taskId); + } + }; + + child.on("error", (err) => { + console.error("Transcoding failed:"); + console.error(err); + notified = true; + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "done", + result: false, + error: err.message, + }); + console.log("Orchestrator notified"); + + console.log("Removing process from taskMap"); + taskMap.delete(taskRequest.taskId); + }); + + child.on("close", completionHandler); + child.on("exit", completionHandler); + + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "inprogress", + }); }); socket.on("worker.task.kill", (data) => { - let taskEntry = taskMap.get(data.taskId); - if (taskEntry) { - console.log(`Killing child processes for task ${data.taskId}`); - taskEntry.transcodeProcess.kill(); - if (taskEntry.eaeProcess != null) { - taskEntry.eaeProcess.kill(); - } - console.log("Removing process from taskMap"); - taskMap.delete(data.taskId); - } + let taskEntry = taskMap.get(data.taskId); + if (taskEntry) { + console.log(`Killing child processes for task ${data.taskId}`); + taskEntry.transcodeProcess.kill(); + if (taskEntry.eaeProcess != null) { + taskEntry.eaeProcess.kill(); + } + console.log("Removing process from taskMap"); + taskMap.delete(data.taskId); + } }); socket.on("disconnect", () => { - console.log("Worker disconnected"); + console.log("Worker disconnected"); }); ON_DEATH((signal, err) => { - console.log("ON_DEATH signal detected"); - console.error(err); - process.exit(signal); + console.log("ON_DEATH signal detected"); + console.error(err); + process.exit(signal); }); diff --git a/worker/docker-mod/root/etc/cont-init.d/92-install-dependencies b/worker/docker-mod/root/etc/cont-init.d/92-install-dependencies index ac0ea50..43d539f 100644 --- a/worker/docker-mod/root/etc/cont-init.d/92-install-dependencies +++ b/worker/docker-mod/root/etc/cont-init.d/92-install-dependencies @@ -13,5 +13,11 @@ else curl -L https://raw.githubusercontent.com/tj/n/master/bin/n -o n echo "**** install nodejs ****" bash n lts + if [ -n "$EXTRA_APT_INSTALLS" ]; then + echo "**** EXTRA_APT_INSTALLS is set, installing '${EXTRA_APT_INSTALLS}' ... ****" + apt-get install -y $EXTRA_APT_INSTALLS + else + echo "**** EXTRA_APT_INSTALLS is not set, skipping... ****" + fi fi diff --git a/worker/extended-image/Dockerfile-development b/worker/extended-image/Dockerfile-development index a78e3db..0fa883b 100644 --- a/worker/extended-image/Dockerfile-development +++ b/worker/extended-image/Dockerfile-development @@ -1,5 +1,7 @@ FROM linuxserver/plex:latest +ARG EXTRA_APT_INSTALLS="" + LABEL maintainer="pabloromeo" COPY /docker-mod/root/etc/ /etc/ @@ -11,6 +13,8 @@ RUN apt-get update && \ dos2unix /app/start.sh && \ apt-get remove -y dos2unix +ENV EXTRA_APT_INSTALLS $EXTRA_APT_INSTALLS + RUN bash /etc/cont-init.d/92-install-dependencies && \ bash /etc/cont-init.d/93-npm-install && \ rm /etc/cont-init.d/92-install-dependencies && \