Skip to content

Commit

Permalink
Runner can choose job type
Browse files Browse the repository at this point in the history
  • Loading branch information
Chocobozzz committed Jun 27, 2024
1 parent 29ccc0c commit e654342
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 18 deletions.
26 changes: 24 additions & 2 deletions apps/peertube-runner/src/peertube-runner.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#!/usr/bin/env node

import { Command, InvalidArgumentError } from '@commander-js/extra-typings'
import { RunnerJobType } from '@peertube/peertube-models'
import { listRegistered, registerRunner, unregisterRunner } from './register/index.js'
import { RunnerServer } from './server/index.js'
import { getSupportedJobsList } from './server/shared/supported-job.js'
import { ConfigManager, logger } from './shared/index.js'

const program = new Command()
Expand All @@ -25,9 +27,29 @@ const program = new Command()

program.command('server')
.description('Run in server mode, to execute remote jobs of registered PeerTube instances')
.action(async () => {
.option(
'--enable-job <type>',
'Enable this job type (multiple --enable-job options can be specified). ' +
'By default all supported jobs are enabled). ' +
'Supported job types: ' + getSupportedJobsList().join(', '),
(value: RunnerJobType, previous: RunnerJobType[]) => [ ...previous, value ],
[]
)
.action(async options => {
try {
await RunnerServer.Instance.run()
let enabledJobs: Set<RunnerJobType>

if (options.enableJob) {
for (const jobType of options.enableJob) {
if (getSupportedJobsList().includes(jobType) !== true) {
throw new InvalidArgumentError(`${jobType} is not a supported job`)
}

enabledJobs = new Set(options.enableJob)
}
}

await new RunnerServer(enabledJobs).run()
} catch (err) {
logger.error(err, 'Cannot run PeerTube runner as server mode')
process.exit(-1)
Expand Down
20 changes: 10 additions & 10 deletions apps/peertube-runner/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { readdir } from 'fs/promises'
import { join } from 'path'
import { io, Socket } from 'socket.io-client'
import { pick, shuffle, wait } from '@peertube/peertube-core-utils'
import { PeerTubeProblemDocument, ServerErrorCode } from '@peertube/peertube-models'
import { PeerTubeProblemDocument, RunnerJobType, ServerErrorCode } from '@peertube/peertube-models'
import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands'
import { ConfigManager } from '../shared/index.js'
import { IPCServer } from '../shared/ipc/index.js'
import { logger } from '../shared/logger.js'
import { JobWithToken, processJob } from './process/index.js'
import { isJobSupported } from './shared/index.js'
import { getSupportedJobsList, isJobSupported } from './shared/index.js'

type PeerTubeServer = PeerTubeServerCommand & {
runnerToken: string
Expand All @@ -18,8 +18,6 @@ type PeerTubeServer = PeerTubeServerCommand & {
}

export class RunnerServer {
private static instance: RunnerServer

private servers: PeerTubeServer[] = []
private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []

Expand All @@ -30,11 +28,17 @@ export class RunnerServer {

private readonly sockets = new Map<PeerTubeServer, Socket>()

private constructor () {}
constructor (private readonly enabledJobs?: Set<RunnerJobType>) {}

async run () {
logger.info('Running PeerTube runner in server mode')

const enabledJobsArray = this.enabledJobs
? Array.from(this.enabledJobs)
: getSupportedJobsList()

logger.info('Supported and enabled job types: ' + enabledJobsArray.join(', '))

await ConfigManager.Instance.load()

for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
Expand Down Expand Up @@ -235,7 +239,7 @@ export class RunnerServer {

const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })

const filtered = availableJobs.filter(j => isJobSupported(j))
const filtered = availableJobs.filter(j => isJobSupported(j, this.enabledJobs))

if (filtered.length === 0) {
logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
Expand Down Expand Up @@ -315,8 +319,4 @@ export class RunnerServer {

process.exit()
}

static get Instance () {
return this.instance || (this.instance = new this())
}
}
11 changes: 7 additions & 4 deletions apps/peertube-runner/src/server/shared/supported-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ const supportedMatrix: { [ id in RunnerJobType ]: (payload: RunnerJobPayload) =>
}
}

export function isJobSupported (job: {
type: RunnerJobType
payload: RunnerJobPayload
}) {
export function isJobSupported (job: { type: RunnerJobType, payload: RunnerJobPayload }, enabledJobs?: Set<RunnerJobType>) {
if (enabledJobs && !enabledJobs.has(job.type)) return false

const fn = supportedMatrix[job.type]
if (!fn) return false

return fn(job.payload as any)
}

export function getSupportedJobsList () {
return Object.keys(supportedMatrix)
}
29 changes: 28 additions & 1 deletion packages/tests/src/peertube-runner/video-transcription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
import { checkPeerTubeRunnerCacheIsEmpty } from '@tests/shared/directories.js'
import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js'
import { checkAutoCaption, checkLanguage, checkNoCaption, uploadForTranscription } from '@tests/shared/transcription.js'
import { expect } from 'chai'

describe('Test transcription in peertube-runner program', function () {
let servers: PeerTubeServer[] = []
Expand All @@ -34,7 +35,7 @@ describe('Test transcription in peertube-runner program', function () {
const registrationToken = await servers[0].runnerRegistrationTokens.getFirstRegistrationToken()

peertubeRunner = new PeerTubeRunnerProcess(servers[0])
await peertubeRunner.runServer()
await peertubeRunner.runServer({ jobType: 'video-transcription' })
await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' })
})

Expand Down Expand Up @@ -71,6 +72,32 @@ describe('Test transcription in peertube-runner program', function () {
})
})

describe('When transcription is not enabled in runner', function () {

before(async function () {
await peertubeRunner.unregisterPeerTubeInstance({ runnerName: 'runner' })
peertubeRunner.kill()
await wait(500)

const registrationToken = await servers[0].runnerRegistrationTokens.getFirstRegistrationToken()
await peertubeRunner.runServer({ jobType: 'live-rtmp-hls-transcoding' })
await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' })
})

it('Should not run transcription', async function () {
this.timeout(60000)

const uuid = await uploadForTranscription(servers[0])
await wait(2000)

const { data } = await servers[0].runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] })
expect(data.some(j => j.type === 'video-transcription')).to.be.true

await checkNoCaption(servers, uuid)
await checkLanguage(servers, uuid, null)
})
})

describe('Check cleanup', function () {

it('Should have an empty cache directory', async function () {
Expand Down
13 changes: 12 additions & 1 deletion packages/tests/src/shared/peertube-runner-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { execaNode } from 'execa'
import { join } from 'path'
import { root } from '@peertube/peertube-node-utils'
import { PeerTubeServer } from '@peertube/peertube-server-commands'
import { RunnerJobType } from '../../../models/src/runners/runner-job-type.type.js'

export class PeerTubeRunnerProcess {
private app?: ChildProcess
Expand All @@ -12,13 +13,19 @@ export class PeerTubeRunnerProcess {
}

runServer (options: {
jobType?: RunnerJobType
hideLogs?: boolean // default true
} = {}) {
const { hideLogs = true } = options
const { jobType, hideLogs = true } = options

return new Promise<void>((res, rej) => {
const args = [ 'server', '--verbose', ...this.buildIdArg() ]

if (jobType) {
args.push('--enable-job')
args.push(jobType)
}

const forkOptions: ForkOptions = {
detached: false,
silent: true,
Expand All @@ -27,6 +34,10 @@ export class PeerTubeRunnerProcess {

this.app = fork(this.getRunnerPath(), args, forkOptions)

this.app.stderr.on('data', data => {
console.error(data.toString())
})

this.app.stdout.on('data', data => {
const str = data.toString() as string

Expand Down

0 comments on commit e654342

Please sign in to comment.