From 3803bead0e21ceffb02693257fc8e7033471e97d Mon Sep 17 00:00:00 2001 From: Max Holland Date: Tue, 23 Jan 2024 16:59:14 +0000 Subject: [PATCH] Start on webhook log API (#2015) * Start on webhook log API * remove webhook status api * rename to requests * fix webhook list sorting * Review comments, schema updates etc * schema updates --- packages/api/src/controllers/task.ts | 2 +- packages/api/src/controllers/webhook.test.js | 41 +++-- packages/api/src/controllers/webhook.ts | 159 +++++++++++-------- packages/api/src/schema/api-schema.yaml | 46 +++--- packages/api/src/schema/db-schema.yaml | 18 +++ packages/api/src/webhooks/cannon.ts | 35 +++- 6 files changed, 186 insertions(+), 115 deletions(-) diff --git a/packages/api/src/controllers/task.ts b/packages/api/src/controllers/task.ts index 12f3761714..631abea4dd 100644 --- a/packages/api/src/controllers/task.ts +++ b/packages/api/src/controllers/task.ts @@ -211,7 +211,7 @@ app.get("/", authorizer({}), async (req, res) => { const query = parseFilters(fieldsMap, filters); query.push(sql`task.data->>'userId' = ${req.user.id}`); - if (!all || all === "false") { + if (!all || all === "false" || !req.user.admin) { query.push(sql`task.data->>'deleted' IS NULL`); } diff --git a/packages/api/src/controllers/webhook.test.js b/packages/api/src/controllers/webhook.test.js index 0bc1511a36..e3dab43ee2 100644 --- a/packages/api/src/controllers/webhook.test.js +++ b/packages/api/src/controllers/webhook.test.js @@ -1,5 +1,6 @@ import serverPromise from "../test-server"; import { TestClient, clearDatabase } from "../test-helpers"; +import { sleep } from "../util"; let server; let mockAdminUser; @@ -190,23 +191,6 @@ describe("controllers/webhook", () => { expect(updated.name).toEqual("modified_name"); }); - it("update the status of a webhook", async () => { - const { id } = generatedWebhook; - const payload = { - response: { - webhookId: id, - statusCode: 400, - response: { - body: "", - status: 400, - }, - }, - errorMessage: "Error test", - }; - const res = await client.post(`/webhook/${id}/status`, payload); - expect(res.status).toBe(204); - }); - it("disallows setting webhook for another user", async () => { const { id } = generatedWebhook; const modifiedHook = { ...generatedWebhook, userId: nonAdminUser.id }; @@ -306,6 +290,29 @@ describe("controllers/webhook", () => { expect(setActiveRes.status).toBe(204); // const setActiveResJson = await setActiveRes.json() // expect(setActiveResJson).toBeDefined() + + // a log entry for the webhook should appear after a short wait + let requestsJson; + for (let i = 0; i < 5; i++) { + await sleep(500); + const requestsRes = await client.get( + `/webhook/${generatedWebhook.id}/requests` + ); + expect(requestsRes.status).toBe(200); + requestsJson = await requestsRes.json(); + if (requestsJson.length > 0) { + break; + } + } + + expect(requestsJson.length).toBeGreaterThan(0); + expect(requestsJson[0].webhookId).toBe(generatedWebhook.id); + expect(requestsJson[0].event).toBe("stream.started"); + expect(requestsJson[0].request).toBeDefined(); + expect(requestsJson[0].request.body).toBeDefined(); + expect(requestsJson[0].request.headers).toBeDefined(); + expect(requestsJson[0].response).toBeDefined(); + expect(requestsJson[0].response.body).toBeDefined(); }, 20000); it("trigger webhook with localIP", async () => { diff --git a/packages/api/src/controllers/webhook.ts b/packages/api/src/controllers/webhook.ts index 6df13282d5..dbe97c4057 100644 --- a/packages/api/src/controllers/webhook.ts +++ b/packages/api/src/controllers/webhook.ts @@ -8,8 +8,6 @@ import { makeNextHREF, parseFilters, parseOrder, FieldsMap } from "./helpers"; import { db } from "../store"; import sql from "sql-template-strings"; import { UnprocessableEntityError, NotFoundError } from "../store/errors"; -import { WebhookStatusPayload } from "../schema/types"; -import { DBWebhook } from "../store/webhook-table"; function validateWebhookPayload(id, userId, createdAt, payload) { try { @@ -46,7 +44,7 @@ const fieldsMap: FieldsMap = { url: `webhook.data->>'url'`, blocking: `webhook.data->'blocking'`, deleted: `webhook.data->'deleted'`, - createdAt: `webhook.data->'createdAt'`, + createdAt: { val: `webhook.data->'createdAt'`, type: "int" }, userId: `webhook.data->>'userId'`, "user.email": { val: `users.data->>'email'`, type: "full-text" }, sharedSecret: `webhook.data->>'sharedSecret'`, @@ -96,7 +94,7 @@ app.get("/", authorizer({}), async (req, res) => { const query = parseFilters(fieldsMap, filters); query.push(sql`webhook.data->>'userId' = ${req.user.id}`); - if (!all || all === "false") { + if (!all || all === "false" || !req.user.admin) { query.push(sql`webhook.data->>'deleted' IS NULL`); } @@ -234,70 +232,6 @@ app.patch( } ); -app.post( - "/:id/status", - authorizer({ anyAdmin: true }), - validatePost("webhook-status-payload"), - async (req, res) => { - const { id } = req.params; - const webhook = await db.webhook.get(id); - if (!webhook || webhook.deleted) { - return res.status(404).json({ errors: ["webhook not found or deleted"] }); - } - - const { response, errorMessage } = req.body as WebhookStatusPayload; - - if (!response || !response.response) { - return res.status(422).json({ errors: ["missing response in payload"] }); - } - if (!response.response.body && response.response.body !== "") { - return res - .status(400) - .json({ errors: ["missing body in payload response"] }); - } - - try { - const triggerTime = response.createdAt ?? Date.now(); - let status: DBWebhook["status"] = { lastTriggeredAt: triggerTime }; - - if ( - response.statusCode >= 300 || - !response.statusCode || - response.statusCode === 0 - ) { - status = { - ...status, - lastFailure: { - error: errorMessage, - timestamp: triggerTime, - statusCode: response.statusCode, - response: response.response.body, - }, - }; - } - await db.webhook.updateStatus(webhook.id, status); - } catch (e) { - console.log( - `Unable to store status of webhook ${webhook.id} url: ${webhook.url}` - ); - } - - // TODO : Change the response type and save the response making sure it's compatible object - /*await db.webhookResponse.create({ - id: uuid(), - webhookId: webhook.id, - createdAt: Date.now(), - statusCode: response.statusCode, - response: { - body: response.response.body, - status: response.statusCode, - }, - });*/ - - res.status(204).end(); - } -); - app.delete("/:id", authorizer({}), async (req, res) => { // delete a specific webhook const webhook = await db.webhook.get(req.params.id); @@ -345,4 +279,93 @@ app.delete("/", authorizer({}), async (req, res) => { res.end(); }); +const requestsFieldsMap: FieldsMap = { + id: `webhook_response.ID`, + createdAt: { val: `webhook_response.data->'createdAt'`, type: "int" }, + userId: `webhook_response.data->>'userId'`, + event: `webhook_response.data->>'event'`, + statusCode: `webhook_response.data->'response'->>'status'`, +}; + +app.get("/:id/requests", authorizer({}), async (req, res) => { + let { limit, cursor, all, event, allUsers, order, filters, count } = + req.query; + if (isNaN(parseInt(limit))) { + limit = undefined; + } + if (!order) { + order = "createdAt-true"; + } + + if (req.user.admin && allUsers && allUsers !== "false") { + const query = parseFilters(requestsFieldsMap, filters); + query.push(sql`webhook_response.data->>'webhookId' = ${req.params.id}`); + if (!all || all === "false") { + query.push(sql`webhook_response.data->>'deleted' IS NULL`); + } + + let fields = + " webhook_response.id as id, webhook_response.data as data, users.id as usersId, users.data as usersdata"; + if (count) { + fields = fields + ", count(*) OVER() AS count"; + } + const from = `webhook_response left join users on webhook_response.data->>'userId' = users.id`; + const [output, newCursor] = await db.webhookResponse.find(query, { + limit, + cursor, + fields, + from, + order: parseOrder(requestsFieldsMap, order), + process: ({ data, usersdata, count: c }) => { + if (count) { + res.set("X-Total-Count", c); + } + return { ...data, user: db.user.cleanWriteOnlyResponse(usersdata) }; + }, + }); + + res.status(200); + + if (output.length > 0 && newCursor) { + res.links({ next: makeNextHREF(req, newCursor) }); + } + return res.json(output); + } + + const query = parseFilters(requestsFieldsMap, filters); + query.push(sql`webhook_response.data->>'userId' = ${req.user.id}`); + query.push(sql`webhook_response.data->>'webhookId' = ${req.params.id}`); + + if (!all || all === "false" || !req.user.admin) { + query.push(sql`webhook_response.data->>'deleted' IS NULL`); + } + + let fields = " webhook_response.id as id, webhook_response.data as data"; + if (count) { + fields = fields + ", count(*) OVER() AS count"; + } + const from = `webhook_response`; + const [output, newCursor] = await db.webhookResponse.find(query, { + limit, + cursor, + fields, + from, + order: parseOrder(requestsFieldsMap, order), + process: ({ data, count: c }) => { + if (count) { + res.set("X-Total-Count", c); + } + return { ...data }; + }, + }); + + res.status(200); + + if (output.length > 0) { + res.links({ next: makeNextHREF(req, newCursor) }); + } + + return res.json(output); +}); + export default app; diff --git a/packages/api/src/schema/api-schema.yaml b/packages/api/src/schema/api-schema.yaml index beca08b051..1a43ef57d9 100644 --- a/packages/api/src/schema/api-schema.yaml +++ b/packages/api/src/schema/api-schema.yaml @@ -219,7 +219,6 @@ components: type: object required: - webhookId - - statusCode additionalProperties: false properties: id: @@ -229,39 +228,49 @@ components: webhookId: readOnly: true type: string - eventId: + description: ID of the webhook this request was made for + event: readOnly: true type: string + description: The event type that triggered the webhook request createdAt: readOnly: true type: number description: | - Timestamp (in milliseconds) at which webhook response object was + Timestamp (in milliseconds) at which webhook request object was created example: 1587667174725 duration: type: number - statusCode: - type: number - default: 0 + description: The time taken (in seconds) to make the webhook request + request: + type: object + properties: + url: + type: string + description: URL used for the request + method: + type: string + description: HTTP request method + headers: + type: object + description: HTTP request headers + body: + type: string + description: request body response: type: object additionalProperties: false properties: body: type: string - headers: - type: object - additionalProperties: - type: array - items: - type: string - redirected: - type: boolean + description: response body status: type: number + description: HTTP status code statusText: type: string + description: response status text clip-payload: type: object additionalProperties: false @@ -614,15 +623,6 @@ components: $ref: "#/components/schemas/webhook/properties/sharedSecret" streamId: $ref: "#/components/schemas/webhook/properties/streamId" - webhook-status-payload: - type: object - additionalProperties: false - properties: - errorMessage: - type: string - description: Error message if the webhook failed to process the event - response: - $ref: "#/components/schemas/webhook-response" session: type: object required: diff --git a/packages/api/src/schema/db-schema.yaml b/packages/api/src/schema/db-schema.yaml index 3245ba6562..9fae4ce3cd 100644 --- a/packages/api/src/schema/db-schema.yaml +++ b/packages/api/src/schema/db-schema.yaml @@ -593,10 +593,28 @@ components: readOnly: true type: string example: webhookResponse + userId: + readOnly: true + type: string + index: true webhookId: index: true eventId: + type: string + index: true + writeOnly: true + event: + index: true + createdAt: index: true + response: + type: object + properties: + status: + index: true + redirected: + type: boolean + writeOnly: true detection-webhook-payload: type: object required: diff --git a/packages/api/src/webhooks/cannon.ts b/packages/api/src/webhooks/cannon.ts index ce9af2405e..adc25b444d 100644 --- a/packages/api/src/webhooks/cannon.ts +++ b/packages/api/src/webhooks/cannon.ts @@ -15,7 +15,11 @@ import { sign, sendgridEmail, pathJoin } from "../controllers/helpers"; import { taskScheduler } from "../task/scheduler"; import { generateUniquePlaybackId } from "../controllers/generate-keys"; import { createAsset, primaryStorageExperiment } from "../controllers/asset"; -import { DBStream } from "../store/stream-table"; +import { + DBStream, + DeprecatedStreamFields, + StreamStats, +} from "../store/stream-table"; import { USER_SESSION_TIMEOUT } from "../controllers/stream"; import { BadRequestError, UnprocessableEntityError } from "../store/errors"; import { db } from "../store"; @@ -411,6 +415,7 @@ export default class WebhookCannon { let errorMessage: string; let responseBody: string; let statusCode: number; + try { logger.info(`webhook ${webhook.id} firing`); resp = await fetchWithTimeout(webhook.url, params); @@ -440,7 +445,15 @@ export default class WebhookCannon { errorMessage = e.message; await this.retry(trigger, params, e); } finally { - await this.storeResponse(webhook, event, resp, startTime, responseBody); + await this.storeResponse( + webhook, + event, + resp, + startTime, + responseBody, + stream, + params + ); await this.storeTriggerStatus( trigger.webhook, triggerTime, @@ -474,26 +487,36 @@ export default class WebhookCannon { event: messages.WebhookEvent, resp: Response, startTime: [number, number], - responseBody: string + responseBody: string, + stream: DBStream, + params ) { try { const hrDuration = process.hrtime(startTime); - let encodedResponseBody = Buffer.from(responseBody).toString("base64"); + const encodedResponseBody = Buffer.from( + responseBody.substring(0, 1024) + ).toString("base64"); await this.db.webhookResponse.create({ id: uuid(), webhookId: webhook.id, eventId: event.id, + event: event.event, + userId: webhook.userId, createdAt: Date.now(), duration: hrDuration[0] + hrDuration[1] / 1e9, - statusCode: resp.status, response: { body: encodedResponseBody, - headers: resp.headers.raw(), redirected: resp.redirected, status: resp.status, statusText: resp.statusText, }, + request: { + url: webhook.url, + body: params.body, + method: params.method, + headers: params.headers, + }, }); } catch (e) { console.log(