Skip to content

Commit

Permalink
clip: define clip api (#1892)
Browse files Browse the repository at this point in the history
* clip: define clip api

* changes to the clip task spawn

* change source location & check if stream is recording & fix session query

* added session id

* return recordings

* fix recordings url

* clip: secondary os

* fix

* fix

* fix

* clip: set objectstore id for output clips

* clip: use default object store for output asset

* addressed comments

* addressed comments

* api: Create separate session table

* clip: added source object store id to task context

* rebase

* clip: clean output asset json

---------

Co-authored-by: Victor Elias <[email protected]>
  • Loading branch information
gioelecerati and victorges authored Sep 27, 2023
1 parent 0f6a125 commit 2d05e86
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 21 deletions.
6 changes: 3 additions & 3 deletions packages/api/src/controllers/asset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import mung from "express-mung";

const app = Router();

function catalystPipelineStrategy(req: Request) {
export function catalystPipelineStrategy(req: Request) {
let { catalystPipelineStrategy } = req.body as NewAssetPayload;
if (!req.user.admin && !req.user.isTestUser) {
catalystPipelineStrategy = undefined;
Expand All @@ -79,7 +79,7 @@ function isPrivatePlaybackPolicy(playbackPolicy: PlaybackPolicy) {

const secondaryStorageExperiment = "secondary-vod-storage";

async function defaultObjectStoreId(
export async function defaultObjectStoreId(
{ config, body, user }: Request,
isOldPipeline?: boolean
): Promise<string> {
Expand Down Expand Up @@ -177,7 +177,7 @@ function parseUrlToDStorageUrl(
return null;
}

async function validateAssetPayload(
export async function validateAssetPayload(
id: string,
playbackId: string,
userId: string,
Expand Down
196 changes: 196 additions & 0 deletions packages/api/src/controllers/clip.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { validatePost } from "../middleware";
import { Request, Router } from "express";
import _ from "lodash";
import { db } from "../store";
import { NotFoundError } from "../store/errors";
import { pathJoin } from "../controllers/helpers";
import {
createAsset,
validateAssetPayload,
defaultObjectStoreId,
catalystPipelineStrategy,
} from "./asset";
import { generateUniquePlaybackId } from "./generate-keys";
import { v4 as uuid } from "uuid";
import { DBSession } from "../store/session-table";
import { fetchWithTimeout } from "../util";
import { DBStream } from "../store/stream-table";
import { toExternalAsset } from "./asset";
import { toStringValues } from "./helpers";
import mung from "express-mung";
import { Asset } from "../schema/types";
import { WithID } from "../store/types";

const app = Router();

app.use(
mung.jsonAsync(async function cleanWriteOnlyResponses(
data: WithID<Asset>[] | WithID<Asset> | { asset: WithID<Asset> },
req
) {
const { details } = toStringValues(req.query);
const toExternalAssetFunc = (a: Asset) =>
toExternalAsset(a, req.config, !!details, req.user.admin);

if (Array.isArray(data)) {
return Promise.all(data.map(toExternalAssetFunc));
}
if ("id" in data) {
return toExternalAssetFunc(data);
}
if ("asset" in data) {
return {
...data,
asset: await toExternalAssetFunc(data.asset),
};
}
return data;
})
);

app.post("/", validatePost("clip-payload"), async (req, res) => {
const playbackId = req.body.playbackId;
const userId = req.user.id;

const id = uuid();
let uPlaybackId = await generateUniquePlaybackId(id);

const content = await db.stream.getByPlaybackId(playbackId); //||
//(await db.asset.getByPlaybackId(playbackId));

let isStream: boolean;
if (content && "streamKey" in content) {
isStream = true;
}

if (!content) {
throw new NotFoundError("Content not found");
}

const user = await db.user.get(content.userId);

if (!user || userId !== content.userId) {
throw new NotFoundError("Content not found");
}

if ("suspended" in content && content.suspended) {
throw new NotFoundError("Content not found");
}

let url: string;
let session: DBSession;
let objectStoreId: string;

if (isStream) {
if (!content.record) {
res.status(400).json({
errors: ["Recording must be enabled on a live stream to create clips"],
});
}
({ url, session, objectStoreId } = await getRunningRecording(content, req));
} else {
res
.status(400)
.json({ errors: ["Clipping for assets is not implemented yet"] });
return;
}

if (!session) {
throw new Error("Recording session not found");
}

let asset = await validateAssetPayload(
id,
uPlaybackId,
content.userId,
Date.now(),
await defaultObjectStoreId(req),
req.config,
{
name: req.body.name || `clip-${uPlaybackId}`,
},
{
type: "clip",
...(isStream ? { sessionId: session.id } : { assetId: content.id }),
}
);

asset = await createAsset(asset, req.queue);

const task = await req.taskScheduler.createAndScheduleTask(
"clip",
{
clip: {
clipStrategy: {
playbackId,
startTime: req.body.startTime,
endTime: req.body.endTime,
},
catalystPipelineStrategy: catalystPipelineStrategy(req),
url,
sessionId: session.id,
inputId: content.id,
sourceObjectStoreId: objectStoreId,
},
},
null,
asset,
userId
);

res.json({
task: { id: task.id },
asset,
});
});

async function getRunningRecording(content: DBStream, req: Request) {
let objectStoreId: string;

const session = await db.session.getLastSession(content.id);
const os = await db.objectStore.get(req.config.recordCatalystObjectStoreId);

let url = pathJoin(
os.publicUrl,
session.playbackId,
session.id,
"output.m3u8"
);

let params = {
method: "HEAD",
timeout: 5 * 1000,
};
let resp = await fetchWithTimeout(url, params);

if (resp.status != 200) {
const secondaryOs = req.config.secondaryRecordObjectStoreId
? await db.objectStore.get(req.config.secondaryRecordObjectStoreId)
: undefined;
url = pathJoin(
secondaryOs.publicUrl,
session.playbackId,
session.id,
"output.m3u8"
);
/*
TODO: Enable to check if recording is running on the secondary one
resp = await fetchWithTimeout(url, params);
if (resp.status != 200) {
throw new Error("Recording not found");
}*/

objectStoreId = req.config.secondaryRecordObjectStoreId;
} else {
objectStoreId = req.config.recordCatalystObjectStoreId;
}

return {
url,
session,
objectStoreId,
};
}

export default app;
2 changes: 2 additions & 0 deletions packages/api/src/controllers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import experiment from "./experiment";
import ingest from "./ingest";
import objectStore from "./object-store";
import accessControl from "./access-control";
import clip from "./clip";
import multistream from "./multistream";
import orchestrator from "./orchestrator";
import stream from "./stream";
Expand Down Expand Up @@ -51,4 +52,5 @@ export default {
playback,
did,
room,
clip,
};
2 changes: 1 addition & 1 deletion packages/api/src/controllers/playback.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Attestation, Experiment, Asset, User } from "../schema/types";
import { WithID } from "../store/types";
import { db } from "../store";
import { DBStream } from "../store/stream-table";
import { DBSession } from "../store/db";
import { DBSession } from "../store/session-table";

const EXPECTED_CROSS_USER_ASSETS_CUTOFF_DATE = Date.parse(
"2023-06-06T00:00:00.000Z"
Expand Down
34 changes: 29 additions & 5 deletions packages/api/src/controllers/playback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import {
getHLSPlaybackUrl,
getWebRTCPlaybackUrl,
getRecordingFields,
getRecordingPlaybackUrl,
} from "./stream";
import {
getPlaybackUrl as assetPlaybackUrl,
getStaticPlaybackInfo,
StaticPlaybackInfo,
} from "./asset";
import { CliArgs } from "../parse-cli";
import { DBSession } from "../store/db";
import { DBSession } from "../store/session-table";
import { Asset, PlaybackInfo, Stream, User } from "../schema/types";
import { DBStream } from "../store/stream-table";
import { WithID } from "../store/types";
Expand All @@ -36,7 +37,8 @@ function newPlaybackInfo(
webRtcUrl?: string | null,
playbackPolicy?: Asset["playbackPolicy"] | Stream["playbackPolicy"],
staticFilesPlaybackInfo?: StaticPlaybackInfo[],
live?: PlaybackInfo["meta"]["live"]
live?: PlaybackInfo["meta"]["live"],
recordingUrl?: string
): PlaybackInfo {
let playbackInfo: PlaybackInfo = {
type,
Expand Down Expand Up @@ -71,6 +73,14 @@ function newPlaybackInfo(
url: webRtcUrl,
});
}
if (recordingUrl) {
playbackInfo.meta.dvrPlayback = [];
playbackInfo.meta.dvrPlayback.push({
hrn: "HLS (TS)",
type: "html5/application/vnd.apple.mpegurl",
url: recordingUrl,
});
}
return playbackInfo;
}

Expand Down Expand Up @@ -183,7 +193,9 @@ async function getPlaybackInfo(
ingest: string,
id: string,
isCrossUserQuery: boolean,
origin: string
origin: string,
withRecordings?: boolean,
recordCatalystObjectStoreId?: string
): Promise<PlaybackInfo> {
const cutoffDate = isCrossUserQuery ? null : CROSS_USER_ASSETS_CUTOFF_DATE;
let { stream, asset, session } = await getResourceByPlaybackId(
Expand All @@ -206,13 +218,21 @@ async function getPlaybackInfo(
}

if (stream) {
let recordingPlaybackUrl: string;
if (withRecordings) {
recordingPlaybackUrl = await getRecordingPlaybackUrl(
stream,
recordCatalystObjectStoreId
);
}
return newPlaybackInfo(
"live",
getHLSPlaybackUrl(ingest, stream),
getWebRTCPlaybackUrl(ingest, stream),
stream.playbackPolicy,
null,
stream.isActive ? 1 : 0
stream.isActive ? 1 : 0,
recordingPlaybackUrl
);
}

Expand Down Expand Up @@ -242,6 +262,8 @@ app.get("/:id", async (req, res) => {
const ingest = ingests[0].base;

let { id } = req.params;
const withRecordings = req.query.recordings === "true";

const origin = req.headers["origin"] ?? "";
const isEmbeddablePlayer = embeddablePlayerOrigin.test(origin);

Expand All @@ -250,7 +272,9 @@ app.get("/:id", async (req, res) => {
ingest,
id,
isEmbeddablePlayer,
origin
origin,
withRecordings,
req.config.recordCatalystObjectStoreId
);
if (!info) {
throw new NotFoundError(`No playback URL found for ${id}`);
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/controllers/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import sql from "sql-template-strings";
import { authorizer } from "../middleware";
import { User } from "../schema/types";
import { db } from "../store";
import { DBSession } from "../store/db";
import { DBSession } from "../store/session-table";
import { DBStream } from "../store/stream-table";
import { WithID } from "../store/types";
import { CliArgs } from "../parse-cli";
Expand Down
27 changes: 26 additions & 1 deletion packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
User,
} from "../schema/types";
import { db } from "../store";
import { DBSession } from "../store/db";
import { DBSession } from "../store/session-table";
import {
BadRequestError,
InternalServerError,
Expand Down Expand Up @@ -401,6 +401,31 @@ app.get("/", authorizer({}), async (req, res) => {
);
});

export async function getRecordingPlaybackUrl(
stream: DBStream,
objectStoreId: string
) {
let url: string;

try {
const session = await db.session.getLastSession(stream.id);

if (!session) {
return null;
}

const os = await db.objectStore.get(objectStoreId);
url = pathJoin(os.publicUrl, session.playbackId, session.id, "output.m3u8");
} catch (e) {
console.log(`
Error getting recording playback url: ${e}
`);
return null;
}

return url;
}

export async function getRecordingFields(
config: CliArgs,
ingest: string,
Expand Down
Loading

0 comments on commit 2d05e86

Please sign in to comment.