diff --git a/jobrunner/src/jobs/LoadAssetJob.ts b/jobrunner/src/jobs/LoadAssetJob.ts index 9d0349b9d..c358eda27 100644 --- a/jobrunner/src/jobs/LoadAssetJob.ts +++ b/jobrunner/src/jobs/LoadAssetJob.ts @@ -83,6 +83,7 @@ export class LoadAssetJob extends MediaJobCommon { }, }, }); + await this._cleanupSourceFile(params); } catch (e) { await this.db.asset.update({ where: { diff --git a/jobrunner/src/jobs/MediaJobCommon.ts b/jobrunner/src/jobs/MediaJobCommon.ts index dfc89d65d..dd01c093a 100644 --- a/jobrunner/src/jobs/MediaJobCommon.ts +++ b/jobrunner/src/jobs/MediaJobCommon.ts @@ -23,12 +23,6 @@ export abstract class MediaJobCommon extends AbstractJob { filePath, ); invariant(await dl.wait(), "download did not succeed"); - - await got.delete(process.env.TUS_ENDPOINT + "/" + params.source, { - headers: { - "Tus-Resumable": "1.0.0", - }, - }); break; } //fallthrough @@ -103,4 +97,21 @@ export abstract class MediaJobCommon extends AbstractJob { await upload.done(); return s3Path; } + + protected async _cleanupSourceFile(params: PrismaJson.JobPayload) { + invariant("sourceType" in params, "sourceType is required"); + switch (params.sourceType) { + case "Tus": { + await got.delete(process.env.TUS_ENDPOINT + "/" + params.source, { + headers: { + "Tus-Resumable": "1.0.0", + }, + }); + break; + } + case "S3": + // Nothing to do here, it will already be at the expected location + break; + } + } } diff --git a/jobrunner/src/jobs/ProcessMediaJob.integration.test.ts b/jobrunner/src/jobs/ProcessMediaJob.integration.test.ts index 1937a3d26..646680e2a 100644 --- a/jobrunner/src/jobs/ProcessMediaJob.integration.test.ts +++ b/jobrunner/src/jobs/ProcessMediaJob.integration.test.ts @@ -1,4 +1,4 @@ -import { MediaState } from "@badger/prisma/client"; +import { JobState, MediaState } from "@badger/prisma/client"; import { it, expect } from "vitest"; import { doOneJob } from "../index.js"; import { integrate } from "@badger/testing"; @@ -28,7 +28,7 @@ async function uploadTestFileToTus() { throw new Error("Tus rejected creation"); } - const uploadReq = await got.stream.patch(createRes.headers.location!, { + const uploadReq = got.stream.patch(createRes.headers.location!, { body: sourceFile, headers: { "Tus-Resumable": "1.0.0", @@ -104,4 +104,49 @@ integrate("ProcessMediaJob", () => { ); expect(tusRes.statusCode).not.toBe(200); }); + + it("handles failure", async () => { + const testMediaPath = await uploadTestFileToTus(); + const media = await db.media.create({ + data: { + name: "__FAIL__smpte_bars_15s.mp4", + durationSeconds: 0, + rawPath: "", + continuityItems: { + create: { + name: "Test", + durationSeconds: 0, + order: 1, + show: { + create: { + name: "Test", + start: new Date(), + }, + }, + }, + }, + }, + }); + const job = await db.baseJob.create({ + data: { + jobType: "ProcessMediaJob", + jobPayload: { + mediaId: media.id, + sourceType: "Tus", + source: testMediaPath, + }, + }, + }); + await doOneJob(); + await expect( + db.baseJob.findFirst({ where: { id: job.id } }), + ).resolves.toHaveProperty("state", JobState.Failed); + // Check the file is not deleted from Tus + const res = await got.head(process.env.TUS_ENDPOINT + "/" + testMediaPath, { + headers: { + "Tus-Resumable": "1.0.0", + }, + }); + expect(res.statusCode).toBe(200); + }); }); diff --git a/jobrunner/src/jobs/ProcessMediaJob.ts b/jobrunner/src/jobs/ProcessMediaJob.ts index d1a3e4168..06c905f41 100644 --- a/jobrunner/src/jobs/ProcessMediaJob.ts +++ b/jobrunner/src/jobs/ProcessMediaJob.ts @@ -78,6 +78,13 @@ export default class ProcessMediaJob extends MediaJobCommon { }); try { + // Test only: allow testing failure handling + if (media.name.includes("__FAIL__")) { + throw new Error( + "Failing job to test error handling (I sure do hope this is a test...)", + ); + } + const rawTempPath = await this._wrapTask( media, "Downloading source file", @@ -291,6 +298,7 @@ export default class ProcessMediaJob extends MediaJobCommon { }, }, }); + await this._cleanupSourceFile(params); } catch (e) { await this.db.media.update({ where: { diff --git a/server/next.config.js b/server/next.config.js index 66a0192b0..a170c387d 100644 --- a/server/next.config.js +++ b/server/next.config.js @@ -1,5 +1,5 @@ // @ts-check -/* eslint-disable @typescript-eslint/no-var-requires */ +/* eslint-disable @typescript-eslint/no-require-imports */ const { PrismaPlugin } = require("@prisma/nextjs-monorepo-workaround-plugin"); const { withSentryConfig } = require("@sentry/nextjs"); const { execFileSync } = require("child_process");