From af23dd3f5305cf5bd538335add278987e33a401f Mon Sep 17 00:00:00 2001 From: Pablo Romeo Date: Thu, 6 Jul 2023 02:07:25 -0300 Subject: [PATCH] Credits Detection bugfix and armhf deprecation (#233) * Credits Detection Fix * LinuxServer deprecated armhf --- .env-template | 3 + .github/workflows/main.yml | 50 +- .gitignore | 5 +- docker-compose.yaml | 91 ++ orchestrator/metrics.js | 136 ++- orchestrator/orchestrator.js | 1110 +++++++++--------- orchestrator/server.js | 33 +- pms/app/jobPoster.js | 69 +- pms/app/transcoder.js | 221 ++-- pms/extended-image/Dockerfile-development | 25 + worker/app/worker.js | 357 +++--- worker/extended-image/Dockerfile | 6 +- worker/extended-image/Dockerfile-development | 27 + 13 files changed, 1204 insertions(+), 929 deletions(-) create mode 100644 .env-template create mode 100644 docker-compose.yaml create mode 100644 pms/extended-image/Dockerfile-development create mode 100644 worker/extended-image/Dockerfile-development diff --git a/.env-template b/.env-template new file mode 100644 index 0000000..f500741 --- /dev/null +++ b/.env-template @@ -0,0 +1,3 @@ +TZ="America/Argentina/Buenos_Aires" +PUID=1000 +PGID=1000 \ No newline at end of file diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 5e919d4..3d94df2 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -6,23 +6,23 @@ name: Docker Builds on: workflow_dispatch: schedule: - - cron: '0 1 * * *' + - cron: "0 1 * * *" push: branches: - - 'master' - - 'dev' - - 'experimental' + - "master" + - "dev" + - "experimental" tags: - - 'v*.*.*' + - "v*.*.*" pull_request: branches: - - 'master' - - 'dev' + - "master" + - "dev" -permissions: +permissions: contents: read packages: write - + # A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: build-pms: @@ -31,7 +31,7 @@ jobs: # Get the repositery's code - name: Checkout uses: actions/checkout@v3 - + # https://github.com/docker/setup-qemu-action - name: Set up QEMU uses: docker/setup-qemu-action@v2 @@ -40,8 +40,8 @@ jobs: id: buildx uses: docker/setup-buildx-action@v2 -# - name: Available platforms -# run: echo ${{ steps.buildx.outputs.platforms }} + # - name: Available platforms + # run: echo ${{ steps.buildx.outputs.platforms }} - name: Login to GHCR if: github.event_name != 'pull_request' @@ -78,7 +78,7 @@ jobs: with: context: ./pms file: ./pms/extended-image/Dockerfile - platforms: linux/amd64,linux/arm64,linux/arm/v7 + platforms: linux/amd64,linux/arm64 push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta_pms.outputs.tags }} labels: ${{ steps.meta_pms.outputs.labels }} @@ -91,14 +91,14 @@ jobs: repository: pabloromeo/clusterplex_pms readme-filepath: ./README.md short-description: "PMS image for ClusterPlex" - + build-worker: runs-on: ubuntu-latest steps: # Get the repositery's code - name: Checkout uses: actions/checkout@v3 - + # https://github.com/docker/setup-qemu-action - name: Set up QEMU uses: docker/setup-qemu-action@v2 @@ -107,8 +107,8 @@ jobs: id: buildx uses: docker/setup-buildx-action@v2 -# - name: Available platforms -# run: echo ${{ steps.buildx.outputs.platforms }} + # - name: Available platforms + # run: echo ${{ steps.buildx.outputs.platforms }} - name: Login to GHCR if: github.event_name != 'pull_request' @@ -124,8 +124,7 @@ jobs: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - - - name: Docker meta for Worker + - name: Docker meta for Worker id: meta_worker uses: docker/metadata-action@v4 with: @@ -147,7 +146,7 @@ jobs: with: context: ./worker file: ./worker/extended-image/Dockerfile - platforms: linux/amd64,linux/arm64,linux/arm/v7 + platforms: linux/amd64,linux/arm64 push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta_worker.outputs.tags }} labels: ${{ steps.meta_worker.outputs.labels }} @@ -167,7 +166,7 @@ jobs: # Get the repositery's code - name: Checkout uses: actions/checkout@v3 - + # https://github.com/docker/setup-qemu-action - name: Set up QEMU uses: docker/setup-qemu-action@v2 @@ -176,8 +175,8 @@ jobs: id: buildx uses: docker/setup-buildx-action@v2 -# - name: Available platforms -# run: echo ${{ steps.buildx.outputs.platforms }} + # - name: Available platforms + # run: echo ${{ steps.buildx.outputs.platforms }} - name: Login to GHCR if: github.event_name != 'pull_request' @@ -193,8 +192,7 @@ jobs: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - - - name: Docker meta for Orchestrator + - name: Docker meta for Orchestrator id: meta_orchestrator uses: docker/metadata-action@v4 with: @@ -215,7 +213,7 @@ jobs: uses: docker/build-push-action@v4 with: context: ./orchestrator - platforms: linux/amd64,linux/arm64,linux/arm/v7 + platforms: linux/amd64,linux/arm64 push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta_orchestrator.outputs.tags }} labels: ${{ steps.meta_orchestrator.outputs.labels }} diff --git a/.gitignore b/.gitignore index 3e99214..0979573 100644 --- a/.gitignore +++ b/.gitignore @@ -60,7 +60,8 @@ typings/ # next.js build output .next -docker-compose.yml - # macOS **/.DS_STORE + +# for local testing +sample-content/ diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..01af9aa --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,91 @@ +version: '3.8' + +services: + plex: + container_name: plex + build: + context: ./pms + dockerfile: ./extended-image/Dockerfile-development + environment: + VERSION: docker + PUID: ${PUID} + PGID: ${PGID} + TZ: ${TZ} + ORCHESTRATOR_URL: http://plex-orchestrator:3500 + PMS_SERVICE: plex # This service. If you disable Local Relay then you must use PMS_IP instead + PMS_PORT: "32400" + TRANSCODE_OPERATING_MODE: both #(local|remote|both) + TRANSCODER_VERBOSE: "1" # 1=verbose, 0=silent + LOCAL_RELAY_ENABLED: "1" + LOCAL_RELAY_PORT: "32499" + healthcheck: + test: curl -fsS http://localhost:32400/identity > /dev/null || exit 1 + interval: 15s + timeout: 15s + retries: 5 + start_period: 30s + volumes: + - plex-config:/config + - transcode-volume:/transcode + - ./sample-content/tv:/data/tv + - ./sample-content/movies:/data/movies + ports: + - 32499:32499 # LOCAL_RELAY_PORT + - 32400:32400 + - 3005:3005 + - 8324:8324 + - 1900:1900/udp + - 32410:32410/udp + - 32412:32412/udp + - 32413:32413/udp + - 32414:32414/udp + + plex-orchestrator: + container_name: plex-orchestrator + build: ./orchestrator + healthcheck: + test: curl -fsS http://localhost:3500/health > /dev/null || exit 1 + interval: 15s + timeout: 15s + retries: 5 + start_period: 30s + environment: + TZ: ${TZ} + LISTENING_PORT: 3500 + WORKER_SELECTION_STRATEGY: "LOAD_RANK" # RR | LOAD_CPU | LOAD_TASKS | LOAD_RANK (default) + volumes: + - /etc/localtime:/etc/localtime:ro + ports: + - 3500:3500 + + plex-worker: + build: + context: ./worker + dockerfile: ./extended-image/Dockerfile-development + deploy: + mode: replicated + replicas: 1 + environment: + VERSION: docker + PUID: 1000 + PGID: 1000 + TZ: ${TZ} + LISTENING_PORT: 3501 # used by the healthcheck + STAT_CPU_INTERVAL: 2000 # interval for reporting worker load metrics + ORCHESTRATOR_URL: http://plex-orchestrator:3500 + EAE_SUPPORT: "1" + healthcheck: + test: curl -fsS http://localhost:3501/health > /dev/null || exit 1 + interval: 15s + timeout: 15s + retries: 5 + start_period: 240s + volumes: + - codecs:/codecs + - ./sample-content/tv:/data/tv + - ./sample-content/movies:/data/movies + - transcode-volume:/transcode +volumes: + plex-config: + transcode-volume: + codecs: diff --git a/orchestrator/metrics.js b/orchestrator/metrics.js index 54b467a..de525b9 100644 --- a/orchestrator/metrics.js +++ b/orchestrator/metrics.js @@ -1,84 +1,106 @@ -const client = require('prom-client'); +const client = require("prom-client"); //client.collectDefaultMetrics() const counterJobsPosted = new client.Counter({ - name: 'jobs_posted', - help: 'Jobs Posted' - }) + name: "jobs_posted", + help: "Jobs Posted", +}); const counterJobsCompleted = new client.Counter({ - name: 'jobs_completed', - help: 'Jobs Completed' -}) + name: "jobs_completed", + help: "Jobs Completed", +}); const counterJobsSucceeded = new client.Counter({ - name: 'jobs_succeeded', - help: 'Jobs Succeeded' -}) + name: "jobs_succeeded", + help: "Jobs Succeeded", +}); const counterJobsFailed = new client.Counter({ - name: 'jobs_failed', - help: 'Jobs Failed' -}) + name: "jobs_failed", + help: "Jobs Failed", +}); const counterJobsKilled = new client.Counter({ - name: 'jobs_killed', - help: 'Jobs Killed' -}) + name: "jobs_killed", + help: "Jobs Killed", +}); const gaugeJobPosters = new client.Gauge({ - name: 'job_posters_active', - help: 'Active Job Posters' -}) + name: "job_posters_active", + help: "Active Job Posters", +}); const gaugeWorkers = new client.Gauge({ - name: 'workers_active', - help: 'Active Workers' -}) + name: "workers_active", + help: "Active Workers", +}); const workerLoadCPUStats = new client.Gauge({ - name : 'worker_load_cpu', - help : 'Worker Load - CPU usage', - labelNames: ['worker_name'], -}) + name: "worker_load_cpu", + help: "Worker Load - CPU usage", + labelNames: ["worker_name"], +}); const workerLoadTasksStats = new client.Gauge({ - name : 'worker_load_tasks', - help : 'Worker Load - Tasks Count', - labelNames: ['worker_name'], -}) + name: "worker_load_tasks", + help: "Worker Load - Tasks Count", + labelNames: ["worker_name"], +}); const workerLoadOpsStats = new client.Gauge({ - name : 'worker_load_ops', - help : 'Worker Load - CPU ops', - labelNames: ['worker_name'], -}) + name: "worker_load_ops", + help: "Worker Load - CPU ops", + labelNames: ["worker_name"], +}); const workerLoadRankStats = new client.Gauge({ - name : 'worker_load_rank', - help : 'Worker Load - Rank', - labelNames: ['worker_name'], -}) + name: "worker_load_rank", + help: "Worker Load - Rank", + labelNames: ["worker_name"], +}); module.exports = { - jobPosted : () => { counterJobsPosted.inc(1) }, - jobCompleted : () => { counterJobsCompleted.inc(1) }, - jobSucceeded : () => { counterJobsSucceeded.inc(1) }, - jobFailed : () => { counterJobsFailed.inc(1) }, - jobKilled : () => { counterJobsKilled.inc(1) }, + jobPosted: () => { + counterJobsPosted.inc(1); + }, + jobCompleted: () => { + counterJobsCompleted.inc(1); + }, + jobSucceeded: () => { + counterJobsSucceeded.inc(1); + }, + jobFailed: () => { + counterJobsFailed.inc(1); + }, + jobKilled: () => { + counterJobsKilled.inc(1); + }, - setActiveWorkers : (amount) => { gaugeWorkers.set(amount) }, - setActiveJobPosters : (amount) => { gaugeJobPosters.set(amount) }, - setWorkerLoadCPU : (workerName, value) => { workerLoadCPUStats.labels(workerName).set(value) }, - setWorkerLoadTasks : (workerName, value) => { workerLoadTasksStats.labels(workerName).set(value) }, - setWorkerLoadOps : (workerName, value) => { workerLoadOpsStats.labels(workerName).set(value) }, - setWorkerLoadRank : (workerName, value) => { workerLoadRankStats.labels(workerName).set(value) }, - - injectMetricsRoute : (app) => { - app.get('/metrics', async (req, res) => { - res.set('Content-Type', client.register.contentType); - res.end(await client.register.metrics()); - }) - } -} + setActiveWorkers: (amount) => { + gaugeWorkers.set(amount); + }, + setActiveJobPosters: (amount) => { + gaugeJobPosters.set(amount); + }, + setWorkerLoadCPU: (workerName, value) => { + workerLoadCPUStats.labels(workerName).set(value); + }, + setWorkerLoadTasks: (workerName, value) => { + workerLoadTasksStats.labels(workerName).set(value); + }, + setWorkerLoadOps: (workerName, value) => { + workerLoadOpsStats.labels(workerName).set(value); + }, + setWorkerLoadRank: (workerName, value) => { + workerLoadRankStats.labels(workerName).set(value); + }, + + injectMetricsRoute: (app) => { + app.get("/metrics", async (req, res) => { + res.set("Content-Type", client.register.contentType); + res.end(await client.register.metrics()); + }); + }, +}; diff --git a/orchestrator/orchestrator.js b/orchestrator/orchestrator.js index 8eec64e..1799db4 100644 --- a/orchestrator/orchestrator.js +++ b/orchestrator/orchestrator.js @@ -1,574 +1,622 @@ -const { v4: uuid } = require('uuid'); -const STREAM_SPLITTING = process.env.STREAM_SPLITTING || 'OFF' +const { v4: uuid } = require("uuid"); +const STREAM_SPLITTING = process.env.STREAM_SPLITTING || "OFF"; // selection strategies: // RR : Round-Robin // LOAD_CPU: Lowest CPU load // LOAD_TASKS: Lowest task count // LOAD_RANK: OPS * (1 - CPU load) -const WORKER_SELECTION_STRATEGY_RR = 'RR' -const WORKER_SELECTION_STRATEGY_LOAD_CPU = 'LOAD_CPU' -const WORKER_SELECTION_STRATEGY_LOAD_TASKS = 'LOAD_TASKS' -const WORKER_SELECTION_STRATEGY_LOAD_RANK = 'LOAD_RANK' -const WORKER_SELECTION_STRATEGY = process.env.WORKER_SELECTION_STRATEGY || WORKER_SELECTION_STRATEGY_LOAD_RANK +const WORKER_SELECTION_STRATEGY_RR = "RR"; +const WORKER_SELECTION_STRATEGY_LOAD_CPU = "LOAD_CPU"; +const WORKER_SELECTION_STRATEGY_LOAD_TASKS = "LOAD_TASKS"; +const WORKER_SELECTION_STRATEGY_LOAD_RANK = "LOAD_RANK"; +const WORKER_SELECTION_STRATEGY = + process.env.WORKER_SELECTION_STRATEGY || WORKER_SELECTION_STRATEGY_LOAD_RANK; -const metrics = require('./metrics') +const metrics = require("./metrics"); class Worker { - constructor(id, socketId, host) { - this.id = id - this.socketId = socketId - this.host = host - this.name = `${this.id}|${this.host}` - this.stats = { cpu: 0, tasks: 0, ops: 0, rank: 0 } - this.activeTaskCount = 0 - this.usageCounter = 0 - } - - onTaskAssigned() { - this.activeTaskCount++ - this.usageCounter++ - } - - onTaskUnassigned() { - this.activeTaskCount-- - } - - onRegister() { - } - - onUnregister() { - } - - updateStats(stats) { - this.stats = { cpu : parseFloat(stats.cpu), tasks: parseInt(stats.tasks), ops: parseInt(stats.ops)} - this.stats.rank = parseInt(this.stats.ops * (1 - (this.stats.cpu / 100.0))) - metrics.setWorkerLoadCPU(this.host, this.stats.cpu) - metrics.setWorkerLoadTasks(this.host, this.stats.tasks) - metrics.setWorkerLoadOps(this.host, this.stats.ops) - metrics.setWorkerLoadRank(this.host, this.stats.rank) - } + constructor(id, socketId, host) { + this.id = id; + this.socketId = socketId; + this.host = host; + this.name = `${this.id}|${this.host}`; + this.stats = { cpu: 0, tasks: 0, ops: 0, rank: 0 }; + this.activeTaskCount = 0; + this.usageCounter = 0; + } + + onTaskAssigned() { + this.activeTaskCount++; + this.usageCounter++; + } + + onTaskUnassigned() { + this.activeTaskCount--; + } + + onRegister() {} + + onUnregister() {} + + updateStats(stats) { + this.stats = { + cpu: parseFloat(stats.cpu), + tasks: parseInt(stats.tasks), + ops: parseInt(stats.ops), + }; + this.stats.rank = parseInt(this.stats.ops * (1 - this.stats.cpu / 100.0)); + metrics.setWorkerLoadCPU(this.host, this.stats.cpu); + metrics.setWorkerLoadTasks(this.host, this.stats.tasks); + metrics.setWorkerLoadOps(this.host, this.stats.ops); + metrics.setWorkerLoadRank(this.host, this.stats.rank); + } } class WorkerSet { - constructor() { - this.workers = new Map() - this.workerSelectionStrategy = this.getWorkerSelectionStrategy() - } - - register(worker) { - console.log(`Registering worker ${worker.name}`) - this.workers.set(worker.id, worker) - worker.onRegister() - metrics.setActiveWorkers(this.size()) - } - - unregister(socketId) { - let worker = this.findBySocketId(socketId) - if (worker) { - console.log(`Unregistering worker ${worker.name}`) - worker.onUnregister() - this.workers.delete(worker.id) - } else { - console.error(`No worker found for socket ${socketId}`) - } - metrics.setActiveWorkers(this.size()) - } - - findById(id) { - return this.workers.get(id) - } - - findBySocketId(socketId) { - let workerArray = Array.from(this.workers.values()) - let worker = workerArray.filter(w => w.socketId === socketId) - if (worker.length > 0) - return worker[0] - return null - } - - getNextAvailableWorker() { - if (this.workers.size == 0) { - return null - } - return this.workerSelectionStrategy(); - } - - updateStats(socketId, stats) { - var worker = this.findBySocketId(socketId) - if (worker) { - worker.updateStats(stats) - } - else { - console.warn(`Received stats from unregistered worker on socket ${socketId}`) - } - } - - size() { - return this.workers.size - } - - getWorkerSelectionStrategy() { - console.debug(`Using Worker Selection Strategy: ${WORKER_SELECTION_STRATEGY}`) - - switch (WORKER_SELECTION_STRATEGY) { - case WORKER_SELECTION_STRATEGY_RR: - this.roundRobinIndex = -1 - return this.roundRobinSelector - case WORKER_SELECTION_STRATEGY_LOAD_CPU: - return this.loadBasedSelector - case WORKER_SELECTION_STRATEGY_LOAD_TASKS: - return this.taskLoadBasedSelector - case WORKER_SELECTION_STRATEGY_LOAD_RANK: - return this.rankedSelector - } - } - - roundRobinSelector() { - let currentWorkers = Array.from(this.workers.values()) - this.roundRobinIndex = (this.roundRobinIndex + 1) % currentWorkers.length - return currentWorkers[this.roundRobinIndex] - } - - loadBasedSelector() { - let currentWorkers = Array.from(this.workers.values()) - if (currentWorkers.length > 0) { - return currentWorkers.reduce((a,b) => a.stats.cpu < b.stats.cpu ? a : b) - } else { - return null - } - } - - taskLoadBasedSelector() { - let currentWorkers = Array.from(this.workers.values()) - if (currentWorkers.length > 0) { - return currentWorkers.reduce((a,b) => a.stats.tasks < b.stats.tasks ? a : b) - } else { - return null - } - } - - rankedSelector() { - let currentWorkers = Array.from(this.workers.values()) - if (currentWorkers.length > 0) { - return currentWorkers.reduce((a,b) => a.stats.rank > b.stats.rank ? a : b) - } else { - return null - } - } + constructor() { + this.workers = new Map(); + this.workerSelectionStrategy = this.getWorkerSelectionStrategy(); + } + + register(worker) { + console.log(`Registering worker ${worker.name}`); + this.workers.set(worker.id, worker); + worker.onRegister(); + metrics.setActiveWorkers(this.size()); + } + + unregister(socketId) { + let worker = this.findBySocketId(socketId); + if (worker) { + console.log(`Unregistering worker ${worker.name}`); + worker.onUnregister(); + this.workers.delete(worker.id); + } else { + console.error(`No worker found for socket ${socketId}`); + } + metrics.setActiveWorkers(this.size()); + } + + findById(id) { + return this.workers.get(id); + } + + findBySocketId(socketId) { + let workerArray = Array.from(this.workers.values()); + let worker = workerArray.filter((w) => w.socketId === socketId); + if (worker.length > 0) return worker[0]; + return null; + } + + getNextAvailableWorker() { + if (this.workers.size == 0) { + return null; + } + return this.workerSelectionStrategy(); + } + + updateStats(socketId, stats) { + var worker = this.findBySocketId(socketId); + if (worker) { + worker.updateStats(stats); + } else { + console.warn( + `Received stats from unregistered worker on socket ${socketId}` + ); + } + } + + size() { + return this.workers.size; + } + + getWorkerSelectionStrategy() { + console.debug( + `Using Worker Selection Strategy: ${WORKER_SELECTION_STRATEGY}` + ); + + switch (WORKER_SELECTION_STRATEGY) { + case WORKER_SELECTION_STRATEGY_RR: + this.roundRobinIndex = -1; + return this.roundRobinSelector; + case WORKER_SELECTION_STRATEGY_LOAD_CPU: + return this.loadBasedSelector; + case WORKER_SELECTION_STRATEGY_LOAD_TASKS: + return this.taskLoadBasedSelector; + case WORKER_SELECTION_STRATEGY_LOAD_RANK: + return this.rankedSelector; + } + } + + roundRobinSelector() { + let currentWorkers = Array.from(this.workers.values()); + this.roundRobinIndex = (this.roundRobinIndex + 1) % currentWorkers.length; + return currentWorkers[this.roundRobinIndex]; + } + + loadBasedSelector() { + let currentWorkers = Array.from(this.workers.values()); + if (currentWorkers.length > 0) { + return currentWorkers.reduce((a, b) => + a.stats.cpu < b.stats.cpu ? a : b + ); + } else { + return null; + } + } + + taskLoadBasedSelector() { + let currentWorkers = Array.from(this.workers.values()); + if (currentWorkers.length > 0) { + return currentWorkers.reduce((a, b) => + a.stats.tasks < b.stats.tasks ? a : b + ); + } else { + return null; + } + } + + rankedSelector() { + let currentWorkers = Array.from(this.workers.values()); + if (currentWorkers.length > 0) { + return currentWorkers.reduce((a, b) => + a.stats.rank > b.stats.rank ? a : b + ); + } else { + return null; + } + } } class JobPoster { - constructor(id, socketId, host) { - this.id = id - this.socketId = socketId - this.host = host - this.name = `${this.id}|${host}` - } + constructor(id, socketId, host) { + this.id = id; + this.socketId = socketId; + this.host = host; + this.name = `${this.id}|${host}`; + } } class JobPosterSet { - constructor() { - this.items = new Map() - } - - get(jobPosterId) { - return this.items.get(jobPosterId) - } - add(jobPoster) { - this.items.set(jobPoster.id, jobPoster) - metrics.setActiveJobPosters(this.items.size) - } - - remove(jobPoster) { - this.items.delete(jobPoster.id) - metrics.setActiveJobPosters(this.items.size) - } - - findBySocketId(id) { - for(const jobPoster of this.items.values()) { - if (jobPoster.socketId === id) - return jobPoster; - } - return null; - } + constructor() { + this.items = new Map(); + } + + get(jobPosterId) { + return this.items.get(jobPosterId); + } + add(jobPoster) { + this.items.set(jobPoster.id, jobPoster); + metrics.setActiveJobPosters(this.items.size); + } + + remove(jobPoster) { + this.items.delete(jobPoster.id); + metrics.setActiveJobPosters(this.items.size); + } + + findBySocketId(id) { + for (const jobPoster of this.items.values()) { + if (jobPoster.socketId === id) return jobPoster; + } + return null; + } } class Job { - constructor(jobPosterId, payload) { - this.id = uuid() - this.jobPosterId = jobPosterId - this.payload = payload - this.tasks = [] // when all tasks in the job are finished, the job is done - this.status = 'pending' - this.result = null - } - - addTask(type, payload) { - this.tasks.push(new Task(this, uuid(), type, payload)) - } - - notifyTaskUpdate(task) { - if (task.status === 'done') { - //on first failure, the job fails - if (!task.result) { - this.result = false - } - - if (this.tasks.filter(x => x.status !== 'done').length === 0) { - this.status = 'done' - // result is true if all tasks have a result of true - this.result = this.tasks.every(x => x.result) - } - } - } + constructor(jobPosterId, payload) { + this.id = uuid(); + this.jobPosterId = jobPosterId; + this.payload = payload; + this.tasks = []; // when all tasks in the job are finished, the job is done + this.status = "pending"; + this.result = null; + } + + addTask(type, payload) { + this.tasks.push(new Task(this, uuid(), type, payload)); + } + + notifyTaskUpdate(task) { + if (task.status === "done") { + //on first failure, the job fails + if (!task.result) { + this.result = false; + } + + if (this.tasks.filter((x) => x.status !== "done").length === 0) { + this.status = "done"; + // result is true if all tasks have a result of true + this.result = this.tasks.every((x) => x.result); + } + } + } } class Task { - // a portion of the job to complete - constructor(job, id, type, payload) { - this.job = job - this.id = id - this.type = type - this.payload = payload - this.status = 'pending' - this.workerId = null - this.result = null - this.error = null - } - - assignTo(worker) { - this.workerId = worker.id - this.status = 'assigned' - worker.onTaskAssigned(); - } - - unassignFrom(worker) { - worker.onTaskUnassigned(); - } - - update(status, result, error) { - this.status = status - this.result = result - this.error = error - this.job.notifyTaskUpdate(this) - } + // a portion of the job to complete + constructor(job, id, type, payload) { + this.job = job; + this.id = id; + this.type = type; + this.payload = payload; + this.status = "pending"; + this.workerId = null; + this.result = null; + this.error = null; + } + + assignTo(worker) { + this.workerId = worker.id; + this.status = "assigned"; + worker.onTaskAssigned(); + } + + unassignFrom(worker) { + worker.onTaskUnassigned(); + } + + update(status, result, error) { + this.status = status; + this.result = result; + this.error = error; + this.job.notifyTaskUpdate(this); + } } class TaskUpdate { - constructor(taskId, taskStatus, taskResult, taskError) { - this.taskId = taskId - this.taskStatus = taskStatus - this.taskResult = taskResult - this.taskError = taskError - } + constructor(taskId, taskStatus, taskResult, taskError) { + this.taskId = taskId; + this.taskStatus = taskStatus; + this.taskResult = taskResult; + this.taskError = taskError; + } } class WorkQueue { - constructor(onRunTask, onTaskComplete, onTaskKilled, onJobComplete, onJobKilled) { - this.tasks = new Map() - this.jobs = new Set() - this.onRunTask = onRunTask - this.onTaskComplete = onTaskComplete - this.onTaskKilled = onTaskKilled - this.onJobComplete = onJobComplete - this.onJobKilled = onJobKilled - - var self = this - setInterval(() => { self.step() }, 2000) - } - - enqueue(job) { - console.log(`Queueing job ${job.id}`) - this.jobs.add(job) - for (const task of job.tasks) { - console.log(`Queueing task ${task.id}`) - this.tasks.set(task.id, task) + constructor( + onRunTask, + onTaskComplete, + onTaskKilled, + onJobComplete, + onJobKilled + ) { + this.tasks = new Map(); + this.jobs = new Set(); + this.onRunTask = onRunTask; + this.onTaskComplete = onTaskComplete; + this.onTaskKilled = onTaskKilled; + this.onJobComplete = onJobComplete; + this.onJobKilled = onJobKilled; + + var self = this; + setInterval(() => { + self.step(); + }, 2000); + } + + enqueue(job) { + console.log(`Queueing job ${job.id}`); + this.jobs.add(job); + for (const task of job.tasks) { + console.log(`Queueing task ${task.id}`); + this.tasks.set(task.id, task); + } + metrics.jobPosted(); + } + + update(taskUpdate) { + console.log( + `Received update for task ${taskUpdate.taskId}, status: ${taskUpdate.taskStatus}` + ); + + let task = this.tasks.get(taskUpdate.taskId); + + if (!task) { + console.log(`Discarding task update for ${taskUpdate.taskId}`); + return; + } + + task.update( + taskUpdate.taskStatus, + taskUpdate.taskResult, + taskUpdate.taskError + ); + if (task.status === "done") { + this.onTaskComplete(task); + this.tasks.delete(task.id); + console.log(`Task ${task.id} complete`); + } + if (task.job.status === "done") { + if (task.job.result) { + metrics.jobSucceeded(); + } else { + metrics.jobFailed(); + } + metrics.jobCompleted(); + + this.onJobComplete(task.job); + this.jobs.delete(task.job); + console.log(`Job ${task.job.id} complete`); + } + } + + kill(jobSelector) { + let filteredJobs = Array.from(this.jobs).filter(jobSelector); + for (const job of filteredJobs) { + console.log(`Killing job ${job.id}`); + for (const task of job.tasks) { + if (task.status !== "done") { + task.status = "killed"; + this.onTaskKilled(task); } - metrics.jobPosted() - } - - update(taskUpdate) { - console.log(`Received update for task ${taskUpdate.taskId}, status: ${taskUpdate.taskStatus}`) - - let task = this.tasks.get(taskUpdate.taskId) - - if (!task) { - console.log(`Discarding task update for ${taskUpdate.taskId}`) - return - } - - task.update(taskUpdate.taskStatus, taskUpdate.taskResult, taskUpdate.taskError) - if (task.status === 'done') { - this.onTaskComplete(task) - this.tasks.delete(task.id) - console.log(`Task ${task.id} complete`) - } - if (task.job.status === 'done') { - if (task.job.result) { - metrics.jobSucceeded() - } else { - metrics.jobFailed() - } - metrics.jobCompleted() - - this.onJobComplete(task.job) - this.jobs.delete(task.job) - console.log(`Job ${task.job.id} complete`) - } - } - - kill(jobSelector) { - let filteredJobs = Array.from(this.jobs).filter(jobSelector) - for (const job of filteredJobs) { - console.log(`Killing job ${job.id}`) - for (const task of job.tasks) { - if (task.status !== 'done') { - task.status = 'killed' - this.onTaskKilled(task) - } - this.tasks.delete(task.id) - } - job.status = 'killed' - this.jobs.delete(job) - this.onJobKilled(job) - metrics.jobKilled() - metrics.jobCompleted() - } - } - - killWorkerTasks(workerId) { - console.debug(`Killing pending tasks for worker: ${workerId}`) - for (const task of Array.from(this.tasks.values()).filter(x => x.workerId === workerId)) { - this.update(new TaskUpdate(task.id, 'done', false, 'Worker disconnected')) - } - } - - step() { - for (const task of this.tasks.values()) { - //cleanup - if (task.status === 'done' || task.status === 'killed') - this.tasks.delete(task) - else if (task.status === 'pending') { - console.log(`Running task ${task.id}`) - this.onRunTask(task) - } - } - } + this.tasks.delete(task.id); + } + job.status = "killed"; + this.jobs.delete(job); + this.onJobKilled(job); + metrics.jobKilled(); + metrics.jobCompleted(); + } + } + + killWorkerTasks(workerId) { + console.debug(`Killing pending tasks for worker: ${workerId}`); + for (const task of Array.from(this.tasks.values()).filter( + (x) => x.workerId === workerId + )) { + this.update( + new TaskUpdate(task.id, "done", false, "Worker disconnected") + ); + } + } + + step() { + for (const task of this.tasks.values()) { + //cleanup + if (task.status === "done" || task.status === "killed") + this.tasks.delete(task); + else if (task.status === "pending") { + console.log(`Running task ${task.id}`); + this.onRunTask(task); + } + } + } } module.exports.injectMetricsRoute = (app) => { - return metrics.injectMetricsRoute(app) -} + return metrics.injectMetricsRoute(app); +}; module.exports.init = (server) => { - console.log('Initializing orchestrator') - - const io = require('socket.io')(server, { - serveClient: false - }); - - let workers = new WorkerSet() - let jobPosters = new JobPosterSet() - let jobs = new Set() - let workQueue = new WorkQueue(runTask, taskComplete, taskKilled, jobComplete, jobKilled) - let disconnectionHandlers = new Map() - let taskBuilder = getTaskBuilderStrategy(STREAM_SPLITTING); - - function runTask(task) { - let worker = workers.getNextAvailableWorker() - if (worker) { - console.log(`Forwarding work request to ${worker.name}`) - task.assignTo(worker) - io.sockets.sockets.get(worker.socketId).emit('worker.task.request', { - taskId: task.id, - payload: task.payload - }) - } else { - console.log('No worker available at the moment') - workQueue.update(new TaskUpdate(task.id, 'done', false, 'No worker available')) - } - } - - function taskComplete(task) { - console.log(`Task ${task.id} complete, result: ${task.result}`) - let worker = workers.findById(task.workerId) - if (worker) { - task.unassignFrom(worker) + console.log("Initializing orchestrator"); + + const io = require("socket.io")(server, { + serveClient: false, + }); + + let workers = new WorkerSet(); + let jobPosters = new JobPosterSet(); + let jobs = new Set(); + let workQueue = new WorkQueue( + runTask, + taskComplete, + taskKilled, + jobComplete, + jobKilled + ); + let disconnectionHandlers = new Map(); + let taskBuilder = getTaskBuilderStrategy(STREAM_SPLITTING); + + function runTask(task) { + let worker = workers.getNextAvailableWorker(); + if (worker) { + console.log(`Forwarding work request to ${worker.name}`); + task.assignTo(worker); + io.sockets.sockets.get(worker.socketId).emit("worker.task.request", { + taskId: task.id, + payload: task.payload, + }); + } else { + console.log("No worker available at the moment"); + workQueue.update( + new TaskUpdate(task.id, "done", false, "No worker available") + ); + } + } + + function taskComplete(task) { + console.log(`Task ${task.id} complete, result: ${task.result}`); + let worker = workers.findById(task.workerId); + if (worker) { + task.unassignFrom(worker); + } + } + + function taskKilled(task) { + if (task.status !== "done") { + let worker = workers.findById(task.workerId); + if (worker) { + task.unassignFrom(worker); + let workerSocket = io.sockets.sockets.get(worker.socketId); + if (workerSocket !== undefined && workerSocket.connected) { + workerSocket.emit("worker.task.kill", { taskId: task.id }); + console.log(`Telling worker ${worker.name} to kill task ${task.id}`); } - } - - function taskKilled(task) { - if (task.status !== 'done') { - let worker = workers.findById(task.workerId) - if (worker) { - task.unassignFrom(worker) - let workerSocket = io.sockets.sockets.get(worker.socketId) - if (workerSocket !== undefined && workerSocket.connected) { - workerSocket.emit('worker.task.kill', { taskId: task.id }) - console.log(`Telling worker ${worker.name} to kill task ${task.id}`) - } - } - } - } - - function jobComplete(job) { - console.log(`Job ${job.id} complete, tasks: ${job.tasks.length}, result: ${job.result}`) - let jobPoster = jobPosters.get(job.jobPosterId) - let posterSocket = io.sockets.sockets.get(jobPoster.socketId) - if (posterSocket !== undefined && posterSocket.connected) { - posterSocket.emit('jobposter.job.response', { result : job.result }) - console.log('JobPoster notified') - } - - console.log(`Removing job ${job.id}`) - jobs.delete(job) - } - - function jobKilled(job) { - jobs.delete(job) - console.log(`Job ${job.id} killed`) - } - - function workerDisconnectionHandler(socket) { - console.log(`Unregistering worker at socket ${socket.id}`) - let worker = workers.findBySocketId(socket.id) - workQueue.killWorkerTasks(worker.id) - workers.unregister(socket.id) - } - - function jobPosterDisconnectionHandler(socket) { - let jobPoster = jobPosters.findBySocketId(socket.id) - if (jobPoster) { - console.log(`Removing job-poster ${jobPoster.name} from pool`) - jobPosters.remove(jobPoster) - - workQueue.kill(v => { return v.jobPosterId === jobPoster.id }) - } - } + } + } + } + + function jobComplete(job) { + console.log( + `Job ${job.id} complete, tasks: ${job.tasks.length}, result: ${job.result}` + ); + let jobPoster = jobPosters.get(job.jobPosterId); + let posterSocket = io.sockets.sockets.get(jobPoster.socketId); + if (posterSocket !== undefined && posterSocket.connected) { + posterSocket.emit("jobposter.job.response", { result: job.result }); + console.log("JobPoster notified"); + } + + console.log(`Removing job ${job.id}`); + jobs.delete(job); + } + + function jobKilled(job) { + jobs.delete(job); + console.log(`Job ${job.id} killed`); + } + + function workerDisconnectionHandler(socket) { + console.log(`Unregistering worker at socket ${socket.id}`); + let worker = workers.findBySocketId(socket.id); + workQueue.killWorkerTasks(worker.id); + workers.unregister(socket.id); + } + + function jobPosterDisconnectionHandler(socket) { + let jobPoster = jobPosters.findBySocketId(socket.id); + if (jobPoster) { + console.log(`Removing job-poster ${jobPoster.name} from pool`); + jobPosters.remove(jobPoster); + + workQueue.kill((v) => { + return v.jobPosterId === jobPoster.id; + }); + } + } + + function getTaskBuilderStrategy(streamSplittingSetting) { + if (streamSplittingSetting === "ON") { + console.log("Stream-Splitting: ENABLED"); + return multiWorkerTaskBuilder; + } else { + console.log("Stream-Splitting: DISABLED"); + return singleWorkerTaskBuilder; + } + } + + function singleWorkerTaskBuilder(job) { + console.log("Creating single task for the job"); + let request = job.payload; + job.addTask(request.type, request.payload); + } + + function multiWorkerTaskBuilder(job) { + console.log("Creating multiple tasks for the job"); + let request = job.payload; + let args = request.payload.args; + + console.log(`All Args => ${args}`); + + let segmentTime = parseInt(getArgsValueOf(args, "-segment_time")); + let ss = parseInt(getArgsValueOf(args, "-ss")); + let minSegDuration = parseInt(getArgsValueOf(args, "-min_seg_duration")); + let skipToSegment = parseInt(getArgsValueOf(args, "-skip_to_segment")); + let segmentStartNumber = parseInt( + getArgsValueOf(args, "-segment_start_number") + ); + + console.log( + `Args => segment_time: ${segmentTime}, ss: ${ss}, min_seg_duration: ${minSegDuration}, skip_to_segment: ${skipToSegment}, segment_start_number: ${segmentStartNumber}` + ); + + job.addTask(request.type, request.payload); + + // //if no ss then we only generate one streaming task (at least for now) + // if (!ss) { + // job.addTask(request.type, request.payload) + // return + // } + + // let segmentDuration = parseInt(minSegDuration / 1000000) + + // const SEGMENTS_PER_NODE = 5 + // const totalWorkers = 2 // workers.size() + // for (let i = 0; i < totalWorkers; i++) { + // console.log(`Multi-part segment ${i + 1}`) + // let newPayload = JSON.parse(JSON.stringify(request.payload)) + // let newSs = ss + segmentDuration * SEGMENTS_PER_NODE * i + // setArgsValueOf(newPayload.args, '-ss', newSs) + // setArgsValueOf(newPayload.args, '-skip_to_segment', skipToSegment + SEGMENTS_PER_NODE * i) + + // //remove start_at_zero argument + // //newPayload.args.splice(newPayload.args.indexOf('-start_at_zero'), 1) + + // //let iIndex = newPayload.args.indexOf('-i') + // //newPayload.args.splice(iIndex, 0, '-t') + // //newPayload.args.splice(iIndex + 1, 0, segmentDuration * SEGMENTS_PER_NODE) + // job.addTask(request.type, newPayload) + // console.log(`Args => ${newPayload.args}`) + // } + } + + function getArgsValueOf(arr, key) { + let i = arr.indexOf(key); + if (i >= 0) { + return arr[i + 1]; + } + } + + function setArgsValueOf(arr, key, newValue) { + let i = arr.indexOf(key); + if (i >= 0) { + arr[i + 1] = newValue; + } + } + + console.log("Setting up websockets"); + + io.on("connection", (socket) => { + console.log(`Client connected: ${socket.id}`); + + socket.on("worker.stats", (stats) => { + workers.updateStats(socket.id, stats); + }); - function getTaskBuilderStrategy(streamSplittingSetting) { - if (streamSplittingSetting === 'ON') { - console.log('Stream-Splitting: ENABLED') - return multiWorkerTaskBuilder - } - else { - console.log('Stream-Splitting: DISABLED') - return singleWorkerTaskBuilder - } - } + socket.on("worker.announce", (data) => { + const worker = new Worker(data.workerId, socket.id, data.host); + workers.register(worker); + disconnectionHandlers.set(socket.id, workerDisconnectionHandler); + console.log(`Registered new worker: ${worker.name}`); + }); - function singleWorkerTaskBuilder(job) { - console.log('Creating single task for the job') - let request = job.payload - job.addTask(request.type, request.payload) - } + socket.on("jobposter.announce", (data) => { + const jobposter = new JobPoster(data.jobPosterId, socket.id, data.host); + jobPosters.add(jobposter); + disconnectionHandlers.set(socket.id, jobPosterDisconnectionHandler); + console.log(`Registered new job poster: ${jobposter.name}`); + socket.emit("jobposter.produce"); // tell jobPoster he can send work + }); - function multiWorkerTaskBuilder(job) { - console.log('Creating multiple tasks for the job') - let request = job.payload - let args = request.payload.args - - console.log(`All Args => ${args}`) - - let segmentTime = parseInt(getArgsValueOf(args, '-segment_time')) - let ss = parseInt(getArgsValueOf(args, '-ss')) - let minSegDuration = parseInt(getArgsValueOf(args, '-min_seg_duration')) - let skipToSegment = parseInt(getArgsValueOf(args, '-skip_to_segment')) - let segmentStartNumber = parseInt(getArgsValueOf(args, '-segment_start_number')) - - console.log(`Args => segment_time: ${segmentTime}, ss: ${ss}, min_seg_duration: ${minSegDuration}, skip_to_segment: ${skipToSegment}, segment_start_number: ${segmentStartNumber}`) - - job.addTask(request.type, request.payload) - - // //if no ss then we only generate one streaming task (at least for now) - // if (!ss) { - // job.addTask(request.type, request.payload) - // return - // } - - // let segmentDuration = parseInt(minSegDuration / 1000000) - - // const SEGMENTS_PER_NODE = 5 - // const totalWorkers = 2 // workers.size() - // for (let i = 0; i < totalWorkers; i++) { - // console.log(`Multi-part segment ${i + 1}`) - // let newPayload = JSON.parse(JSON.stringify(request.payload)) - // let newSs = ss + segmentDuration * SEGMENTS_PER_NODE * i - // setArgsValueOf(newPayload.args, '-ss', newSs) - // setArgsValueOf(newPayload.args, '-skip_to_segment', skipToSegment + SEGMENTS_PER_NODE * i) - - // //remove start_at_zero argument - // //newPayload.args.splice(newPayload.args.indexOf('-start_at_zero'), 1) - - // //let iIndex = newPayload.args.indexOf('-i') - // //newPayload.args.splice(iIndex, 0, '-t') - // //newPayload.args.splice(iIndex + 1, 0, segmentDuration * SEGMENTS_PER_NODE) - // job.addTask(request.type, newPayload) - // console.log(`Args => ${newPayload.args}`) - // } - } + socket.on("jobposter.job.request", (request) => { + let jobPoster = jobPosters.findBySocketId(socket.id); - function getArgsValueOf(arr, key) { - let i = arr.indexOf(key) - if (i >= 0) { - return arr[i+1] - } - } + let job = new Job(jobPoster.id, request); + taskBuilder(job); // creates N tasks for the job request + jobs.add(job); + workQueue.enqueue(job); + }); - function setArgsValueOf(arr, key, newValue) { - let i = arr.indexOf(key) - if (i >= 0) { - arr[i+1] = newValue - } - } + socket.on("worker.task.update", (data) => { + let taskUpdate = new TaskUpdate( + data.taskId, + data.status, + data.result, + data.error + ); + workQueue.update(taskUpdate); + }); - console.log('Setting up websockets') - - io.on('connection', socket => { - console.log(`Client connected: ${socket.id}`) - - socket.on('worker.stats', stats => { - workers.updateStats(socket.id, stats) - }) - - socket.on('worker.announce', data => { - const worker = new Worker(data.workerId, socket.id, data.host); - workers.register(worker) - disconnectionHandlers.set(socket.id, workerDisconnectionHandler) - console.log(`Registered new worker: ${worker.name}`) - }) - - socket.on('jobposter.announce', data => { - const jobposter = new JobPoster(data.jobPosterId, socket.id, data.host); - jobPosters.add(jobposter) - disconnectionHandlers.set(socket.id, jobPosterDisconnectionHandler) - console.log(`Registered new job poster: ${jobposter.name}`) - socket.emit('jobposter.produce') // tell jobPoster he can send work - }) - - socket.on('jobposter.job.request', request => { - let jobPoster = jobPosters.findBySocketId(socket.id) - - let job = new Job(jobPoster.id, request) - taskBuilder(job) // creates N tasks for the job request - jobs.add(job) - workQueue.enqueue(job) - }) - - socket.on('worker.task.update', data => { - let taskUpdate = new TaskUpdate(data.taskId, data.status, data.result, data.error) - workQueue.update(taskUpdate) - }) - - socket.on('disconnect', () => { - console.log(`Client disconnected: ${socket.id}`) - let handler = disconnectionHandlers.get(socket.id) - if (handler) { - handler(socket) - disconnectionHandlers.delete(socket.id) - } - }) - }) - console.log('Ready') -} + socket.on("disconnect", () => { + console.log(`Client disconnected: ${socket.id}`); + let handler = disconnectionHandlers.get(socket.id); + if (handler) { + handler(socket); + disconnectionHandlers.delete(socket.id); + } + }); + }); + console.log("Ready"); +}; diff --git a/orchestrator/server.js b/orchestrator/server.js index 2b4c7f2..8ef4789 100644 --- a/orchestrator/server.js +++ b/orchestrator/server.js @@ -1,25 +1,24 @@ -const LISTENING_PORT = process.env.LISTENING_PORT || 3500 +const LISTENING_PORT = process.env.LISTENING_PORT || 3500; -var ON_DEATH = require('death')({debug: true}) -var app = require('express')(); -var server = require('http').createServer(app); -var orchestrator = require('./orchestrator') +var ON_DEATH = require("death")({ debug: true }); +var app = require("express")(); +var server = require("http").createServer(app); +var orchestrator = require("./orchestrator"); -orchestrator.injectMetricsRoute(app) -orchestrator.init(server) +orchestrator.injectMetricsRoute(app); +orchestrator.init(server); // healthcheck endpoint -app.get('/health', (req, res) => { - res.send('Healthy'); -}) - +app.get("/health", (req, res) => { + res.send("Healthy"); +}); server.listen(LISTENING_PORT, () => { - console.log(`Server listening on port ${LISTENING_PORT}`) + console.log(`Server listening on port ${LISTENING_PORT}`); }); -ON_DEATH( (signal, err) => { - console.log('ON_DEATH signal detected') - console.error(err) - process.exit(signal) -}) \ No newline at end of file +ON_DEATH((signal, err) => { + console.log("ON_DEATH signal detected"); + console.error(err); + process.exit(signal); +}); diff --git a/pms/app/jobPoster.js b/pms/app/jobPoster.js index 672953a..1121815 100644 --- a/pms/app/jobPoster.js +++ b/pms/app/jobPoster.js @@ -1,38 +1,37 @@ -const { v4: uuid } = require('uuid'); +const { v4: uuid } = require("uuid"); -let uniqueId = uuid() +let uniqueId = uuid(); module.exports = { - postJob : (orchestratorUrl, job, callback) => { - var socket = require('socket.io-client')(orchestratorUrl); - - socket.on('connect', () => { - console.log('JobPoster connected, announcing') - socket.emit('jobposter.announce', - { - jobPosterId: uniqueId, - host: process.env.HOSTNAME - }) - }) - - let workSent = false - socket.on('jobposter.produce', () => { - console.log('Orchestrator requesting pending work') - - if (workSent) { - console.log('Work already sent, nothing to do') - return - } - - console.log(`Sending request to orchestrator on: ${orchestratorUrl}`) - - socket.emit('jobposter.job.request', job) - - workSent = true - }) - - socket.on('jobposter.job.response', response => { - callback(response) - }) - } -} \ No newline at end of file + postJob: (orchestratorUrl, job, callback) => { + var socket = require("socket.io-client")(orchestratorUrl); + + socket.on("connect", () => { + console.log("JobPoster connected, announcing"); + socket.emit("jobposter.announce", { + jobPosterId: uniqueId, + host: process.env.HOSTNAME, + }); + }); + + let workSent = false; + socket.on("jobposter.produce", () => { + console.log("Orchestrator requesting pending work"); + + if (workSent) { + console.log("Work already sent, nothing to do"); + return; + } + + console.log(`Sending request to orchestrator on: ${orchestratorUrl}`); + + socket.emit("jobposter.job.request", job); + + workSent = true; + }); + + socket.on("jobposter.job.response", (response) => { + callback(response); + }); + }, +}; diff --git a/pms/app/transcoder.js b/pms/app/transcoder.js index b16c6b9..e379eda 100644 --- a/pms/app/transcoder.js +++ b/pms/app/transcoder.js @@ -1,136 +1,159 @@ -const ORCHESTRATOR_URL = process.env.ORCHESTRATOR_URL || 'http://localhost:3500' +const ORCHESTRATOR_URL = + process.env.ORCHESTRATOR_URL || "http://localhost:3500"; -const TRANSCODER_PATH = process.env.TRANSCODER_PATH || '/usr/lib/plexmediaserver/' -const TRANSCODER_LOCAL_NAME = process.env.TRANSCODER_LOCAL_NAME || 'originalTranscoder' -const PMS_SERVICE = process.env.PMS_SERVICE || "" -const PMS_IP = process.env.PMS_IP || "" -const PMS_PORT = process.env.PMS_PORT || "32400" +const TRANSCODER_PATH = + process.env.TRANSCODER_PATH || "/usr/lib/plexmediaserver/"; +const TRANSCODER_LOCAL_NAME = + process.env.TRANSCODER_LOCAL_NAME || "originalTranscoder"; +const PMS_SERVICE = process.env.PMS_SERVICE || ""; +const PMS_IP = process.env.PMS_IP || ""; +const PMS_PORT = process.env.PMS_PORT || "32400"; -const LOCAL_RELAY_ENABLED = process.env.LOCAL_RELAY_ENABLED || "1" -const LOCAL_RELAY_PORT = process.env.LOCAL_RELAY_PORT || "32499" +const LOCAL_RELAY_ENABLED = process.env.LOCAL_RELAY_ENABLED || "1"; +const LOCAL_RELAY_PORT = process.env.LOCAL_RELAY_PORT || "32499"; -const TRANSCODER_VERBOSE = process.env.TRANSCODER_VERBOSE || '0' +const TRANSCODER_VERBOSE = process.env.TRANSCODER_VERBOSE || "0"; // Operating mode: // local // remote // both -const TRANSCODE_OPERATING_MODE = process.env.TRANSCODE_OPERATING_MODE || 'both' -const TRANSCODE_EAE_LOCALLY = process.env.TRANSCODE_EAE_LOCALLY || false -const FORCE_HTTPS = process.env.FORCE_HTTPS || "0" +const TRANSCODE_OPERATING_MODE = process.env.TRANSCODE_OPERATING_MODE || "both"; +const TRANSCODE_EAE_LOCALLY = process.env.TRANSCODE_EAE_LOCALLY || false; +const FORCE_HTTPS = process.env.FORCE_HTTPS || "0"; // validations if (PMS_SERVICE == "" && PMS_IP == "") { - console.error("You must set either PMS_SERVICE or PMS_IP (either one), pointing to you Plex instance. PMS_SERVICE is only allowed in conjunction with LOCAL_RELAY_ENABLED='1'") - process.exit(1) + console.error( + "You must set either PMS_SERVICE or PMS_IP (either one), pointing to you Plex instance. PMS_SERVICE is only allowed in conjunction with LOCAL_RELAY_ENABLED='1'" + ); + process.exit(1); } if (PMS_SERVICE != "" && LOCAL_RELAY_ENABLED != "1") { - console.error("PMS_SERVICE is only allowed in conjunction with LOCAL_RELAY_ENABLED='1'. This is due to a high chance of Plex rejecting the traffic coming from the Worker.") - process.exit(1) + console.error( + "PMS_SERVICE is only allowed in conjunction with LOCAL_RELAY_ENABLED='1'. This is due to a high chance of Plex rejecting the traffic coming from the Worker." + ); + process.exit(1); } if (FORCE_HTTPS == "1" && LOCAL_RELAY_ENABLED == "1") { - console.warn(`When Local Relay is enabled FORCE_HTTPS is ignored as it is not needed for reporting streaming progress back to Plex.`) + console.warn( + `When Local Relay is enabled FORCE_HTTPS is ignored as it is not needed for reporting streaming progress back to Plex.` + ); } -const { spawn } = require('child_process'); -var ON_DEATH = require('death')({debug: true}) +const { spawn } = require("child_process"); +var ON_DEATH = require("death")({ debug: true }); -var jobPoster = require('./jobPoster') +var jobPoster = require("./jobPoster"); -if (TRANSCODE_OPERATING_MODE == 'local') { - transcodeLocally(process.cwd(), process.argv.slice(2), process.env) -} else if (TRANSCODE_EAE_LOCALLY && process.argv.slice(2).filter(s => s.includes('eae_prefix')).length > 0) { - console.log('EasyAudioEncoder used, forcing local transcode') - transcodeLocally(process.cwd(), process.argv.slice(2), process.env) +if (TRANSCODE_OPERATING_MODE == "local") { + transcodeLocally(process.cwd(), process.argv.slice(2), process.env); +} else if ( + TRANSCODE_EAE_LOCALLY && + process.argv.slice(2).filter((s) => s.includes("eae_prefix")).length > 0 +) { + console.log("EasyAudioEncoder used, forcing local transcode"); + transcodeLocally(process.cwd(), process.argv.slice(2), process.env); +} else if (process.cwd().indexOf("PlexCreditsDetection") > 0) { + console.log("PlexCreditsDetection so forcing local transcode"); + transcodeLocally(process.cwd(), process.argv.slice(2), process.env); } else { - function setValueOf(arr, key, newValue) { - let i = arr.indexOf(key) - if (i > 0) { - arr[i+1] = newValue - } + function setValueOf(arr, key, newValue) { + let i = arr.indexOf(key); + if (i > 0) { + arr[i + 1] = newValue; } + } - let networkProtocol = "http"; - let targetLocation = PMS_SERVICE || PMS_IP //SERVICE takes precedence over IP - let targetPort = PMS_PORT - if (LOCAL_RELAY_ENABLED == "1") { - console.log(`Local Relay enabled, traffic proxied through PMS local port ${LOCAL_RELAY_PORT}`) - targetPort = LOCAL_RELAY_PORT - } else { - if (FORCE_HTTPS == '1') { - console.log('Forcing HTTPS in progress callback'); - networkProtocol = "https"; - } + let networkProtocol = "http"; + let targetLocation = PMS_SERVICE || PMS_IP; //SERVICE takes precedence over IP + let targetPort = PMS_PORT; + if (LOCAL_RELAY_ENABLED == "1") { + console.log( + `Local Relay enabled, traffic proxied through PMS local port ${LOCAL_RELAY_PORT}` + ); + targetPort = LOCAL_RELAY_PORT; + } else { + if (FORCE_HTTPS == "1") { + console.log("Forcing HTTPS in progress callback"); + networkProtocol = "https"; } + } - let newArgs = process.argv.slice(2).map((v) => { - return v - .replace(`http://127.0.0.1:${PMS_PORT}`, `${networkProtocol}://${targetLocation}:${targetPort}`) - .replace('aac_lc', 'aac'); // workaround for error -> Unknown decoder 'aac_lc' - }) + let newArgs = process.argv.slice(2).map((v) => { + return v + .replace( + `http://127.0.0.1:${PMS_PORT}`, + `${networkProtocol}://${targetLocation}:${targetPort}` + ) + .replace("aac_lc", "aac"); // workaround for error -> Unknown decoder 'aac_lc' + }); - if (TRANSCODER_VERBOSE == '1') { - console.log('Setting VERBOSE to ON') - setValueOf(newArgs, '-loglevel', 'verbose') - setValueOf(newArgs, '-loglevel_plex', 'verbose') - } + if (TRANSCODER_VERBOSE == "1") { + console.log("Setting VERBOSE to ON"); + setValueOf(newArgs, "-loglevel", "verbose"); + setValueOf(newArgs, "-loglevel_plex", "verbose"); + } - let environmentVariables = process.env - let workDir = process.cwd() + let environmentVariables = process.env; + let workDir = process.cwd(); - console.log(`Sending request to orchestrator on: ${ORCHESTRATOR_URL}`) - if (TRANSCODER_VERBOSE == '1') { - console.log(`cwd => ${JSON.stringify(workDir)}`) - console.log(`args => ${JSON.stringify(newArgs)}`) - console.log(`env => ${JSON.stringify(environmentVariables)}`) - } - - jobPoster.postJob(ORCHESTRATOR_URL, + console.log(`Sending request to orchestrator on: ${ORCHESTRATOR_URL}`); + if (TRANSCODER_VERBOSE == "1") { + console.log(`cwd => ${JSON.stringify(workDir)}`); + console.log(`args => ${JSON.stringify(newArgs)}`); + console.log(`env => ${JSON.stringify(environmentVariables)}`); + } + + jobPoster.postJob( + ORCHESTRATOR_URL, { - type: 'transcode', - payload: { - cwd: workDir, - args: newArgs, - env: environmentVariables - } + type: "transcode", + payload: { + cwd: workDir, + args: newArgs, + env: environmentVariables, + }, }, (response) => { - if (!response.result) { - console.error('Distributed transcoder failed, calling local') - if (TRANSCODE_OPERATING_MODE == 'both') { - transcodeLocally(process.cwd(), process.argv.slice(2), process.env); - } else { - // remote only - console.error(`Error transcoding and local transcode is disabled: TRANSCODE_OPERATING_MODE=${TRANSCODE_OPERATING_MODE}`) - process.exit(1) - } + if (!response.result) { + console.error("Distributed transcoder failed, calling local"); + if (TRANSCODE_OPERATING_MODE == "both") { + transcodeLocally(process.cwd(), process.argv.slice(2), process.env); } else { - console.log("Remote Transcoding was successful") - process.exit(0) + // remote only + console.error( + `Error transcoding and local transcode is disabled: TRANSCODE_OPERATING_MODE=${TRANSCODE_OPERATING_MODE}` + ); + process.exit(1); } + } else { + console.log("Remote Transcoding was successful"); + process.exit(0); + } } - ) + ); } function transcodeLocally(cwd, args, env) { - let child = spawn(TRANSCODER_PATH + TRANSCODER_LOCAL_NAME, args, { - cwd: cwd, - env: env - }); - child.stdout.pipe(process.stdout); - child.stderr.pipe(process.stderr); - withErrors = 0; - child.on('error', (err) => { - console.error(err); - withErrors = 1; - }); - child.on('close', (code) => { - console.log('Completed local transcode'); - process.exit(withErrors); - }); + let child = spawn(TRANSCODER_PATH + TRANSCODER_LOCAL_NAME, args, { + cwd: cwd, + env: env, + }); + child.stdout.pipe(process.stdout); + child.stderr.pipe(process.stderr); + withErrors = 0; + child.on("error", (err) => { + console.error(err); + withErrors = 1; + }); + child.on("close", (code) => { + console.log("Completed local transcode"); + process.exit(withErrors); + }); } -ON_DEATH( (signal, err) => { - console.log('ON_DEATH signal detected') - console.error(err) - process.exit(signal) -}) +ON_DEATH((signal, err) => { + console.log("ON_DEATH signal detected"); + console.error(err); + process.exit(signal); +}); diff --git a/pms/extended-image/Dockerfile-development b/pms/extended-image/Dockerfile-development new file mode 100644 index 0000000..1a006c1 --- /dev/null +++ b/pms/extended-image/Dockerfile-development @@ -0,0 +1,25 @@ +FROM linuxserver/plex:latest + +LABEL maintainer="pabloromeo" + +COPY /docker-mod/root/etc/ /etc/ +COPY /app /app + +RUN apt-get update && \ + apt-get install -y dos2unix && \ + dos2unix /etc/cont-init.d/* && \ + dos2unix /etc/services.d/nginx/* && \ + dos2unix /app/transcoder-shim.sh && \ + apt-get remove -y dos2unix + +RUN bash /etc/cont-init.d/92-install-dependencies && \ + bash /etc/cont-init.d/93-npm-install && \ + rm /etc/cont-init.d/92-install-dependencies && \ + rm /etc/cont-init.d/93-npm-install && \ + echo "**** cleanup ****" && \ + rm -rf \ + /tmp/* \ + /var/lib/apt/lists/* \ + /var/tmp/* + +WORKDIR / diff --git a/worker/app/worker.js b/worker/app/worker.js index 7f4f75c..83a820c 100644 --- a/worker/app/worker.js +++ b/worker/app/worker.js @@ -1,196 +1,235 @@ -const LISTENING_PORT = process.env.LISTENING_PORT || 3501 -const STAT_CPU_INTERVAL = process.env.STAT_CPU_INTERVAL || 2000 -const STAT_CPU_OPS_DURATION = process.env.STAT_CPU_OPS_DURATION || 1000 -const ORCHESTRATOR_URL = process.env.ORCHESTRATOR_URL || 'http://localhost:3500' -const TRANSCODER_PATH = process.env.TRANSCODER_PATH || '/usr/lib/plexmediaserver/' -const TRANSCODER_NAME = process.env.TRANSCODER_NAME || 'Plex Transcoder' -const EAE_SUPPORT = process.env.EAE_SUPPORT || "1" -const EAE_EXECUTABLE = process.env.EAE_EXECUTABLE || "" +const LISTENING_PORT = process.env.LISTENING_PORT || 3501; +const STAT_CPU_INTERVAL = process.env.STAT_CPU_INTERVAL || 2000; +const STAT_CPU_OPS_DURATION = process.env.STAT_CPU_OPS_DURATION || 1000; +const ORCHESTRATOR_URL = + process.env.ORCHESTRATOR_URL || "http://localhost:3500"; +const TRANSCODER_PATH = + process.env.TRANSCODER_PATH || "/usr/lib/plexmediaserver/"; +const TRANSCODER_NAME = process.env.TRANSCODER_NAME || "Plex Transcoder"; +const EAE_SUPPORT = process.env.EAE_SUPPORT || "1"; +const EAE_EXECUTABLE = process.env.EAE_EXECUTABLE || ""; // hwaccel decoder: https://trac.ffmpeg.org/wiki/HWAccelIntro -const FFMPEG_HWACCEL = process.env.FFMPEG_HWACCEL || false +const FFMPEG_HWACCEL = process.env.FFMPEG_HWACCEL || false; // Settings debug info -console.log(`EAE_SUPPORT => ${EAE_SUPPORT}`) -console.log(`EAE_EXECUTABLE => ${EAE_EXECUTABLE}`) -console.log(`FFMPEG_HWACCEL => ${FFMPEG_HWACCEL}`) +console.log(`EAE_SUPPORT => ${EAE_SUPPORT}`); +console.log(`EAE_EXECUTABLE => ${EAE_EXECUTABLE}`); +console.log(`FFMPEG_HWACCEL => ${FFMPEG_HWACCEL}`); -var app = require('express')(); -var server = require('http').createServer(app); -var socket = require('socket.io-client')(ORCHESTRATOR_URL); -var cpuStat = require('cpu-stat'); -var fs = require('fs'); -const { spawn, exec } = require('child_process'); -const { v4: uuid } = require('uuid'); -const { fib, dist } = require('cpu-benchmark'); +var app = require("express")(); +var server = require("http").createServer(app); +var socket = require("socket.io-client")(ORCHESTRATOR_URL); +var cpuStat = require("cpu-stat"); +var fs = require("fs"); +const { spawn, exec } = require("child_process"); +const { v4: uuid } = require("uuid"); +const { fib, dist } = require("cpu-benchmark"); -var ON_DEATH = require('death')({debug: true}) +var ON_DEATH = require("death")({ debug: true }); // initialize CPU stats to a high number until it is overwritten by first sample let cpuUsage = 9999.0; // calculate CPU operations for worker stats (simple benchmark over STAT_CPU_OPS_DURATION milliseconds) -const ops = dist(STAT_CPU_OPS_DURATION) -console.log(`Computed CPU ops => ${ops}`) +const ops = dist(STAT_CPU_OPS_DURATION); +console.log(`Computed CPU ops => ${ops}`); // healthcheck endpoint -app.get('/health', (req, res) => { - res.send('Healthy'); -}) +app.get("/health", (req, res) => { + res.send("Healthy"); +}); server.listen(LISTENING_PORT, () => { - console.log(`Worker listening on port ${LISTENING_PORT}`) + console.log(`Worker listening on port ${LISTENING_PORT}`); }); - + // calculate cpu usage every 2 seconds -setInterval( () => { - cpuStat.usagePercent({ sampleMs: STAT_CPU_INTERVAL }, (err, percent, seconds) => { - if (!err) { - cpuUsage = percent.toFixed(2) - if (socket.connected) { - socket.emit('worker.stats', { cpu: cpuUsage, tasks : taskMap.size, ops: ops }) - } +setInterval(() => { + cpuStat.usagePercent( + { sampleMs: STAT_CPU_INTERVAL }, + (err, percent, seconds) => { + if (!err) { + cpuUsage = percent.toFixed(2); + if (socket.connected) { + socket.emit("worker.stats", { + cpu: cpuUsage, + tasks: taskMap.size, + ops: ops, + }); } - }); -}, STAT_CPU_INTERVAL) + } + } + ); +}, STAT_CPU_INTERVAL); -let workerId = uuid() -let taskMap = new Map() +let workerId = uuid(); +let taskMap = new Map(); -console.debug(`Initializing Worker ${workerId}|${process.env.HOSTNAME}`) +console.debug(`Initializing Worker ${workerId}|${process.env.HOSTNAME}`); -socket.on('connect', () => { - console.log(`Worker connected on socket ${socket.id}`) - socket.emit('worker.announce', - { - workerId: workerId, - host: process.env.HOSTNAME - }) -}) +socket.on("connect", () => { + console.log(`Worker connected on socket ${socket.id}`); + socket.emit("worker.announce", { + workerId: workerId, + host: process.env.HOSTNAME, + }); +}); function processEnv(env) { - // overwrite environment settings coming from the original plex instance tied to architecture - newEnv = JSON.parse(JSON.stringify(env)); - newEnv.PLEX_ARCH = process.env.PLEX_ARCH - newEnv.PLEX_MEDIA_SERVER_INFO_MODEL = process.env.PLEX_MEDIA_SERVER_INFO_MODEL - newEnv.FFMPEG_EXTERNAL_LIBS = process.env.FFMPEG_EXTERNAL_LIBS - return newEnv + // overwrite environment settings coming from the original plex instance tied to architecture + newEnv = JSON.parse(JSON.stringify(env)); + newEnv.PLEX_ARCH = process.env.PLEX_ARCH; + newEnv.PLEX_MEDIA_SERVER_INFO_MODEL = + process.env.PLEX_MEDIA_SERVER_INFO_MODEL; + newEnv.FFMPEG_EXTERNAL_LIBS = process.env.FFMPEG_EXTERNAL_LIBS; + return newEnv; } -socket.on('worker.task.request', taskRequest => { - console.log('Received task request') +socket.on("worker.task.request", (taskRequest) => { + console.log("Received task request"); + + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "received", + }); + + var processedEnvironmentVariables = processEnv(taskRequest.payload.env); + + var child, childEAE; + if (taskRequest.payload.args[0] === "testpayload") { + console.log(`args => ${JSON.stringify(taskRequest.payload.args)}`); + console.log(`env => ${JSON.stringify(processedEnvironmentVariables)}`); + console.log("Starting test of waiting for 5 seconds"); + child = exec("sleep 5"); + } else { + if (FFMPEG_HWACCEL != false) { + console.log(`Setting hwaccel to ${FFMPEG_HWACCEL}`); + let i = taskRequest.payload.args.indexOf("-hwaccel"); + if (i > 0) { + taskRequest.payload.args[i + 1] = FFMPEG_HWACCEL; + } else { + taskRequest.payload.args.unshift("-hwaccel", FFMPEG_HWACCEL); + } + } - socket.emit('worker.task.update', { - taskId: taskRequest.taskId, - status: 'received' - }) + console.log(`EAE_ROOT => "${processedEnvironmentVariables.EAE_ROOT}"`); + if ( + (EAE_SUPPORT == "1" || EAE_SUPPORT == "true") && + EAE_EXECUTABLE != "" && + processedEnvironmentVariables.EAE_ROOT?.length > 0 + ) { + if (!fs.existsSync(processedEnvironmentVariables.EAE_ROOT)) { + console.log( + `EAE Support - Creating EAE_ROOT destination => ${processedEnvironmentVariables.EAE_ROOT}` + ); + fs.mkdirSync(processedEnvironmentVariables.EAE_ROOT, { + recursive: true, + }); + } + + console.log( + `EAE Support - Spawning EasyAudioEncoder from "${EAE_EXECUTABLE}", cwd => ${processedEnvironmentVariables.EAE_ROOT}` + ); + childEAE = spawn(EAE_EXECUTABLE, [], { + cwd: processedEnvironmentVariables.EAE_ROOT, + env: processedEnvironmentVariables, + }); + childEAE.stdout.pipe(process.stdout); + childEAE.stderr.pipe(process.stderr); + childEAE.on("error", (err) => { + console.error("EAE Support - EAE failed:"); + console.error(err); + }); + childEAE.on("close", () => { + console.log("EAE Support - Closing"); + }); + childEAE.on("exit", () => { + console.log("EAE Support - Exiting"); + }); + } else { + childEAE = null; + } - var processedEnvironmentVariables = processEnv(taskRequest.payload.env) + if (!fs.existsSync(taskRequest.payload.cwd)) { + console.error( + `CWD path doesn't seem to exist. Plex should have created this path before-hand, so you may have an issue with your shares => "${taskRequest.payload.cwd}"` + ); + } - var child, childEAE - if (taskRequest.payload.args[0] === 'testpayload') { - console.log(`args => ${JSON.stringify(taskRequest.payload.args)}`) - console.log(`env => ${JSON.stringify(processedEnvironmentVariables)}`) - console.log('Starting test of waiting for 5 seconds') - child = exec('sleep 5'); - } else { - if (FFMPEG_HWACCEL != false) { - console.log(`Setting hwaccel to ${FFMPEG_HWACCEL}`) - let i = taskRequest.payload.args.indexOf('-hwaccel') - if (i > 0) { - taskRequest.payload.args[i+1] = FFMPEG_HWACCEL - } else { - taskRequest.payload.args.unshift('-hwaccel', FFMPEG_HWACCEL) - } - } + child = spawn(TRANSCODER_PATH + TRANSCODER_NAME, taskRequest.payload.args, { + cwd: taskRequest.payload.cwd, + env: processedEnvironmentVariables, + }); + } - if ((EAE_SUPPORT == "1" || EAE_SUPPORT == "true") && EAE_EXECUTABLE != "" && processedEnvironmentVariables.EAE_ROOT !== undefined) { - if (!fs.existsSync(processedEnvironmentVariables.EAE_ROOT)){ - console.log(`EAE Support - Creating EAE_ROOT destination => ${processedEnvironmentVariables.EAE_ROOT}`) - fs.mkdirSync(processedEnvironmentVariables.EAE_ROOT, { recursive: true }); - } - - console.log(`EAE Support - Spawning EasyAudioEncoder from "${EAE_EXECUTABLE}", cwd => ${processedEnvironmentVariables.EAE_ROOT}`) - childEAE = spawn(EAE_EXECUTABLE, [], { - cwd: processedEnvironmentVariables.EAE_ROOT, - env: processedEnvironmentVariables - }); - childEAE.stdout.pipe(process.stdout); - childEAE.stderr.pipe(process.stderr); - childEAE.on('error', (err) => { - console.error('EAE Support - EAE failed:') - console.error(err) - }) - childEAE.on('close', () => { - console.log('EAE Support - Closing') - }) - childEAE.on('exit', () => { - console.log('EAE Support - Exiting') - }) - } else { - childEAE = null - } + taskMap.set(taskRequest.taskId, { + transcodeProcess: child, + eaeProcess: childEAE, + }); - child = spawn(TRANSCODER_PATH + TRANSCODER_NAME, taskRequest.payload.args, { - cwd: taskRequest.payload.cwd, - env: processedEnvironmentVariables - }); + child.stdout.pipe(process.stdout); + child.stderr.pipe(process.stderr); + + let notified = false; + const completionHandler = (code) => { + if (!notified) { + console.log("Completed transcode"); + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "done", + result: code === 0, + exitCode: code, + }); + notified = true; + console.log("Removing process from taskMap"); + taskMap.delete(taskRequest.taskId); } + }; + + child.on("error", (err) => { + console.error("Transcoding failed:"); + console.error(err); + notified = true; + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "done", + result: false, + error: err.message, + }); + console.log("Orchestrator notified"); - taskMap.set(taskRequest.taskId, { transcodeProcess: child, eaeProcess: childEAE }) + console.log("Removing process from taskMap"); + taskMap.delete(taskRequest.taskId); + }); - child.stdout.pipe(process.stdout); - child.stderr.pipe(process.stderr); + child.on("close", completionHandler); + child.on("exit", completionHandler); - let notified = false - const completionHandler = (code) => { - if (!notified) { - console.log('Completed transcode') - socket.emit('worker.task.update', { taskId: taskRequest.taskId, status: 'done', result: code === 0, exitCode : code }) - notified = true - console.log('Removing process from taskMap') - taskMap.delete(taskRequest.taskId) - } - } + socket.emit("worker.task.update", { + taskId: taskRequest.taskId, + status: "inprogress", + }); +}); - child.on('error', (err) => { - console.error('Transcoding failed:') - console.error(err) - notified = true - socket.emit('worker.task.update', { taskId: taskRequest.taskId, status: 'done', result: false, error: err.message }) - console.log('Orchestrator notified') - - console.log('Removing process from taskMap') - taskMap.delete(taskRequest.taskId) - }) - - child.on('close', completionHandler) - child.on('exit', completionHandler) - - socket.emit('worker.task.update', { - taskId: taskRequest.taskId, - status: 'inprogress' - }) -}) - -socket.on('worker.task.kill', data => { - let taskEntry = taskMap.get(data.taskId) - if (taskEntry) { - console.log(`Killing child processes for task ${data.taskId}`) - taskEntry.transcodeProcess.kill() - if (taskEntry.eaeProcess != null) { - taskEntry.eaeProcess.kill() - } - console.log('Removing process from taskMap') - taskMap.delete(data.taskId) +socket.on("worker.task.kill", (data) => { + let taskEntry = taskMap.get(data.taskId); + if (taskEntry) { + console.log(`Killing child processes for task ${data.taskId}`); + taskEntry.transcodeProcess.kill(); + if (taskEntry.eaeProcess != null) { + taskEntry.eaeProcess.kill(); } -}) + console.log("Removing process from taskMap"); + taskMap.delete(data.taskId); + } +}); -socket.on('disconnect', () => { - console.log('Worker disconnected') -}) +socket.on("disconnect", () => { + console.log("Worker disconnected"); +}); -ON_DEATH( (signal, err) => { - console.log('ON_DEATH signal detected') - console.error(err) - process.exit(signal) -}) +ON_DEATH((signal, err) => { + console.log("ON_DEATH signal detected"); + console.error(err); + process.exit(signal); +}); diff --git a/worker/extended-image/Dockerfile b/worker/extended-image/Dockerfile index 594e553..0dcd34a 100644 --- a/worker/extended-image/Dockerfile +++ b/worker/extended-image/Dockerfile @@ -11,9 +11,9 @@ RUN bash /etc/cont-init.d/92-install-dependencies && \ rm /etc/cont-init.d/93-npm-install && \ echo "**** cleanup ****" && \ rm -rf \ - /tmp/* \ - /var/lib/apt/lists/* \ - /var/tmp/* + /tmp/* \ + /var/lib/apt/lists/* \ + /var/tmp/* EXPOSE 3501 diff --git a/worker/extended-image/Dockerfile-development b/worker/extended-image/Dockerfile-development new file mode 100644 index 0000000..a78e3db --- /dev/null +++ b/worker/extended-image/Dockerfile-development @@ -0,0 +1,27 @@ +FROM linuxserver/plex:latest + +LABEL maintainer="pabloromeo" + +COPY /docker-mod/root/etc/ /etc/ +COPY /app /app + +RUN apt-get update && \ + apt-get install -y dos2unix && \ + dos2unix /etc/cont-init.d/* && \ + dos2unix /app/start.sh && \ + apt-get remove -y dos2unix + +RUN bash /etc/cont-init.d/92-install-dependencies && \ + bash /etc/cont-init.d/93-npm-install && \ + rm /etc/cont-init.d/92-install-dependencies && \ + rm /etc/cont-init.d/93-npm-install && \ + echo "**** cleanup ****" && \ + rm -rf \ + /tmp/* \ + /var/lib/apt/lists/* \ + /var/tmp/* + +EXPOSE 3501 + +VOLUME /codecs +