From 61c173ed151813988cfda81909e199039d8244eb Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 18 Dec 2024 13:32:15 -0800 Subject: [PATCH] message queue --- schema/crdb/add-webhooks/README.adoc | 23 ++++++++++++++----- schema/crdb/add-webhooks/up06.sql | 16 +++++--------- schema/crdb/add-webhooks/up07.sql | 13 +++++++---- schema/crdb/add-webhooks/up08.sql | 14 ++++++------ schema/crdb/add-webhooks/up09.sql | 19 +++++++++------- schema/crdb/add-webhooks/up10.sql | 30 ++++--------------------- schema/crdb/add-webhooks/up11.sql | 11 ++++++---- schema/crdb/add-webhooks/up12.sql | 9 ++++++++ schema/crdb/add-webhooks/up13.sql | 27 +++++++++++++++++++++++ schema/crdb/add-webhooks/up14.sql | 4 ++++ schema/crdb/dbinit.sql | 33 +++++++++++++++++++++++++++- 11 files changed, 132 insertions(+), 67 deletions(-) create mode 100644 schema/crdb/add-webhooks/up12.sql create mode 100644 schema/crdb/add-webhooks/up13.sql create mode 100644 schema/crdb/add-webhooks/up14.sql diff --git a/schema/crdb/add-webhooks/README.adoc b/schema/crdb/add-webhooks/README.adoc index f3ea209b55..c913776b73 100644 --- a/schema/crdb/add-webhooks/README.adoc +++ b/schema/crdb/add-webhooks/README.adoc @@ -20,21 +20,32 @@ associates a webhook receiver with multiple event classes that the receiver is subscribed to. *** `up05.sql` creates an index `lookup_webhook_subscriptions_by_rx` for looking up all event classes that a receiver ID is subscribed to. +*** `up06.sql` creates an index `lookup_webhook_rxs_for_event` on +`omicron.public.webhook_rx_subscription` for looking up all receivers subscribed +to a particular event class. +* *Webhook message queue*: +** `up07.sql` creates the `omicron.public.webhook_msg` table, which contains the +queue of un-dispatched webhook events. The dispatcher operates on entries in +this queue, dispatching the event to receivers and generating the payload for +each receiver. +** `up08.sql` creates the `lookup_undispatched_webhook_msgs` index on +`omicron.public.webhook_msg` for looking up webhook messages which have not yet been +dispatched and ordering by their creation times. * *Webhook message dispatching and delivery attempts*: ** *Dispatch table*: -*** `up06.sql` creates the table `omicron.public.webhook_msg_dispatch`, which +*** `up09.sql` creates the table `omicron.public.webhook_msg_dispatch`, which tracks the webhook messages that have been dispatched to receivers. -*** `up07.sql` creates an index `lookup_webhook_dispatched_to_rx` for looking up +*** `up10.sql` creates an index `lookup_webhook_dispatched_to_rx` for looking up entries in `omicron.public.webhook_msg_dispatch` by receiver ID. -*** `up08.sql` creates an index `webhook_dispatch_in_flight` for looking up all currently in-flight webhook +*** `up11.sql` creates an index `webhook_dispatch_in_flight` for looking up all currently in-flight webhook messages (entries in `omicron.public.webhook_msg_dispatch` where the `time_completed` field has not been set). ** *Delivery attempts*: -*** `up09.sql` creates the enum `omicron.public.webhook_msg_delivery_result`, +*** `up12.sql` creates the enum `omicron.public.webhook_msg_delivery_result`, representing the potential outcomes of a webhook delivery attempt. -*** `up10.sql` creates the table `omicron.public.webhook_msg_delivery_attempt`, +*** `up13.sql` creates the table `omicron.public.webhook_msg_delivery_attempt`, which records each individual delivery attempt for a webhook message in the `webhook_msg_dispatch` table. -*** `up11.sql` creates an index `lookup_webhook_delivery_attempts_for_msg` on +*** `up14.sql` creates an index `lookup_webhook_delivery_attempts_for_msg` on `omicron.public.webhook_msg_delivery_attempt`, for looking up all attempts to deliver a message with a given dispatch ID. diff --git a/schema/crdb/add-webhooks/up06.sql b/schema/crdb/add-webhooks/up06.sql index 08a44f54e3..407424440c 100644 --- a/schema/crdb/add-webhooks/up06.sql +++ b/schema/crdb/add-webhooks/up06.sql @@ -1,12 +1,6 @@ -CREATE TABLE IF NOT EXISTS omicron.public.webhook_msg_dispatch ( - -- UUID of this dispatch. - id UUID PRIMARY KEY, - -- UUID of the webhook receiver (foreign key into - -- `omicron.public.webhook_rx`) - rx_id UUID NOT NULL, - payload JSONB NOT NULL, - time_created TIMESTAMPTZ NOT NULL, - -- If this is set, then this webhook message has either been delivered - -- successfully, or is considered permanently failed. - time_completed TIMESTAMPTZ, +-- Look up all webhook receivers subscribed to an event class. This is used by +-- the dispatcher to determine who is interested in a particular event. +CREATE INDEX IF NOT EXISTS lookup_webhook_rxs_for_event +ON omicron.public.webhook_rx_subscription ( + event_class ); diff --git a/schema/crdb/add-webhooks/up07.sql b/schema/crdb/add-webhooks/up07.sql index 4cc13a67cc..ba667e930f 100644 --- a/schema/crdb/add-webhooks/up07.sql +++ b/schema/crdb/add-webhooks/up07.sql @@ -1,5 +1,10 @@ --- Index for looking up all webhook messages dispatched to a receiver ID -CREATE INDEX IF NOT EXISTS lookup_webhook_dispatched_to_rx -ON omicron.public.webhook_msg_dispatch ( - rx_id +CREATE TABLE IF NOT EXISTS omicron.public.webhook_msg ( + id UUID PRIMARY KEY, + time_created TIMESTAMPTZ NOT NULL, + -- Set when dispatch entries have been created for this event. + time_dispatched TIMESTAMPTZ, + -- The class of event that this is. + event_class STRING(512) NOT NULL, + -- Actual event data. The structure of this depends on the event class. + event JSONB NOT NULL ); diff --git a/schema/crdb/add-webhooks/up08.sql b/schema/crdb/add-webhooks/up08.sql index d7cb44b173..e18c2cea1a 100644 --- a/schema/crdb/add-webhooks/up08.sql +++ b/schema/crdb/add-webhooks/up08.sql @@ -1,7 +1,7 @@ --- Index for looking up all currently in-flight webhook messages, and ordering --- them by their creation times. -CREATE INDEX IF NOT EXISTS webhook_dispatch_in_flight -ON omicron.public.webhook_msg_dispatch ( - time_created, id -) WHERE - time_completed IS NULL; +-- Look up webhook messages in need of dispatching. +-- +-- This is used by the message dispatcher when looking for messages to dispatch. +CREATE INDEX IF NOT EXISTS lookup_undispatched_webhook_msgs +ON omicron.public.webhook_msg ( + id, time_created +) WHERE time_dispatched IS NULL; diff --git a/schema/crdb/add-webhooks/up09.sql b/schema/crdb/add-webhooks/up09.sql index 00e5cb3e7b..08a44f54e3 100644 --- a/schema/crdb/add-webhooks/up09.sql +++ b/schema/crdb/add-webhooks/up09.sql @@ -1,9 +1,12 @@ -CREATE TYPE IF NOT EXISTS omicron.public.webhook_msg_delivery_result as ENUM ( - -- The delivery attempt failed with an HTTP error. - 'failed_http_error', - -- The delivery attempt failed because the receiver endpoint was - -- unreachable. - 'failed_unreachable', - -- The delivery attempt succeeded. - 'succeeded' +CREATE TABLE IF NOT EXISTS omicron.public.webhook_msg_dispatch ( + -- UUID of this dispatch. + id UUID PRIMARY KEY, + -- UUID of the webhook receiver (foreign key into + -- `omicron.public.webhook_rx`) + rx_id UUID NOT NULL, + payload JSONB NOT NULL, + time_created TIMESTAMPTZ NOT NULL, + -- If this is set, then this webhook message has either been delivered + -- successfully, or is considered permanently failed. + time_completed TIMESTAMPTZ, ); diff --git a/schema/crdb/add-webhooks/up10.sql b/schema/crdb/add-webhooks/up10.sql index 19f87bf459..4cc13a67cc 100644 --- a/schema/crdb/add-webhooks/up10.sql +++ b/schema/crdb/add-webhooks/up10.sql @@ -1,27 +1,5 @@ -CREATE TABLE IF NOT EXISTS omicron.public.webhook_msg_delivery_attempt ( - id UUID PRIMARY KEY, - -- Foreign key into `omicron.public.webhook_msg_dispatch`. - dispatch_id UUID NOT NULL, - result omicron.public.webhook_msg_delivery_result NOT NULL, - response_status INT2, - response_duration INTERVAL, - time_created TIMESTAMPTZ NOT NULL, - - CONSTRAINT response_iff_not_unreachable CHECK ( - ( - -- If the result is 'succeedeed' or 'failed_http_error', response - -- data must be present. - (result = 'succeeded' OR result = 'failed_http_error') AND ( - response_status IS NOT NULL AND - response_duration IS NOT NULL - ) - ) OR ( - -- If the result is 'failed_unreachable', no response data is - -- present. - (result = 'failed_unreachable') AND ( - response_status IS NULL AND - response_duration IS NULL - ) - ) - ) +-- Index for looking up all webhook messages dispatched to a receiver ID +CREATE INDEX IF NOT EXISTS lookup_webhook_dispatched_to_rx +ON omicron.public.webhook_msg_dispatch ( + rx_id ); diff --git a/schema/crdb/add-webhooks/up11.sql b/schema/crdb/add-webhooks/up11.sql index 2a32f10969..d7cb44b173 100644 --- a/schema/crdb/add-webhooks/up11.sql +++ b/schema/crdb/add-webhooks/up11.sql @@ -1,4 +1,7 @@ -CREATE INDEX IF NOT EXISTS lookup_webhook_delivery_attempts_for_msg -ON omicron.public.webhook_msg_delivery_attempts ( - dispatch_id -); +-- Index for looking up all currently in-flight webhook messages, and ordering +-- them by their creation times. +CREATE INDEX IF NOT EXISTS webhook_dispatch_in_flight +ON omicron.public.webhook_msg_dispatch ( + time_created, id +) WHERE + time_completed IS NULL; diff --git a/schema/crdb/add-webhooks/up12.sql b/schema/crdb/add-webhooks/up12.sql new file mode 100644 index 0000000000..00e5cb3e7b --- /dev/null +++ b/schema/crdb/add-webhooks/up12.sql @@ -0,0 +1,9 @@ +CREATE TYPE IF NOT EXISTS omicron.public.webhook_msg_delivery_result as ENUM ( + -- The delivery attempt failed with an HTTP error. + 'failed_http_error', + -- The delivery attempt failed because the receiver endpoint was + -- unreachable. + 'failed_unreachable', + -- The delivery attempt succeeded. + 'succeeded' +); diff --git a/schema/crdb/add-webhooks/up13.sql b/schema/crdb/add-webhooks/up13.sql new file mode 100644 index 0000000000..19f87bf459 --- /dev/null +++ b/schema/crdb/add-webhooks/up13.sql @@ -0,0 +1,27 @@ +CREATE TABLE IF NOT EXISTS omicron.public.webhook_msg_delivery_attempt ( + id UUID PRIMARY KEY, + -- Foreign key into `omicron.public.webhook_msg_dispatch`. + dispatch_id UUID NOT NULL, + result omicron.public.webhook_msg_delivery_result NOT NULL, + response_status INT2, + response_duration INTERVAL, + time_created TIMESTAMPTZ NOT NULL, + + CONSTRAINT response_iff_not_unreachable CHECK ( + ( + -- If the result is 'succeedeed' or 'failed_http_error', response + -- data must be present. + (result = 'succeeded' OR result = 'failed_http_error') AND ( + response_status IS NOT NULL AND + response_duration IS NOT NULL + ) + ) OR ( + -- If the result is 'failed_unreachable', no response data is + -- present. + (result = 'failed_unreachable') AND ( + response_status IS NULL AND + response_duration IS NULL + ) + ) + ) +); diff --git a/schema/crdb/add-webhooks/up14.sql b/schema/crdb/add-webhooks/up14.sql new file mode 100644 index 0000000000..2a32f10969 --- /dev/null +++ b/schema/crdb/add-webhooks/up14.sql @@ -0,0 +1,4 @@ +CREATE INDEX IF NOT EXISTS lookup_webhook_delivery_attempts_for_msg +ON omicron.public.webhook_msg_delivery_attempts ( + dispatch_id +); diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index a3577848e4..84e1d0ca5a 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -4724,7 +4724,7 @@ ON omicron.public.webhook_rx_secret ( ) WHERE time_deleted IS NULL; -CREATE TABLE IF NOT EXISTS omicron.public.webhook_subscription ( +CREATE TABLE IF NOT EXISTS omicron.public.webhook_rx_subscription ( -- UUID of the webhook receiver (foreign key into -- `omicron.public.webhook_rx`) rx_id UUID NOT NULL, @@ -4740,6 +4740,37 @@ ON omicron.public.webhook_rx_subscription ( rx_id ); +-- Look up all webhook receivers subscribed to an event class. This is used by +-- the dispatcher to determine who is interested in a particular event. +CREATE INDEX IF NOT EXISTS lookup_webhook_rxs_for_event +ON omicron.public.webhook_rx_subscription ( + event_class +); + +/* + * Webhook message queue. + */ + +CREATE TABLE IF NOT EXISTS omicron.public.webhook_msg ( + id UUID PRIMARY KEY, + time_created TIMESTAMPTZ NOT NULL, + -- Set when dispatch entries have been created for this event. + time_dispatched TIMESTAMPTZ, + -- The class of event that this is. + event_class STRING(512) NOT NULL, + -- Actual event data. The structure of this depends on the event class. + event JSONB NOT NULL +); + +-- Look up webhook messages in need of dispatching. +-- +-- This is used by the message dispatcher when looking for messages to dispatch. +CREATE INDEX IF NOT EXISTS lookup_undispatched_webhook_msgs +ON omicron.public.webhook_msg ( + id, time_created +) WHERE time_dispatched IS NULL; + + /* * Webhook message dispatching and delivery attempts. */