Skip to content

Commit

Permalink
api: Implement MVP of stream pull trigger (#2038)
Browse files Browse the repository at this point in the history
* api: Simplify fetchWithTimeoutAndRedirects

No need to duplicate the timeout logic.

* api: Implement MVP of stream pull trigger

* api: Increase max redirects

* api: Fix comment

* api: Add some sleep til Brooklyn

* api: Make the pull trigger noop in tests

No time for testing anymore, but ideally we would.

* api: Handle HLS as well as status erros
  • Loading branch information
victorges authored Feb 2, 2024
1 parent ff15533 commit 6913261
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 52 deletions.
40 changes: 37 additions & 3 deletions packages/api/src/controllers/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import { CreatorId, InputCreatorId, ObjectStore, User } from "../schema/types";
import { BadRequestError } from "../store/errors";
import * as nativeCrypto from "crypto";
import { DBStream } from "../store/stream-table";
import { fetchWithTimeoutAndRedirects, sleep } from "../util";
import logger from "../logger";

const ITERATIONS = 10000;
const PAYMENT_FAILED_TIMEFRAME = 3 * 24 * 60 * 60 * 1000;
const PULL_START_TIMEOUT = 60 * 1000;

const crypto = new Crypto();

Expand Down Expand Up @@ -637,9 +640,40 @@ export function isValidBase64(str: string) {
}
}

export const TODOtriggerCatalystPullStart = async (stream: DBStream) => {
// TODO: trigger pull start on catalyst
};
export const triggerCatalystPullStart =
process.env.NODE_ENV === "test"
? async () => {} // noop in case of tests
: async (stream: DBStream, playbackUrl: string) => {
// TODO: use pull.location field to call catalyst on the closest region.
// Ideally by calling catalyst-api/catabalancer and letting it figure it out.

// Instead of the above, for now just access the stream playbackUrl to trigger
// the pull to start.
const deadline = Date.now() + 2 * PULL_START_TIMEOUT;
while (Date.now() < deadline) {
const res = await fetchWithTimeoutAndRedirects(playbackUrl, {
method: "GET",
timeout: PULL_START_TIMEOUT,
maxRedirects: 10,
});
const body = await res.text();
const isHlsErr = body.includes("#EXT-X-ERROR: Stream open failed");
if (res.ok && !isHlsErr) {
return;
}

logger.warn(
`failed to trigger catalyst pull for stream=${
stream.id
} playbackUrl=${playbackUrl} status=${
res.status
} error=${JSON.stringify(body)}`
);
await sleep(1000);
}

throw new Error(`failed to trigger catalyst pull`);
};

export const triggerCatalystStreamNuke = (req: Request, playback_id: string) =>
triggerCatalystEvent(req, { resource: "nuke", playback_id });
Expand Down
17 changes: 12 additions & 5 deletions packages/api/src/controllers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import {
mapInputCreatorId,
triggerCatalystStreamUpdated,
triggerCatalystStreamNuke,
TODOtriggerCatalystPullStart,
triggerCatalystPullStart,
} from "./helpers";
import wowzaHydrate from "./wowza-hydrate";
import Queue from "../store/queue";
Expand Down Expand Up @@ -1082,7 +1082,8 @@ app.put(
stream = await db.stream.get(stream.id, { useReplica: false });
}

await TODOtriggerCatalystPullStart(stream);
const ingest = await getIngestBase(req);
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream));

if (waitActive === "true") {
stream = await pollWaitStreamActive(req, stream.id);
Expand Down Expand Up @@ -1128,7 +1129,11 @@ app.post(

if (streams.length === 1) {
const stream = streams[0];
await TODOtriggerCatalystPullStart(stream);
const ingest = await getIngestBase(req);
await triggerCatalystPullStart(
stream,
getHLSPlaybackUrl(ingest, stream)
);

return res
.status(200)
Expand All @@ -1149,7 +1154,8 @@ app.post(
const stream = await handleCreateStream(req);

if (autoStartPull === "true") {
await TODOtriggerCatalystPullStart(stream);
const ingest = await getIngestBase(req);
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream));
}

res.status(201);
Expand Down Expand Up @@ -1920,7 +1926,8 @@ app.post(
return res.json({ errors: ["stream does not have a pull source"] });
}

await TODOtriggerCatalystPullStart(stream);
const ingest = await getIngestBase(req);
await triggerCatalystPullStart(stream, getHLSPlaybackUrl(ingest, stream));

res.status(204).end();
}
Expand Down
65 changes: 21 additions & 44 deletions packages/api/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import fetch, { RequestInit, Response } from "node-fetch";

export interface RequestInitWithTimeout extends RequestInit {
timeout?: number;
}

export interface RequestInitWithRedirects extends RequestInitWithTimeout {
maxRedirects?: number;
}

export const timeout = <T>(ms: number, fn: () => Promise<T>) => {
return new Promise<T>((resolve, reject) => {
const handle = setTimeout(() => {
Expand Down Expand Up @@ -86,59 +91,31 @@ export const fetchWithTimeout = (

export const fetchWithTimeoutAndRedirects = async (
url: string,
options: RequestInitWithTimeout,
redirectCount = 5
options: RequestInitWithRedirects
): Promise<Response> => {
const { maxRedirects = 5 } = options;

// Throw error if maximum number of redirects has been exceeded
if (redirectCount < 0) {
if (maxRedirects < 0) {
throw new Error("Maximum number of redirects exceeded");
}

options = { ...options, redirect: "manual" };

return new Promise<Response>((resolve, reject) => {
let timeout = setTimeout(() => {
timeout = null;
reject("timeout");
}, options.timeout || 10 * 1000);

fetch(url, options).then(
async (response) => {
if (timeout === null) {
// already timed out
return;
}

clearTimeout(timeout);

// Handle redirects
if (response.status >= 300 && response.status < 400) {
const newUrl = response.headers.get("location");
if (!newUrl) {
reject("Redirect with no location");
return;
}
const response = await fetchWithTimeout(url, options);
if (response.status < 300 || response.status >= 400) {
return response;
}

const newResponse = await fetchWithTimeoutAndRedirects(
newUrl,
options,
redirectCount - 1
);
resolve(newResponse);
} else {
resolve(response);
}
},
(rejectReason) => {
if (timeout === null) {
// already timed out
return;
}
// Handle redirects
const newUrl = response.headers.get("location");
if (!newUrl) {
throw new Error("Redirect with no location");
}

clearTimeout(timeout);
reject(rejectReason);
}
);
return await fetchWithTimeoutAndRedirects(newUrl, {
...options,
maxRedirects: maxRedirects - 1,
});
};

Expand Down

0 comments on commit 6913261

Please sign in to comment.