Skip to content

Commit

Permalink
api: Create PUT /stream/pull API for idempotent pull stream (#2024)
Browse files Browse the repository at this point in the history
* api/schema: Make lat/lon required fields

* api: Create separate PUT /stream/pull endpoint

* api: Filter by userId and non-deleted on queries

scary

* api: Update stream fields on PUT /pull API

* api: Wait until stream is active after trigger

* api: Return 200 when the stream already existed

* api: Allow querying by creatorId/pull.source

* api: Allow deduping streams by creatorId

* api/test: Fix asset indexes test

* api/test: Add tests for new API

* api: Fix response not to omit default fields

We had an object with a couple of undefined fields
which then didn't get the default added by addDefaultFields

Fix it by re-reading the obj from the DB, which will not only
get the actually serialized version (undefined is omited) but
get the actual final state in the db (maybe someone wrote at
the same time 🤷)

* api: Fix indexes

Remove 'name' from /pull potential keys. It doesn't
have an index and we won't need it anyway for now, so
let's KISS.

Also moved index: directive to db-schema

---------

Co-authored-by: Thom Shutt <[email protected]>
  • Loading branch information
victorges and thomshutt authored Feb 1, 2024
1 parent b611d60 commit bc809e4
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 56 deletions.
1 change: 1 addition & 0 deletions packages/api/src/controllers/asset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ const fieldsMap = {
createdAt: { val: `asset.data->'createdAt'`, type: "int" },
updatedAt: { val: `asset.data->'status'->'updatedAt'`, type: "int" },
userId: `asset.data->>'userId'`,
creatorId: `stream.data->'creatorId'->>'value'`,
playbackId: `asset.data->>'playbackId'`,
playbackRecordingId: `asset.data->>'playbackRecordingId'`,
phase: `asset.data->'status'->>'phase'`,
Expand Down
145 changes: 145 additions & 0 deletions packages/api/src/controllers/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ let mockUser: User;
let mockAdminUser: User;
let mockNonAdminUser: User;
let postMockStream: Stream;
let postMockPullStream: Stream;
// jest.setTimeout(70000)

beforeAll(async () => {
Expand All @@ -59,6 +60,12 @@ beforeAll(async () => {
renditions: ["random_prefix_bbb_160p"],
},
];
postMockPullStream = {
...postMockStream,
pull: {
source: "https://playback.space/video+7bbb3wee.flv",
},
};

mockUser = {
email: `[email protected]`,
Expand Down Expand Up @@ -478,6 +485,144 @@ describe("controllers/stream", () => {
});
});

describe("pull stream idempotent creation", () => {
beforeEach(async () => {
// TODO: Remove this once experiment is done
await client.post("/experiment", {
name: "stream-pull-source",
audienceUserIds: [adminUser.id],
});
});

it("should require a pull configuration", async () => {
let res = await client.put("/stream/pull", postMockStream);
expect(res.status).toBe(400);
const errors = await res.json();
expect(errors).toMatchObject({
errors: [
expect.stringContaining("stream pull configuration is required"),
],
});

res = await client.put("/stream/pull", {
...postMockStream,
pull: {}, // an empty object is missing the 'source' field
});
expect(res.status).toBe(422);
});

it("should create a stream if a pull config is present", async () => {
const now = Date.now();
const res = await client.put("/stream/pull", postMockPullStream);
expect(res.status).toBe(201);
const stream = await res.json();
expect(stream.id).toBeDefined();
expect(stream.kind).toBe("stream");
expect(stream.name).toBe("test_stream");
expect(stream.createdAt).toBeGreaterThanOrEqual(now);
const document = await db.stream.get(stream.id);
expect(server.db.stream.addDefaultFields(document)).toEqual(stream);
});

it("should update a stream if it has the same pull source", async () => {
let res = await client.put("/stream/pull", postMockPullStream);
expect(res.status).toBe(201);
const stream = await res.json();

const now = Date.now();
res = await client.put("/stream/pull", {
...postMockPullStream,
name: "updated_stream",
profiles: [],
});
expect(res.status).toBe(200);
const updatedStream = await res.json();
expect(updatedStream.id).toBe(stream.id);
expect(updatedStream.name).toBe("updated_stream");
expect(updatedStream.profiles).toEqual([]);

const document = await db.stream.get(stream.id);
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
});

it("should fail to dedup streams by a random key", async () => {
let res = await client.put(
"/stream/pull?key=invalid",
postMockPullStream
);
expect(res.status).toBe(400);
const errors = await res.json();
expect(errors).toMatchObject({
errors: [expect.stringContaining("key must be one of")],
});
});

it("should fail to dedup streams by creatorId if not provided", async () => {
let res = await client.put(
"/stream/pull?key=creatorId",
postMockPullStream
);
expect(res.status).toBe(400);
const errors = await res.json();
expect(errors).toMatchObject({
errors: [expect.stringContaining("must be present in the payload")],
});
});

it("should dedup streams by creatorId if requested", async () => {
let res = await client.put("/stream/pull?key=creatorId", {
...postMockPullStream,
creatorId: "0xjest",
});
expect(res.status).toBe(201);
const stream = await res.json();

res = await client.put("/stream/pull", {
...postMockPullStream,
creatorId: "0xjest",
name: "updated_stream",
profiles: [],
});
expect(res.status).toBe(200);
const updatedStream = await res.json();
expect(updatedStream.id).toBe(stream.id);
expect(updatedStream.name).toBe("updated_stream");
expect(updatedStream.profiles).toEqual([]);

const document = await db.stream.get(stream.id);
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
});

it("should wait for stream to become active if requested", async () => {
let responded = false;
const resProm = client.put(
"/stream/pull?waitActive=true",
postMockPullStream
);
resProm.then(() => (responded = true));

// give some time for API to create object in DB
await sleep(100);

const [streams] = await db.stream.find();
expect(streams).toHaveLength(1);
expect(streams[0].isActive).toBe(false);

// stream not active yet
expect(responded).toBe(false);

// set stream active
await db.stream.update(streams[0].id, { isActive: true });

const res = await resProm;
expect(responded).toBe(true); // make sure this works
expect(res.status).toBe(201);
const stream = await res.json();
expect(stream.id).toBe(streams[0].id);
expect(stream.isActive).toBe(true);
});
});

it("should create a stream, delete it, and error when attempting additional delete or replace", async () => {
const res = await client.post("/stream", { ...postMockStream });
expect(res.status).toBe(201);
Expand Down
Loading

0 comments on commit bc809e4

Please sign in to comment.