diff --git a/packages/api/src/controllers/asset.ts b/packages/api/src/controllers/asset.ts index 8e5c107e54..8578168604 100644 --- a/packages/api/src/controllers/asset.ts +++ b/packages/api/src/controllers/asset.ts @@ -59,7 +59,7 @@ import mung from "express-mung"; const app = Router(); -function catalystPipelineStrategy(req: Request) { +export function catalystPipelineStrategy(req: Request) { let { catalystPipelineStrategy } = req.body as NewAssetPayload; if (!req.user.admin && !req.user.isTestUser) { catalystPipelineStrategy = undefined; @@ -79,7 +79,7 @@ function isPrivatePlaybackPolicy(playbackPolicy: PlaybackPolicy) { const secondaryStorageExperiment = "secondary-vod-storage"; -async function defaultObjectStoreId( +export async function defaultObjectStoreId( { config, body, user }: Request, isOldPipeline?: boolean ): Promise { @@ -177,7 +177,7 @@ function parseUrlToDStorageUrl( return null; } -async function validateAssetPayload( +export async function validateAssetPayload( id: string, playbackId: string, userId: string, diff --git a/packages/api/src/controllers/clip.ts b/packages/api/src/controllers/clip.ts new file mode 100644 index 0000000000..3d5e5dbea7 --- /dev/null +++ b/packages/api/src/controllers/clip.ts @@ -0,0 +1,196 @@ +import { validatePost } from "../middleware"; +import { Request, Router } from "express"; +import _ from "lodash"; +import { db } from "../store"; +import { NotFoundError } from "../store/errors"; +import { pathJoin } from "../controllers/helpers"; +import { + createAsset, + validateAssetPayload, + defaultObjectStoreId, + catalystPipelineStrategy, +} from "./asset"; +import { generateUniquePlaybackId } from "./generate-keys"; +import { v4 as uuid } from "uuid"; +import { DBSession } from "../store/session-table"; +import { fetchWithTimeout } from "../util"; +import { DBStream } from "../store/stream-table"; +import { toExternalAsset } from "./asset"; +import { toStringValues } from "./helpers"; +import mung from "express-mung"; +import { Asset } from "../schema/types"; +import { WithID } from "../store/types"; + +const app = Router(); + +app.use( + mung.jsonAsync(async function cleanWriteOnlyResponses( + data: WithID[] | WithID | { asset: WithID }, + req + ) { + const { details } = toStringValues(req.query); + const toExternalAssetFunc = (a: Asset) => + toExternalAsset(a, req.config, !!details, req.user.admin); + + if (Array.isArray(data)) { + return Promise.all(data.map(toExternalAssetFunc)); + } + if ("id" in data) { + return toExternalAssetFunc(data); + } + if ("asset" in data) { + return { + ...data, + asset: await toExternalAssetFunc(data.asset), + }; + } + return data; + }) +); + +app.post("/", validatePost("clip-payload"), async (req, res) => { + const playbackId = req.body.playbackId; + const userId = req.user.id; + + const id = uuid(); + let uPlaybackId = await generateUniquePlaybackId(id); + + const content = await db.stream.getByPlaybackId(playbackId); //|| + //(await db.asset.getByPlaybackId(playbackId)); + + let isStream: boolean; + if (content && "streamKey" in content) { + isStream = true; + } + + if (!content) { + throw new NotFoundError("Content not found"); + } + + const user = await db.user.get(content.userId); + + if (!user || userId !== content.userId) { + throw new NotFoundError("Content not found"); + } + + if ("suspended" in content && content.suspended) { + throw new NotFoundError("Content not found"); + } + + let url: string; + let session: DBSession; + let objectStoreId: string; + + if (isStream) { + if (!content.record) { + res.status(400).json({ + errors: ["Recording must be enabled on a live stream to create clips"], + }); + } + ({ url, session, objectStoreId } = await getRunningRecording(content, req)); + } else { + res + .status(400) + .json({ errors: ["Clipping for assets is not implemented yet"] }); + return; + } + + if (!session) { + throw new Error("Recording session not found"); + } + + let asset = await validateAssetPayload( + id, + uPlaybackId, + content.userId, + Date.now(), + await defaultObjectStoreId(req), + req.config, + { + name: req.body.name || `clip-${uPlaybackId}`, + }, + { + type: "clip", + ...(isStream ? { sessionId: session.id } : { assetId: content.id }), + } + ); + + asset = await createAsset(asset, req.queue); + + const task = await req.taskScheduler.createAndScheduleTask( + "clip", + { + clip: { + clipStrategy: { + playbackId, + startTime: req.body.startTime, + endTime: req.body.endTime, + }, + catalystPipelineStrategy: catalystPipelineStrategy(req), + url, + sessionId: session.id, + inputId: content.id, + sourceObjectStoreId: objectStoreId, + }, + }, + null, + asset, + userId + ); + + res.json({ + task: { id: task.id }, + asset, + }); +}); + +async function getRunningRecording(content: DBStream, req: Request) { + let objectStoreId: string; + + const session = await db.session.getLastSession(content.id); + const os = await db.objectStore.get(req.config.recordCatalystObjectStoreId); + + let url = pathJoin( + os.publicUrl, + session.playbackId, + session.id, + "output.m3u8" + ); + + let params = { + method: "HEAD", + timeout: 5 * 1000, + }; + let resp = await fetchWithTimeout(url, params); + + if (resp.status != 200) { + const secondaryOs = req.config.secondaryRecordObjectStoreId + ? await db.objectStore.get(req.config.secondaryRecordObjectStoreId) + : undefined; + url = pathJoin( + secondaryOs.publicUrl, + session.playbackId, + session.id, + "output.m3u8" + ); + /* + TODO: Enable to check if recording is running on the secondary one + resp = await fetchWithTimeout(url, params); + + if (resp.status != 200) { + throw new Error("Recording not found"); + }*/ + + objectStoreId = req.config.secondaryRecordObjectStoreId; + } else { + objectStoreId = req.config.recordCatalystObjectStoreId; + } + + return { + url, + session, + objectStoreId, + }; +} + +export default app; diff --git a/packages/api/src/controllers/index.ts b/packages/api/src/controllers/index.ts index e879e17582..8351af8ba9 100644 --- a/packages/api/src/controllers/index.ts +++ b/packages/api/src/controllers/index.ts @@ -5,6 +5,7 @@ import experiment from "./experiment"; import ingest from "./ingest"; import objectStore from "./object-store"; import accessControl from "./access-control"; +import clip from "./clip"; import multistream from "./multistream"; import orchestrator from "./orchestrator"; import stream from "./stream"; @@ -51,4 +52,5 @@ export default { playback, did, room, + clip, }; diff --git a/packages/api/src/controllers/playback.test.ts b/packages/api/src/controllers/playback.test.ts index c145c1781e..2bd8cc8dd6 100644 --- a/packages/api/src/controllers/playback.test.ts +++ b/packages/api/src/controllers/playback.test.ts @@ -4,7 +4,7 @@ import { Attestation, Experiment, Asset, User } from "../schema/types"; import { WithID } from "../store/types"; import { db } from "../store"; import { DBStream } from "../store/stream-table"; -import { DBSession } from "../store/db"; +import { DBSession } from "../store/session-table"; const EXPECTED_CROSS_USER_ASSETS_CUTOFF_DATE = Date.parse( "2023-06-06T00:00:00.000Z" diff --git a/packages/api/src/controllers/playback.ts b/packages/api/src/controllers/playback.ts index 4355baec55..84111c41c0 100644 --- a/packages/api/src/controllers/playback.ts +++ b/packages/api/src/controllers/playback.ts @@ -4,6 +4,7 @@ import { getHLSPlaybackUrl, getWebRTCPlaybackUrl, getRecordingFields, + getRecordingPlaybackUrl, } from "./stream"; import { getPlaybackUrl as assetPlaybackUrl, @@ -11,7 +12,7 @@ import { StaticPlaybackInfo, } from "./asset"; import { CliArgs } from "../parse-cli"; -import { DBSession } from "../store/db"; +import { DBSession } from "../store/session-table"; import { Asset, PlaybackInfo, Stream, User } from "../schema/types"; import { DBStream } from "../store/stream-table"; import { WithID } from "../store/types"; @@ -36,7 +37,8 @@ function newPlaybackInfo( webRtcUrl?: string | null, playbackPolicy?: Asset["playbackPolicy"] | Stream["playbackPolicy"], staticFilesPlaybackInfo?: StaticPlaybackInfo[], - live?: PlaybackInfo["meta"]["live"] + live?: PlaybackInfo["meta"]["live"], + recordingUrl?: string ): PlaybackInfo { let playbackInfo: PlaybackInfo = { type, @@ -71,6 +73,14 @@ function newPlaybackInfo( url: webRtcUrl, }); } + if (recordingUrl) { + playbackInfo.meta.dvrPlayback = []; + playbackInfo.meta.dvrPlayback.push({ + hrn: "HLS (TS)", + type: "html5/application/vnd.apple.mpegurl", + url: recordingUrl, + }); + } return playbackInfo; } @@ -183,7 +193,9 @@ async function getPlaybackInfo( ingest: string, id: string, isCrossUserQuery: boolean, - origin: string + origin: string, + withRecordings?: boolean, + recordCatalystObjectStoreId?: string ): Promise { const cutoffDate = isCrossUserQuery ? null : CROSS_USER_ASSETS_CUTOFF_DATE; let { stream, asset, session } = await getResourceByPlaybackId( @@ -206,13 +218,21 @@ async function getPlaybackInfo( } if (stream) { + let recordingPlaybackUrl: string; + if (withRecordings) { + recordingPlaybackUrl = await getRecordingPlaybackUrl( + stream, + recordCatalystObjectStoreId + ); + } return newPlaybackInfo( "live", getHLSPlaybackUrl(ingest, stream), getWebRTCPlaybackUrl(ingest, stream), stream.playbackPolicy, null, - stream.isActive ? 1 : 0 + stream.isActive ? 1 : 0, + recordingPlaybackUrl ); } @@ -242,6 +262,8 @@ app.get("/:id", async (req, res) => { const ingest = ingests[0].base; let { id } = req.params; + const withRecordings = req.query.recordings === "true"; + const origin = req.headers["origin"] ?? ""; const isEmbeddablePlayer = embeddablePlayerOrigin.test(origin); @@ -250,7 +272,9 @@ app.get("/:id", async (req, res) => { ingest, id, isEmbeddablePlayer, - origin + origin, + withRecordings, + req.config.recordCatalystObjectStoreId ); if (!info) { throw new NotFoundError(`No playback URL found for ${id}`); diff --git a/packages/api/src/controllers/session.ts b/packages/api/src/controllers/session.ts index 6b3713400b..dc7e618695 100644 --- a/packages/api/src/controllers/session.ts +++ b/packages/api/src/controllers/session.ts @@ -4,7 +4,7 @@ import sql from "sql-template-strings"; import { authorizer } from "../middleware"; import { User } from "../schema/types"; import { db } from "../store"; -import { DBSession } from "../store/db"; +import { DBSession } from "../store/session-table"; import { DBStream } from "../store/stream-table"; import { WithID } from "../store/types"; import { CliArgs } from "../parse-cli"; diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 46a1764abc..4e3f1851a9 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -20,7 +20,7 @@ import { User, } from "../schema/types"; import { db } from "../store"; -import { DBSession } from "../store/db"; +import { DBSession } from "../store/session-table"; import { BadRequestError, InternalServerError, @@ -401,6 +401,31 @@ app.get("/", authorizer({}), async (req, res) => { ); }); +export async function getRecordingPlaybackUrl( + stream: DBStream, + objectStoreId: string +) { + let url: string; + + try { + const session = await db.session.getLastSession(stream.id); + + if (!session) { + return null; + } + + const os = await db.objectStore.get(objectStoreId); + url = pathJoin(os.publicUrl, session.playbackId, session.id, "output.m3u8"); + } catch (e) { + console.log(` + Error getting recording playback url: ${e} + `); + return null; + } + + return url; +} + export async function getRecordingFields( config: CliArgs, ingest: string, diff --git a/packages/api/src/schema/api-schema.yaml b/packages/api/src/schema/api-schema.yaml index 52e9ed5981..b1c4fe9918 100644 --- a/packages/api/src/schema/api-schema.yaml +++ b/packages/api/src/schema/api-schema.yaml @@ -798,8 +798,20 @@ components: type: string enum: - directUpload + - clip encryption: $ref: "#/components/schemas/new-asset-payload/properties/encryption" + sourceId: + type: string + description: + ID of the asset or stream from which this asset was created + sessionId: + type: string + description: + ID of the session from which this asset was created + assetId: + type: string + description: ID of the asset from which this asset was created creatorId: $ref: "#/components/schemas/creator-id" storage: @@ -1380,6 +1392,7 @@ components: - export - export-data - transcode-file + - clip createdAt: readOnly: true type: number @@ -1497,6 +1510,49 @@ components: be creatorId: $ref: "#/components/schemas/input-creator-id" + clip: + properties: + url: + type: string + description: URL of the asset to "clip" + clipStrategy: + type: object + description: >- + Strategy to use for clipping the asset. If not specified, + the default strategy that Catalyst is configured for will be + used. This field only available for admin users, and is only + used for E2E testing. + additionalProperties: false + properties: + startTime: + type: number + description: Start time of the clip in milliseconds + endTime: + type: number + description: End time of the clip in milliseconds + playbackId: + type: string + description: Playback ID of the stream or asset to clip + catalystPipelineStrategy: + type: string + description: >- + Force to use a specific strategy in the Catalyst pipeline. + If not specified, the default strategy that Catalyst is + configured for will be used. This field only available for + admin users, and is only used for E2E testing. + enum: + - catalyst + - catalyst_ffmpeg + - background_external + - background_mist + - fallback_external + - external + sessionId: + type: string + description: ID of the session + inputId: + type: string + description: ID of the input asset or stream status: readOnly: true type: object @@ -2049,6 +2105,30 @@ components: bitrate: type: number example: 449890 + dvrPlayback: + type: array + items: + type: object + additionalProperties: false + required: + - hrn + - type + - url + properties: + hrn: + type: string + example: MP4 + enum: + - HLS (TS) + type: + type: string + example: html5/video/mp4 + enum: + - html5/application/vnd.apple.mpegurl + url: + type: string + example: >- + https://asset-cdn.lp-playback.monster/hls/1bde4o2i6xycudoy/static360p0.mp4 attestation: $ref: "#/components/schemas/attestation" attestation: diff --git a/packages/api/src/schema/db-schema.yaml b/packages/api/src/schema/db-schema.yaml index 8657fc65ae..be8b9de5c1 100644 --- a/packages/api/src/schema/db-schema.yaml +++ b/packages/api/src/schema/db-schema.yaml @@ -21,6 +21,25 @@ components: accessKey: type: string description: Access key used for access-control verification + clip-payload: + type: object + additionalProperties: false + required: + - playbackId + - startTime + properties: + playbackId: + type: string + description: Playback ID of the stream or asset to clip + startTime: + type: number + description: Start time of the clip in milliseconds + endTime: + type: number + description: End time of the clip in milliseconds + name: + type: string + description: Name of the clip experiment-audience-payload: type: object additionalProperties: false @@ -949,6 +968,40 @@ components: - background_mist - fallback_external - external + clip: + properties: + clipStrategy: + type: object + description: >- + Strategy to use for clipping the asset. If not specified, + the default strategy that Catalyst is configured for will be + used. This field only available for admin users, and is only + used for E2E testing. + additionalProperties: false + properties: + startTime: + type: number + description: Start time of the clip in milliseconds + endTime: + type: number + description: End time of the clip in milliseconds + playbackId: + type: string + description: Playback ID of the stream or asset to clip + catalystPipelineStrategy: + type: string + description: >- + Force to use a specific strategy in the Catalyst pipeline. + If not specified, the default strategy that Catalyst is + configured for will be used. This field only available for + admin users, and is only used for E2E testing. + enum: + - catalyst + - catalyst_ffmpeg + - background_external + - background_mist + - fallback_external + - external transcode-file: properties: catalystPipelineStrategy: diff --git a/packages/api/src/store/db.ts b/packages/api/src/store/db.ts index 3a5cb7f1ca..ba61661c84 100644 --- a/packages/api/src/store/db.ts +++ b/packages/api/src/store/db.ts @@ -19,10 +19,7 @@ import { Attestation, } from "../schema/types"; import BaseTable, { TableOptions } from "./table"; -import StreamTable, { - DeprecatedStreamFields, - StreamStats, -} from "./stream-table"; +import StreamTable from "./stream-table"; import { kebabToCamel } from "../util"; import { QueryOptions, WithID } from "./types"; import MultistreamTargetTable from "./multistream-table"; @@ -31,6 +28,7 @@ import AssetTable from "./asset-table"; import TaskTable from "./task-table"; import ExperimentTable from "./experiment-table"; import AttestationTable from "./attestation-table"; +import SessionTable, { DBSession } from "./session-table"; // Should be configurable, perhaps? export const CONNECT_TIMEOUT = @@ -42,8 +40,6 @@ export interface PostgresParams { appName?: string; } -export type DBSession = WithID & StreamStats & DeprecatedStreamFields; - type Table = BaseTable>; type QueryHistogramLabels = { @@ -78,7 +74,7 @@ export class DB { webhookResponse: Table; passwordResetToken: Table; region: Table; - session: Table; + session: SessionTable; room: Table; postgresUrl: string; @@ -184,7 +180,7 @@ export class DB { db: this, schema: schemas["webhook-response"], }); - this.session = makeTable({ db: this, schema: schemas["session"] }); + this.session = new SessionTable({ db: this, schema: schemas["session"] }); this.room = makeTable({ db: this, schema: schemas["room"] }); const tables = Object.entries(schema.components.schemas).filter( diff --git a/packages/api/src/store/session-table.ts b/packages/api/src/store/session-table.ts new file mode 100644 index 0000000000..65571d21ad --- /dev/null +++ b/packages/api/src/store/session-table.ts @@ -0,0 +1,20 @@ +import sql from "sql-template-strings"; +import Table from "./table"; +import { QueryResult } from "pg"; +import { DBLegacyObject, QueryOptions, WithID } from "./types"; +import { Session } from "../schema/types"; +import { DeprecatedStreamFields, StreamStats } from "./stream-table"; + +export type DBSession = WithID & StreamStats & DeprecatedStreamFields; + +export default class SessionTable extends Table { + async getLastSession(parentId: string, opts?: QueryOptions) { + const res: QueryResult = await this.db.queryWithOpts( + sql`SELECT * FROM session WHERE data->>'parentId'=${parentId} ORDER BY data->>'createdAt' DESC LIMIT 1`.setName( + `${this.name}_last_by_parentid` + ), + opts + ); + return res.rowCount < 1 ? null : (res.rows[0].data as DBSession); + } +} diff --git a/packages/api/src/store/stream-table.ts b/packages/api/src/store/stream-table.ts index 12d9fd2fce..0189ea86c1 100644 --- a/packages/api/src/store/stream-table.ts +++ b/packages/api/src/store/stream-table.ts @@ -296,7 +296,7 @@ export default class StreamTable extends Table { async getLastSession(id: string, opts?: QueryOptions): Promise { const res: QueryResult = await this.db.queryWithOpts( - sql`SELECT data FROM stream WHERE data->>'parentId'=${id} ORDER BY data->'createdAt' DESC LIMIT 1`.setName( + sql`SELECT data FROM stream WHERE data->>'parentId'=${id} ORDER BY data->>'createdAt' DESC LIMIT 1`.setName( `${this.name}_by_parentid_last_session` ), opts diff --git a/packages/api/src/task/scheduler.ts b/packages/api/src/task/scheduler.ts index 8fce9b979d..983f5286fd 100644 --- a/packages/api/src/task/scheduler.ts +++ b/packages/api/src/task/scheduler.ts @@ -129,6 +129,7 @@ export class TaskScheduler { let assetSpec: Asset; switch (task.type) { + case "clip": case "upload": assetSpec = event.output?.[task.type]?.assetSpec; if (!assetSpec) { diff --git a/packages/api/src/webhooks/cannon.ts b/packages/api/src/webhooks/cannon.ts index 1920d13c24..ebe2e1170d 100644 --- a/packages/api/src/webhooks/cannon.ts +++ b/packages/api/src/webhooks/cannon.ts @@ -4,7 +4,8 @@ import isLocalIP from "is-local-ip"; import { Response } from "node-fetch"; import { v4 as uuid } from "uuid"; import { parse as parseUrl } from "url"; -import { DB, DBSession } from "../store/db"; +import { DB } from "../store/db"; +import { DBSession } from "../store/session-table"; import messages from "../store/messages"; import Queue from "../store/queue"; import { DBWebhook } from "../store/webhook-table";