diff --git a/packages/api/src/controllers/asset.ts b/packages/api/src/controllers/asset.ts index 162a7fd798..5d824043c5 100644 --- a/packages/api/src/controllers/asset.ts +++ b/packages/api/src/controllers/asset.ts @@ -609,6 +609,7 @@ const fieldsMap = { createdAt: { val: `asset.data->'createdAt'`, type: "int" }, updatedAt: { val: `asset.data->'status'->'updatedAt'`, type: "int" }, userId: `asset.data->>'userId'`, + creatorId: `stream.data->'creatorId'->>'value'`, playbackId: `asset.data->>'playbackId'`, playbackRecordingId: `asset.data->>'playbackRecordingId'`, phase: `asset.data->'status'->>'phase'`, diff --git a/packages/api/src/controllers/stream.test.ts b/packages/api/src/controllers/stream.test.ts index ebddeae128..d0b7248096 100644 --- a/packages/api/src/controllers/stream.test.ts +++ b/packages/api/src/controllers/stream.test.ts @@ -33,6 +33,7 @@ let mockUser: User; let mockAdminUser: User; let mockNonAdminUser: User; let postMockStream: Stream; +let postMockPullStream: Stream; // jest.setTimeout(70000) beforeAll(async () => { @@ -59,6 +60,12 @@ beforeAll(async () => { renditions: ["random_prefix_bbb_160p"], }, ]; + postMockPullStream = { + ...postMockStream, + pull: { + source: "https://playback.space/video+7bbb3wee.flv", + }, + }; mockUser = { email: `mock_user@gmail.com`, @@ -478,6 +485,144 @@ describe("controllers/stream", () => { }); }); + describe("pull stream idempotent creation", () => { + beforeEach(async () => { + // TODO: Remove this once experiment is done + await client.post("/experiment", { + name: "stream-pull-source", + audienceUserIds: [adminUser.id], + }); + }); + + it("should require a pull configuration", async () => { + let res = await client.put("/stream/pull", postMockStream); + expect(res.status).toBe(400); + const errors = await res.json(); + expect(errors).toMatchObject({ + errors: [ + expect.stringContaining("stream pull configuration is required"), + ], + }); + + res = await client.put("/stream/pull", { + ...postMockStream, + pull: {}, // an empty object is missing the 'source' field + }); + expect(res.status).toBe(422); + }); + + it("should create a stream if a pull config is present", async () => { + const now = Date.now(); + const res = await client.put("/stream/pull", postMockPullStream); + expect(res.status).toBe(201); + const stream = await res.json(); + expect(stream.id).toBeDefined(); + expect(stream.kind).toBe("stream"); + expect(stream.name).toBe("test_stream"); + expect(stream.createdAt).toBeGreaterThanOrEqual(now); + const document = await db.stream.get(stream.id); + expect(server.db.stream.addDefaultFields(document)).toEqual(stream); + }); + + it("should update a stream if it has the same pull source", async () => { + let res = await client.put("/stream/pull", postMockPullStream); + expect(res.status).toBe(201); + const stream = await res.json(); + + const now = Date.now(); + res = await client.put("/stream/pull", { + ...postMockPullStream, + name: "updated_stream", + profiles: [], + }); + expect(res.status).toBe(200); + const updatedStream = await res.json(); + expect(updatedStream.id).toBe(stream.id); + expect(updatedStream.name).toBe("updated_stream"); + expect(updatedStream.profiles).toEqual([]); + + const document = await db.stream.get(stream.id); + expect(db.stream.addDefaultFields(document)).toEqual(updatedStream); + }); + + it("should fail to dedup streams by a random key", async () => { + let res = await client.put( + "/stream/pull?key=invalid", + postMockPullStream + ); + expect(res.status).toBe(400); + const errors = await res.json(); + expect(errors).toMatchObject({ + errors: [expect.stringContaining("key must be one of")], + }); + }); + + it("should fail to dedup streams by creatorId if not provided", async () => { + let res = await client.put( + "/stream/pull?key=creatorId", + postMockPullStream + ); + expect(res.status).toBe(400); + const errors = await res.json(); + expect(errors).toMatchObject({ + errors: [expect.stringContaining("must be present in the payload")], + }); + }); + + it("should dedup streams by creatorId if requested", async () => { + let res = await client.put("/stream/pull?key=creatorId", { + ...postMockPullStream, + creatorId: "0xjest", + }); + expect(res.status).toBe(201); + const stream = await res.json(); + + res = await client.put("/stream/pull", { + ...postMockPullStream, + creatorId: "0xjest", + name: "updated_stream", + profiles: [], + }); + expect(res.status).toBe(200); + const updatedStream = await res.json(); + expect(updatedStream.id).toBe(stream.id); + expect(updatedStream.name).toBe("updated_stream"); + expect(updatedStream.profiles).toEqual([]); + + const document = await db.stream.get(stream.id); + expect(db.stream.addDefaultFields(document)).toEqual(updatedStream); + }); + + it("should wait for stream to become active if requested", async () => { + let responded = false; + const resProm = client.put( + "/stream/pull?waitActive=true", + postMockPullStream + ); + resProm.then(() => (responded = true)); + + // give some time for API to create object in DB + await sleep(100); + + const [streams] = await db.stream.find(); + expect(streams).toHaveLength(1); + expect(streams[0].isActive).toBe(false); + + // stream not active yet + expect(responded).toBe(false); + + // set stream active + await db.stream.update(streams[0].id, { isActive: true }); + + const res = await resProm; + expect(responded).toBe(true); // make sure this works + expect(res.status).toBe(201); + const stream = await res.json(); + expect(stream.id).toBe(streams[0].id); + expect(stream.isActive).toBe(true); + }); + }); + it("should create a stream, delete it, and error when attempting additional delete or replace", async () => { const res = await client.post("/stream", { ...postMockStream }); expect(res.status).toBe(201); diff --git a/packages/api/src/controllers/stream.ts b/packages/api/src/controllers/stream.ts index 13a5c9828b..99ed909ad1 100644 --- a/packages/api/src/controllers/stream.ts +++ b/packages/api/src/controllers/stream.ts @@ -1,5 +1,4 @@ import { Router, Request } from "express"; -import fetch from "node-fetch"; import { QueryResult } from "pg"; import sql from "sql-template-strings"; import { parse as parseUrl } from "url"; @@ -54,6 +53,7 @@ import { withPlaybackUrls } from "./asset"; import { getClips } from "./clip"; import { ensureExperimentSubject } from "../store/experiment-table"; import { experimentSubjectsOnly } from "./experiment"; +import { sleep } from "../util"; type Profile = DBStream["profiles"][number]; type MultistreamOptions = DBStream["multistream"]; @@ -62,6 +62,31 @@ type MultistreamTargetRef = MultistreamOptions["targets"][number]; export const USER_SESSION_TIMEOUT = 60 * 1000; // 1 min const ACTIVE_TIMEOUT = 90 * 1000; // 90 sec const STALE_SESSION_TIMEOUT = 3 * 60 * 60 * 1000; // 3 hours +const MAX_WAIT_STREAM_ACTIVE = 2 * 60 * 1000; // 2 min + +// Helper constant to be used in the PUT /pull API to make sure we delete fields +// from the stream that are not specified in the PUT payload. +const EMPTY_NEW_STREAM_PAYLOAD: Required< + Omit< + NewStreamPayload & { creatorId: undefined }, + // omit all the db-schema fields + | "wowza" + | "presets" + | "renditions" + | "recordObjectStoreId" + | "objectStoreId" + | "detection" + > +> = { + name: undefined, + profiles: undefined, + multistream: undefined, + pull: undefined, + record: undefined, + userTags: undefined, + creatorId: undefined, + playbackPolicy: undefined, +}; const app = Router(); const hackMistSettings = (req: Request, profiles: Profile[]): Profile[] => { @@ -209,6 +234,35 @@ async function triggerManyIdleStreamsWebhook(ids: string[], queue: Queue) { ); } +// Waits for the stream to become active. Works by polling the stream object +// on the DB until it becomes active or the timeout is reached. Will wait for +// exponentially longer time in between polls to reduce load on the DB. +async function pollWaitStreamActive(req: Request, id: string) { + let clientGone = false; + req.on("close", () => { + clientGone = true; + }); + + const deadline = Date.now() + MAX_WAIT_STREAM_ACTIVE; + let sleepDelay = 500; + while (!clientGone && Date.now() < deadline) { + const stream = + (await db.stream.get(id)) || + (await db.stream.get(id, { useReplica: false })); // read from primary in case replica is lagging + if (!stream) { + throw new NotFoundError("stream not found"); + } + if (stream.isActive) { + return stream; + } + + await sleep(sleepDelay); + sleepDelay = Math.min(sleepDelay * 2, 5000); + } + + throw new InternalServerError("stream not active"); +} + export function getHLSPlaybackUrl(ingest: string, stream: DBStream) { return pathJoin(ingest, `hls`, stream.playbackId, `index.m3u8`); } @@ -297,6 +351,8 @@ const fieldsMap: FieldsMap = { lastSeen: { val: `stream.data->'lastSeen'`, type: "int" }, createdAt: { val: `stream.data->'createdAt'`, type: "int" }, userId: `stream.data->>'userId'`, + creatorId: `stream.data->'creatorId'->>'value'`, + "pull.source": `stream.data->'pull'->>'source'`, isActive: { val: `stream.data->'isActive'`, type: "boolean" }, "user.email": { val: `users.data->>'email'`, type: "full-text" }, parentId: `stream.data->>'parentId'`, @@ -953,6 +1009,94 @@ app.post( } ); +const pullStreamKeyAccessors: Record = { + creatorId: ["creatorId", "value"], + "pull.source": ["pull", "source"], +}; + +app.put( + "/pull", + authorizer({}), + validatePost("new-stream-payload"), + experimentSubjectsOnly("stream-pull-source"), + async (req, res) => { + const { key = "pull.source", waitActive } = toStringValues(req.query); + const rawPayload = req.body as NewStreamPayload; + + if (!rawPayload.pull) { + return res.status(400).json({ + errors: [`stream pull configuration is required`], + }); + } + + // Make the payload compatible with the stream schema to simplify things + const payload: Partial = { + profiles: req.config.defaultStreamProfiles, + ...rawPayload, + creatorId: mapInputCreatorId(rawPayload.creatorId), + }; + + const keyValue = _.get(payload, pullStreamKeyAccessors[key]); + if (!keyValue) { + return res.status(400).json({ + errors: [ + `key must be one of ${Object.keys( + pullStreamKeyAccessors + )} and must be present in the payload`, + ], + }); + } + const filtersStr = encodeURIComponent( + JSON.stringify([{ id: key, value: keyValue }]) + ); + const filters = parseFilters(fieldsMap, filtersStr); + + const [streams] = await db.stream.find( + [ + sql`data->>'userId' = ${req.user.id}`, + sql`data->>'deleted' IS NULL`, + ...filters, + ], + { useReplica: false } + ); + if (streams.length > 1) { + return res.status(400).json({ + errors: [ + `pull.source must be unique, found ${streams.length} streams with same source`, + ], + }); + } + const streamExisted = streams.length === 1; + + let stream: DBStream; + if (!streamExisted) { + stream = await handleCreateStream(req); + } else { + stream = { + ...streams[0], + ...EMPTY_NEW_STREAM_PAYLOAD, // clear all fields that should be set from the payload + ...payload, + }; + await db.stream.replace(stream); + // read from DB again to keep exactly what got saved + stream = await db.stream.get(stream.id, { useReplica: false }); + } + + await TODOtriggerCatalystPullStart(stream); + + if (waitActive === "true") { + stream = await pollWaitStreamActive(req, stream.id); + } + + res.status(streamExisted ? 200 : 201); + res.json( + db.stream.addDefaultFields( + db.stream.removePrivateFields(stream, req.user.admin) + ) + ); + } +); + app.post( "/", authorizer({}), @@ -961,6 +1105,7 @@ app.post( const { autoStartPull } = toStringValues(req.query); const payload = req.body as NewStreamPayload; + // TODO: Remove autoStartPull once experiment subjects migrate to /pull if (autoStartPull || payload.pull) { await ensureExperimentSubject("stream-pull-source", req.user.id); } @@ -973,7 +1118,11 @@ app.post( } const [streams] = await db.stream.find( - [sql`data->'pull'->>'source' = ${payload.pull.source}`], + [ + sql`data->>'userId' = ${req.user.id}`, + sql`data->>'deleted' IS NULL`, + sql`data->'pull'->>'source' = ${payload.pull.source}`, + ], { useReplica: false } ); @@ -997,72 +1146,78 @@ app.post( } } - const id = uuid(); - const createdAt = Date.now(); - // TODO: Don't create a streamKey if there's a pull source (here and on www) - const streamKey = await generateUniqueStreamKey(id); - let playbackId = await generateUniquePlaybackId(id, [streamKey]); - if (req.user.isTestUser) { - playbackId += "-test"; - } - - const { objectStoreId } = payload; - if (objectStoreId) { - const store = await db.objectStore.get(objectStoreId); - if (!store || store.deleted || store.disabled) { - return res.status(400).json({ - errors: [`object store ${objectStoreId} not found or disabled`], - }); - } - } - - let doc: DBStream = { - profiles: req.config.defaultStreamProfiles, - ...payload, - kind: "stream", - userId: req.user.id, - creatorId: mapInputCreatorId(payload.creatorId), - renditions: {}, - objectStoreId, - id, - createdAt, - streamKey, - playbackId, - createdByTokenName: req.token?.name, - createdByTokenId: req.token?.id, - isActive: false, - lastSeen: 0, - }; - doc = wowzaHydrate(doc); - - await validateStreamPlaybackPolicy(doc.playbackPolicy, req.user.id); - - doc.profiles = hackMistSettings(req, doc.profiles); - doc.multistream = await validateMultistreamOpts( - req.user.id, - doc.profiles, - doc.multistream - ); - - if (doc.userTags) { - await validateTags(doc.userTags); - } - - await db.stream.create(doc); + const stream = await handleCreateStream(req); if (autoStartPull === "true") { - await TODOtriggerCatalystPullStart(doc); + await TODOtriggerCatalystPullStart(stream); } res.status(201); res.json( db.stream.addDefaultFields( - db.stream.removePrivateFields(doc, req.user.admin) + db.stream.removePrivateFields(stream, req.user.admin) ) ); } ); +async function handleCreateStream(req: Request) { + const payload = req.body as NewStreamPayload; + + const id = uuid(); + const createdAt = Date.now(); + // TODO: Don't create a streamKey if there's a pull source (here and on www) + const streamKey = await generateUniqueStreamKey(id); + let playbackId = await generateUniquePlaybackId(id, [streamKey]); + if (req.user.isTestUser) { + playbackId += "-test"; + } + + const { objectStoreId } = payload; + if (objectStoreId) { + const store = await db.objectStore.get(objectStoreId); + if (!store || store.deleted || store.disabled) { + throw new BadRequestError( + `object store ${objectStoreId} not found or disabled` + ); + } + } + + let doc: DBStream = { + profiles: req.config.defaultStreamProfiles, + ...payload, + kind: "stream", + userId: req.user.id, + creatorId: mapInputCreatorId(payload.creatorId), + renditions: {}, + objectStoreId, + id, + createdAt, + streamKey, + playbackId, + createdByTokenName: req.token?.name, + createdByTokenId: req.token?.id, + isActive: false, + lastSeen: 0, + }; + doc = wowzaHydrate(doc); + + await validateStreamPlaybackPolicy(doc.playbackPolicy, req.user.id); + + doc.profiles = hackMistSettings(req, doc.profiles); + doc.multistream = await validateMultistreamOpts( + req.user.id, + doc.profiles, + doc.multistream + ); + + if (doc.userTags) { + await validateTags(doc.userTags); + } + + return await db.stream.create(doc); +} + // Refreshes the 'lastSeen' field of a stream app.post("/:id/heartbeat", authorizer({ anyAdmin: true }), async (req, res) => { const { id } = req.params; diff --git a/packages/api/src/schema/api-schema.yaml b/packages/api/src/schema/api-schema.yaml index 3b576d35eb..253cce0c50 100644 --- a/packages/api/src/schema/api-schema.yaml +++ b/packages/api/src/schema/api-schema.yaml @@ -446,6 +446,9 @@ components: Approximate location of the pull source. The location is used to determine the closest Livepeer region to pull the stream from. additionalProperties: false + required: + - lat + - lon properties: lat: type: number diff --git a/packages/api/src/schema/db-schema.yaml b/packages/api/src/schema/db-schema.yaml index 6ec50f478e..dab28b40a4 100644 --- a/packages/api/src/schema/db-schema.yaml +++ b/packages/api/src/schema/db-schema.yaml @@ -798,6 +798,11 @@ components: $ref: "#/components/schemas/stream/properties/objectStoreId" detection: $ref: "#/components/schemas/stream/properties/detection" + creator-id: + oneOf: + - properties: + value: + index: true playback-policy: properties: type: diff --git a/packages/api/src/store/asset-table.test.ts b/packages/api/src/store/asset-table.test.ts index b363af0f70..34095de354 100644 --- a/packages/api/src/store/asset-table.test.ts +++ b/packages/api/src/store/asset-table.test.ts @@ -12,6 +12,7 @@ describe("assets table", () => { ); const indexes = res.rows?.map((r: any) => r.indexname).sort(); expect(indexes).toEqual([ + "asset_creatorId_value", "asset_id", "asset_playbackId", "asset_playbackRecordingId",