Skip to content

Commit

Permalink
api: Implement resumable uploads for VOD (#1112)
Browse files Browse the repository at this point in the history
* api: Add tus-node-server dep

* asset: Implement tus upload to local filesystem

* Remove alternate ways of passing uploadToken

* Throw an idea for using tus metadata

* WIP: Idea on how tus could be configured from VOD OS

* [DEV-ONLY] Add tus-test project to test tus development

Remember to delete this or make it a proper unit test once
it's done.

* api: moved TUS into req, added GCSStore, fixed TaskScheduler type, added gcsOptions to schema

* removed auto import

* update with google storage as s3

* upload.start()

* upload.start()

* added tests

* Make resumable uploads work!

* api: Move setupTus logic to asset controller

Now app-router only needs to call a func

* api/asset: Upload tus files to directUpload folder

* api/asset: Disallow users from customizing OS

* tus: setup test tus

* vod: added tus resumable upload tests

* tus: added mock file instead of test file

* api/asset: Fix tusEndpoint returned during tests

* tus: remove tus-test folder - update tests

* tus: fixed tests

* fix taskScheduler type

* api/asset: Fix tests

Was missing the tus-js-client dependency and needed to fix the check on
URL format.

Co-authored-by: gioelecerati <[email protected]>
  • Loading branch information
victorges and gioelecerati authored Aug 2, 2022
1 parent 752703f commit 0f808b4
Show file tree
Hide file tree
Showing 11 changed files with 681 additions and 61 deletions.
2 changes: 2 additions & 0 deletions packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
"sql-template-strings": "^2.2.2",
"string-template": "^1.0.0",
"stripe": "^8.93.0",
"tus-node-server": "^0.6.0",
"uuid": "^3.3.2",
"whatwg-fetch": "^3.4.0",
"winston": "^3.2.1",
Expand Down Expand Up @@ -138,6 +139,7 @@
"pkg": "^5.7.0",
"prettier": "^2.0.5",
"redoc-cli": "^0.8.3",
"tus-js-client": "^3.0.0-0",
"typescript": "^4.3.4",
"wrangler": "^0.0.2"
},
Expand Down
16 changes: 9 additions & 7 deletions packages/api/src/app-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Router } from "express";
import promBundle from "express-prom-bundle";
import proxy from "http-proxy-middleware";
import Stripe from "stripe";

import makeStore from "./store";
import {
errorHandler,
Expand All @@ -20,11 +19,12 @@ import streamProxy from "./controllers/stream-proxy";
import apiProxy from "./controllers/api-proxy";
import { getBroadcasterHandler } from "./controllers/broadcaster";
import WebhookCannon from "./webhooks/cannon";
import TaskScheduler from "./task/scheduler";
import Queue, { NoopQueue, RabbitQueue } from "./store/queue";
import { CliArgs } from "./parse-cli";
import { regionsGetter } from "./controllers/region";
import { pathJoin } from "./controllers/helpers";
import taskScheduler from "./task/scheduler";
import { setupTus, setupTestTus } from "./controllers/asset";

enum OrchestratorSource {
hardcoded = "hardcoded",
Expand Down Expand Up @@ -102,25 +102,27 @@ export default async function makeApp(params: CliArgs) {
: new NoopQueue();

// Task Scheduler
const taskScheduler = new TaskScheduler({
queue,
});
await taskScheduler.start();
await taskScheduler.start({ queue });

// Webhooks Cannon
const webhookCannon = new WebhookCannon({
db,
frontendDomain,
sendgridTemplateId,
sendgridApiKey,
taskScheduler,
vodObjectStoreId,
supportAddr,
verifyUrls: true,
queue,
});
await webhookCannon.start();

if (process.env.NODE_ENV === "test") {
await setupTestTus();
} else if (vodObjectStoreId) {
await setupTus(vodObjectStoreId);
}

process.on("beforeExit", (code) => {
queue.close();
webhookCannon.stop();
Expand Down
118 changes: 117 additions & 1 deletion packages/api/src/controllers/asset.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import serverPromise, { TestServer } from "../test-server";
import { TestClient, clearDatabase, setupUsers } from "../test-helpers";
import {
TestClient,
clearDatabase,
setupUsers,
createMockFile,
} from "../test-helpers";
import { v4 as uuid } from "uuid";
import { Asset, User } from "../schema/types";
import { db } from "../store";
import { WithID } from "../store/types";
import Table from "../store/table";
import schema from "../schema/schema.json";
import fs from "fs/promises";
import * as tus from "tus-js-client";
import os from "os";
import { sleep } from "../util";

// repeat the type here so we don't need to export it from store/asset-table.ts
type DBAsset =
Expand Down Expand Up @@ -316,5 +325,112 @@ describe("controllers/asset", () => {
},
});
});

describe("chunked upload", () => {
const expectTaskStatus = async (
taskId: string,
expectedStatus: string
) => {
const res = await client.get(`/task/${taskId}`);
expect(res.status).toBe(200);
const task = await res.json();
expect(task.status.phase).toBe(expectedStatus);
};

const uploadFile = async (
filename: string,
filePath: string,
tusEndpoint: string,
shouldAbort: boolean,
resumeFrom?: number
) => {
const file = await fs.readFile(filePath);
const { size } = await fs.stat(filePath);
let uploadPercentage = await new Promise<number>(
async (resolve, reject) => {
const upload = new tus.Upload(file, {
endpoint: tusEndpoint,
urlStorage: new (tus as any).FileUrlStorage(
`${os.tmpdir()}/metadata`
),
chunkSize: 1024 * 1024 * 1,
metadata: {
filename,
filetype: "video/mp4",
},
uploadSize: size,
onError(error) {
reject(error);
},
onProgress(bytesUploaded, bytesTotal) {
const percentage = parseFloat(
((bytesUploaded / bytesTotal) * 100).toFixed(2)
);
if (resumeFrom) {
expect(percentage).toBeGreaterThanOrEqual(resumeFrom);
}
if (shouldAbort && percentage > 1) {
upload.abort().then(() => {
resolve(percentage);
});
}
},
onSuccess() {
resolve(100);
},
});
if (resumeFrom) {
const previousUploads = await upload.findPreviousUploads();
expect(previousUploads).toHaveLength(1);
upload.resumeFromPreviousUpload(previousUploads[0]);
}
upload.start();
}
);
if (shouldAbort) {
expect(uploadPercentage).toBeGreaterThan(0);
expect(uploadPercentage).toBeLessThan(100);
} else {
expect(uploadPercentage).toBe(100);
}
return uploadPercentage;
};

it("should start upload, stop it, resume it on tus test server", async () => {
const filename = "test.mp4";
const path = os.tmpdir();
const filePath = `${path}/${filename}`;
let res = await client.post("/asset/request-upload", {
name: "tus-test",
});
expect(res.status).toBe(200);
let {
tusEndpoint,
task: { id: taskId },
} = await res.json();
expect(
tusEndpoint?.startsWith(`http://test/api/asset/upload/tus?token=`)
).toBe(true);
tusEndpoint = tusEndpoint.replace("http://test", client.server.host);

await createMockFile(filePath, 1024 * 1024 * 10);
await expectTaskStatus(taskId, "pending");
let percentage = await uploadFile(
filename,
filePath,
tusEndpoint,
true
);
await expectTaskStatus(taskId, "pending");
await uploadFile(filename, filePath, tusEndpoint, false, percentage);

await sleep(100);

await expectTaskStatus(taskId, "waiting");

await fs.unlink(filePath);
await fs.unlink(`${path}/metadata`);
});
});
});
});
Loading

0 comments on commit 0f808b4

Please sign in to comment.