diff --git a/packages/api/src/controllers/experiment.ts b/packages/api/src/controllers/experiment.ts index b0550afd60..942007f311 100644 --- a/packages/api/src/controllers/experiment.ts +++ b/packages/api/src/controllers/experiment.ts @@ -36,7 +36,7 @@ const toUserIds = (emailsOrIds?: string[]) => const app = Router(); -const experimentSubjectsOnly = +export const experimentSubjectsOnly = (experiment: string) => async (req, res, next) => { await ensureExperimentSubject(experiment, req.user?.id); return next(); diff --git a/packages/api/src/controllers/helpers.ts b/packages/api/src/controllers/helpers.ts index 4eca7717b0..2a5ca1a8ec 100644 --- a/packages/api/src/controllers/helpers.ts +++ b/packages/api/src/controllers/helpers.ts @@ -13,6 +13,7 @@ import base64url from "base64url"; import { CreatorId, InputCreatorId, ObjectStore, User } from "../schema/types"; import { BadRequestError } from "../store/errors"; import * as nativeCrypto from "crypto"; +import { DBStream } from "../store/stream-table"; const ITERATIONS = 10000; const PAYMENT_FAILED_TIMEFRAME = 3 * 24 * 60 * 60 * 1000; @@ -636,6 +637,10 @@ export function isValidBase64(str: string) { } } +export const TODOtriggerCatalystPullStart = async (stream: DBStream) => { + // TODO: trigger pull start on catalyst +}; + export const triggerCatalystStreamNuke = (req: Request, playback_id: string) => triggerCatalystEvent(req, { resource: "nuke", playback_id }); diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 21c6d02fd3..7e904747db 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -45,12 +45,15 @@ import { mapInputCreatorId, triggerCatalystStreamUpdated, triggerCatalystStreamNuke, + TODOtriggerCatalystPullStart, } from "./helpers"; import wowzaHydrate from "./wowza-hydrate"; import Queue from "../store/queue"; import { toExternalSession } from "./session"; import { withPlaybackUrls } from "./asset"; import { getClips } from "./clip"; +import { ensureExperimentSubject } from "../store/experiment-table"; +import { experimentSubjectsOnly } from "./experiment"; type Profile = DBStream["profiles"][number]; type MultistreamOptions = DBStream["multistream"]; @@ -955,10 +958,48 @@ app.post( authorizer({}), validatePost("new-stream-payload"), async (req, res) => { + const { autoStartPull } = toStringValues(req.query); const payload = req.body as NewStreamPayload; + if (autoStartPull || payload.pull) { + await ensureExperimentSubject(req.user.id, "stream-pull-source"); + } + + if (autoStartPull === "true") { + if (!payload.pull) { + return res.status(400).json({ + errors: [`autoStartPull requires pull configuration to be present`], + }); + } + + const [streams] = await db.stream.find( + [sql`data->'pull'->>'source' = ${payload.pull.source}`], + { useReplica: false } + ); + + if (streams.length === 1) { + const stream = streams[0]; + await TODOtriggerCatalystPullStart(stream); + + return res + .status(200) + .json( + db.stream.addDefaultFields( + db.stream.removePrivateFields(stream, req.user.admin) + ) + ); + } else if (streams.length > 1) { + return res.status(400).json({ + errors: [ + `autoStartPull requires pull.source to be unique, found ${streams.length} streams with same source`, + ], + }); + } + } + const id = uuid(); const createdAt = Date.now(); + // TODO: Don't create a streamKey if there's a pull source (here and on www) const streamKey = await generateUniqueStreamKey(id); let playbackId = await generateUniquePlaybackId(id, [streamKey]); if (req.user.isTestUser) { @@ -1009,6 +1050,10 @@ app.post( await db.stream.create(doc); + if (autoStartPull === "true") { + await TODOtriggerCatalystPullStart(doc); + } + res.status(201); res.json( db.stream.addDefaultFields( @@ -1678,6 +1723,32 @@ app.patch("/:id/suspended", authorizer({}), async (req, res) => { res.end(); }); +app.post( + "/:id/start-pull", + authorizer({}), + experimentSubjectsOnly("stream-pull-source"), + async (req, res) => { + const { id } = req.params; + const stream = await db.stream.get(id); + if ( + !stream || + (!req.user.admin && (stream.deleted || stream.userId !== req.user.id)) + ) { + res.status(404); + return res.json({ errors: ["not found"] }); + } + + if (!stream.pull) { + res.status(400); + return res.json({ errors: ["stream does not have a pull source"] }); + } + + await TODOtriggerCatalystPullStart(stream); + + res.status(204).end(); + } +); + app.delete("/:id/terminate", authorizer({}), async (req, res) => { const { id } = req.params; const stream = await db.stream.get(id); diff --git a/packages/api/src/schema/api-schema.yaml b/packages/api/src/schema/api-schema.yaml index 9244c86da0..42036d3fdf 100644 --- a/packages/api/src/schema/api-schema.yaml +++ b/packages/api/src/schema/api-schema.yaml @@ -407,6 +407,48 @@ components: type: string example: hgebdhhigq description: Used to form RTMP ingest URL + pull: + type: object + description: |- + Configuration for a stream that should be actively pulled from an + external source, rather than pushed to Livepeer. If specified, the + stream will not have a streamKey. + additionalProperties: false + required: + - source + properties: + source: + type: string + description: |- + URL from which to pull from. + example: https://myservice.com/live/stream.flv + headers: + type: object + description: |- + Headers to be sent with the request to the pull source. + additionalProperties: + type: string + example: + Authorization: "Bearer 123" + location: + type: object + description: |- + Approximate location of the pull source. The location is used to + determine the closest Livepeer region to pull the stream from. + additionalProperties: false + properties: + lat: + type: number + description: |- + Latitude of the pull source in degrees. North is positive, + south is negative. + example: 39.739 + lon: + type: number + description: |- + Longitude of the pull source in degrees. East is positive, + west is negative. + example: -104.988 playbackId: type: string example: eaw4nk06ts2d0mzb @@ -469,6 +511,8 @@ components: properties: name: $ref: "#/components/schemas/stream/properties/name" + pull: + $ref: "#/components/schemas/stream/properties/pull" creatorId: $ref: "#/components/schemas/input-creator-id" playbackPolicy: @@ -2536,6 +2580,33 @@ paths: application/json: schema: $ref: "#/components/schemas/error" + "/stream/{id}/start-pull": + parameters: + - name: id + description: ID of the stream + in: path + required: true + schema: + type: string + post: + summary: Start ingest for a pull stream + description: | + `POST /stream/{id}/start-pull` can be used to start ingest for a stream + configured with a pull source. If the stream has recording configured, + it will also start recording. + \ + \ + A 204 No Content status response indicates the stream was successfully + started. + responses: + "204": + description: Success + default: + description: Error + content: + application/json: + schema: + $ref: "#/components/schemas/error" /multistream/target: get: summary: Retrieve Multistream Targets @@ -2785,7 +2856,7 @@ paths: tusEndpoint field of the response to upload the video file and track the progress: - ``` + ``` # This assumes there is an `input` element of `type="file"` with id `fileInput` in the HTML @@ -3828,7 +3899,7 @@ paths: description: > `POST /transcode` transcodes a video file and uploads the results to the - specified storage service. + specified storage service. \ diff --git a/packages/api/src/schema/db-schema.yaml b/packages/api/src/schema/db-schema.yaml index c8d02b6e45..60a6bd538e 100644 --- a/packages/api/src/schema/db-schema.yaml +++ b/packages/api/src/schema/db-schema.yaml @@ -661,6 +661,10 @@ components: index: true streamKey: unique: true + pull: + properties: + source: + index: true playbackId: unique: true mistHost: diff --git a/packages/api/src/store/table.ts b/packages/api/src/store/table.ts index 06fceb91b8..a8403c62aa 100644 --- a/packages/api/src/store/table.ts +++ b/packages/api/src/store/table.ts @@ -388,7 +388,10 @@ export default class Table { } if (!prop.index && !prop.unique) { - if (prop.properties && this.name === "asset") { + // Tasks embed a bunch of `asset` objects in different fields. This would + // mean we'd duplicate the asset indexes in the task table. Because of + // that, we disable recursive indexes for the `task` table. + if (prop.properties && this.name !== "task") { const childProps = Object.entries(prop.properties); for (const [childName, childProp] of childProps) { await this.ensureIndex(childName, childProp, [...parents, propName]);