Skip to content

Commit

Permalink
api: Pull ingest for streams (#2019)
Browse files Browse the repository at this point in the history
* api/schema: Add new fields to stream obj

* api: Implement API handler for pull features

* api: Reminder TODO

* api: Make sure only experiment subjects can use

* api: Remove stream key omission  for now

* api: Implement missing start-pull API

* api/schema: Add docs for /start-pull

* api/db: Disable recursive indexes only for tasks

* api: Remove index TODO (just done!)
  • Loading branch information
victorges authored Jan 29, 2024
1 parent ee02db1 commit 7643af2
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 4 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/controllers/experiment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const toUserIds = (emailsOrIds?: string[]) =>

const app = Router();

const experimentSubjectsOnly =
export const experimentSubjectsOnly =
(experiment: string) => async (req, res, next) => {
await ensureExperimentSubject(experiment, req.user?.id);
return next();
Expand Down
5 changes: 5 additions & 0 deletions packages/api/src/controllers/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import base64url from "base64url";
import { CreatorId, InputCreatorId, ObjectStore, User } from "../schema/types";
import { BadRequestError } from "../store/errors";
import * as nativeCrypto from "crypto";
import { DBStream } from "../store/stream-table";

const ITERATIONS = 10000;
const PAYMENT_FAILED_TIMEFRAME = 3 * 24 * 60 * 60 * 1000;
Expand Down Expand Up @@ -636,6 +637,10 @@ export function isValidBase64(str: string) {
}
}

export const TODOtriggerCatalystPullStart = async (stream: DBStream) => {
// TODO: trigger pull start on catalyst
};

export const triggerCatalystStreamNuke = (req: Request, playback_id: string) =>
triggerCatalystEvent(req, { resource: "nuke", playback_id });

Expand Down
71 changes: 71 additions & 0 deletions packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ import {
mapInputCreatorId,
triggerCatalystStreamUpdated,
triggerCatalystStreamNuke,
TODOtriggerCatalystPullStart,
} from "./helpers";
import wowzaHydrate from "./wowza-hydrate";
import Queue from "../store/queue";
import { toExternalSession } from "./session";
import { withPlaybackUrls } from "./asset";
import { getClips } from "./clip";
import { ensureExperimentSubject } from "../store/experiment-table";
import { experimentSubjectsOnly } from "./experiment";

type Profile = DBStream["profiles"][number];
type MultistreamOptions = DBStream["multistream"];
Expand Down Expand Up @@ -955,10 +958,48 @@ app.post(
authorizer({}),
validatePost("new-stream-payload"),
async (req, res) => {
const { autoStartPull } = toStringValues(req.query);
const payload = req.body as NewStreamPayload;

if (autoStartPull || payload.pull) {
await ensureExperimentSubject(req.user.id, "stream-pull-source");
}

if (autoStartPull === "true") {
if (!payload.pull) {
return res.status(400).json({
errors: [`autoStartPull requires pull configuration to be present`],
});
}

const [streams] = await db.stream.find(
[sql`data->'pull'->>'source' = ${payload.pull.source}`],
{ useReplica: false }
);

if (streams.length === 1) {
const stream = streams[0];
await TODOtriggerCatalystPullStart(stream);

return res
.status(200)
.json(
db.stream.addDefaultFields(
db.stream.removePrivateFields(stream, req.user.admin)
)
);
} else if (streams.length > 1) {
return res.status(400).json({
errors: [
`autoStartPull requires pull.source to be unique, found ${streams.length} streams with same source`,
],
});
}
}

const id = uuid();
const createdAt = Date.now();
// TODO: Don't create a streamKey if there's a pull source (here and on www)
const streamKey = await generateUniqueStreamKey(id);
let playbackId = await generateUniquePlaybackId(id, [streamKey]);
if (req.user.isTestUser) {
Expand Down Expand Up @@ -1009,6 +1050,10 @@ app.post(

await db.stream.create(doc);

if (autoStartPull === "true") {
await TODOtriggerCatalystPullStart(doc);
}

res.status(201);
res.json(
db.stream.addDefaultFields(
Expand Down Expand Up @@ -1678,6 +1723,32 @@ app.patch("/:id/suspended", authorizer({}), async (req, res) => {
res.end();
});

app.post(
"/:id/start-pull",
authorizer({}),
experimentSubjectsOnly("stream-pull-source"),
async (req, res) => {
const { id } = req.params;
const stream = await db.stream.get(id);
if (
!stream ||
(!req.user.admin && (stream.deleted || stream.userId !== req.user.id))
) {
res.status(404);
return res.json({ errors: ["not found"] });
}

if (!stream.pull) {
res.status(400);
return res.json({ errors: ["stream does not have a pull source"] });
}

await TODOtriggerCatalystPullStart(stream);

res.status(204).end();
}
);

app.delete("/:id/terminate", authorizer({}), async (req, res) => {
const { id } = req.params;
const stream = await db.stream.get(id);
Expand Down
75 changes: 73 additions & 2 deletions packages/api/src/schema/api-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,48 @@ components:
type: string
example: hgebdhhigq
description: Used to form RTMP ingest URL
pull:
type: object
description: |-
Configuration for a stream that should be actively pulled from an
external source, rather than pushed to Livepeer. If specified, the
stream will not have a streamKey.
additionalProperties: false
required:
- source
properties:
source:
type: string
description: |-
URL from which to pull from.
example: https://myservice.com/live/stream.flv
headers:
type: object
description: |-
Headers to be sent with the request to the pull source.
additionalProperties:
type: string
example:
Authorization: "Bearer 123"
location:
type: object
description: |-
Approximate location of the pull source. The location is used to
determine the closest Livepeer region to pull the stream from.
additionalProperties: false
properties:
lat:
type: number
description: |-
Latitude of the pull source in degrees. North is positive,
south is negative.
example: 39.739
lon:
type: number
description: |-
Longitude of the pull source in degrees. East is positive,
west is negative.
example: -104.988
playbackId:
type: string
example: eaw4nk06ts2d0mzb
Expand Down Expand Up @@ -469,6 +511,8 @@ components:
properties:
name:
$ref: "#/components/schemas/stream/properties/name"
pull:
$ref: "#/components/schemas/stream/properties/pull"
creatorId:
$ref: "#/components/schemas/input-creator-id"
playbackPolicy:
Expand Down Expand Up @@ -2536,6 +2580,33 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/error"
"/stream/{id}/start-pull":
parameters:
- name: id
description: ID of the stream
in: path
required: true
schema:
type: string
post:
summary: Start ingest for a pull stream
description: |
`POST /stream/{id}/start-pull` can be used to start ingest for a stream
configured with a pull source. If the stream has recording configured,
it will also start recording.
\
\
A 204 No Content status response indicates the stream was successfully
started.
responses:
"204":
description: Success
default:
description: Error
content:
application/json:
schema:
$ref: "#/components/schemas/error"
/multistream/target:
get:
summary: Retrieve Multistream Targets
Expand Down Expand Up @@ -2785,7 +2856,7 @@ paths:
tusEndpoint field of the response to upload the video file and track the
progress:
```
```
# This assumes there is an `input` element of `type="file"` with id
`fileInput` in the HTML
Expand Down Expand Up @@ -3828,7 +3899,7 @@ paths:
description: >
`POST /transcode` transcodes a video file and uploads the results to the
specified storage service.
specified storage service.
\
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/schema/db-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,10 @@ components:
index: true
streamKey:
unique: true
pull:
properties:
source:
index: true
playbackId:
unique: true
mistHost:
Expand Down
5 changes: 4 additions & 1 deletion packages/api/src/store/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ export default class Table<T extends DBObject> {
}

if (!prop.index && !prop.unique) {
if (prop.properties && this.name === "asset") {
// Tasks embed a bunch of `asset` objects in different fields. This would
// mean we'd duplicate the asset indexes in the task table. Because of
// that, we disable recursive indexes for the `task` table.
if (prop.properties && this.name !== "task") {
const childProps = Object.entries(prop.properties);
for (const [childName, childProp] of childProps) {
await this.ensureIndex(childName, childProp, [...parents, propName]);
Expand Down

0 comments on commit 7643af2

Please sign in to comment.