diff --git a/Cargo.lock b/Cargo.lock index e171acdabe..83b1b6c951 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,12 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +[[package]] +name = "anymap2" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" + [[package]] name = "api-keys" version = "0.1.0" @@ -140,7 +146,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "serde_with 3.5.0", + "serde_with 3.6.1", "serde_yaml 0.9.31", "sqlx", "thiserror", @@ -499,22 +505,24 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.32" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.52.0", ] [[package]] name = "clap" -version = "4.4.18" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" +checksum = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f" dependencies = [ "clap_builder", "clap_derive", @@ -522,21 +530,21 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.18" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" +checksum = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99" dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.0", ] [[package]] name = "clap_derive" -version = "4.4.7" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" +checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" dependencies = [ "heck", "proc-macro2", @@ -546,9 +554,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "colorchoice" @@ -703,7 +711,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 1.0.109", ] @@ -717,7 +725,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 1.0.109", ] @@ -731,7 +739,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 2.0.48", ] @@ -808,18 +816,18 @@ dependencies = [ [[package]] name = "derive_builder" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" +checksum = "660047478bc508c0fde22c868991eec0c40a63e48d610befef466d48e2bee574" dependencies = [ "derive_builder_macro", ] [[package]] name = "derive_builder_core" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +checksum = "9b217e6dd1011a54d12f3b920a411b5abd44b1716ecfe94f5f2f2f7b52e08ab7" dependencies = [ "darling 0.14.4", "proc-macro2", @@ -829,9 +837,9 @@ dependencies = [ [[package]] name = "derive_builder_macro" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" +checksum = "7a5f77d7e20ac9153428f7ca14a88aba652adfc7a0ef0a06d654386310ef663b" dependencies = [ "derive_builder_core", "syn 1.0.109", @@ -862,6 +870,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dotenvy" version = "0.15.7" @@ -1556,6 +1570,20 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "job-executor" +version = "0.1.0" +dependencies = [ + "derive_builder", + "serde", + "serde_json", + "serde_with 3.6.1", + "sqlx", + "sqlxmq", + "tokio", + "tracing 0.1.0", +] + [[package]] name = "js-sys" version = "0.3.64" @@ -1758,7 +1786,7 @@ dependencies = [ "sha2", "socket2 0.4.10", "stringprep", - "strsim", + "strsim 0.10.0", "take_mut", "thiserror", "tokio", @@ -1836,7 +1864,7 @@ dependencies = [ "rust-i18n", "serde", "serde_json", - "serde_with 3.5.0", + "serde_with 3.6.1", "serde_yaml 0.9.31", "sqlx", "thiserror", @@ -2449,9 +2477,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64 0.21.5", "bytes", @@ -2475,6 +2503,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-rustls", @@ -2899,9 +2928,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.5.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f58c3a1b3e418f61c25b2aeb43fc6c95eaa252b8cecdda67f401943e9e08d33f" +checksum = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270" dependencies = [ "base64 0.21.5", "chrono", @@ -2909,8 +2938,9 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.2.1", "serde", + "serde_derive", "serde_json", - "serde_with_macros 3.5.0", + "serde_with_macros 3.6.1", "time", ] @@ -2940,9 +2970,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.5.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2068b437a31fc68f25dd7edc296b078f04b45145c199d8eed9866e45f1ff274" +checksum = "865f9743393e638991566a8b7a479043c2c8da94a33e0a31f18214c9cae0a64d" dependencies = [ "darling 0.20.3", "proc-macro2", @@ -3325,6 +3355,35 @@ dependencies = [ "uuid", ] +[[package]] +name = "sqlxmq" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e914521071581f0413516de0e7931086ebe4a22d719c1100de29eb339c712311" +dependencies = [ + "anymap2", + "chrono", + "dotenv", + "log", + "serde", + "serde_json", + "sqlx", + "sqlxmq_macros", + "tokio", + "uuid", +] + +[[package]] +name = "sqlxmq_macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd00667edb120f18e14e2b4a71cddb308dc3605171ed7afda3056129d08b9f4f" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3354,6 +3413,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" + [[package]] name = "strum" version = "0.25.0" @@ -3534,9 +3599,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -3563,9 +3628,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 29a2a82e1a..d12095df39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "core/notifications", "lib/tracing-rs", "lib/es-entity-rs", + "lib/job-executor-rs", ] [workspace.dependencies] @@ -16,21 +17,21 @@ async-graphql = { version = "6.0.11", default-features = false, features = ["tra async-graphql-axum = "6.0.11" axum = { version = "0.6.20", features = ["headers", "macros"] } jsonwebtoken = "9.2.0" -clap = { version = "4.4.18", features = ["derive", "env"] } -derive_builder = "0.12.0" +clap = { version = "4.5", features = ["derive", "env"] } +derive_builder = "0.13.0" serde = { version = "1.0.196", features = ["derive"] } -tokio = { version = "1.33.0", features = ["full"] } -reqwest = { version = "0.11.23", default-features = false, features = ["json", "rustls-tls"] } +tokio = { version = "1.36", features = ["full"] } +reqwest = { version = "0.11.24", default-features = false, features = ["json", "rustls-tls"] } thiserror = "1.0.56" serde_yaml = "0.9.31" serde_json = "1.0.111" -chrono = { version = "0.4.32", features = ["clock", "serde"], default-features = false } +chrono = { version = "0.4.33", features = ["clock", "serde"], default-features = false } futures = "0.3.30" sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono"] } mongodb = "2.8.0" uuid = { version = "1.7.0", features = ["serde", "v4"] } rand = "0.8.5" -serde_with = "3.5.0" +serde_with = "3.6.0" tracing = "0.1.37" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } tracing-opentelemetry = "0.20.0" @@ -45,4 +46,5 @@ tonic-health = "0.10.2" prost = "0.12" rust-i18n = "3" google-fcm1 = "5.0.3" +sqlxmq = { version = "0.5", default-features = false, features = ["runtime-tokio-rustls"] } diff --git a/core/api-keys/src/server/mod.rs b/core/api-keys/src/server/mod.rs index 8a8c0960e0..50f89fc4d5 100644 --- a/core/api-keys/src/server/mod.rs +++ b/core/api-keys/src/server/mod.rs @@ -70,7 +70,7 @@ async fn check_handler( State((header, app)): State<(String, ApiKeysApp)>, headers: HeaderMap, ) -> Result, ApplicationError> { - tracing::extract_tracing(&headers); + tracing::http::extract_tracing(&headers); let key = headers.get(header).ok_or(ApplicationError::MissingApiKey)?; let (id, sub, read_only) = app.lookup_authenticated_subject(key.to_str()?).await?; let scope = if read_only { diff --git a/core/notifications/BUCK b/core/notifications/BUCK index deaf105687..b24375c2ed 100644 --- a/core/notifications/BUCK +++ b/core/notifications/BUCK @@ -43,6 +43,7 @@ galoy_rust_bin( deps = [ "//lib/tracing-rs:tracing", "//lib/es-entity-rs:es-entity", + "//lib/job-executor-rs:job-executor", "//third-party/rust:tokio", "//third-party/rust:anyhow", "//third-party/rust:async-graphql", @@ -59,6 +60,7 @@ galoy_rust_bin( "//third-party/rust:chrono", "//third-party/rust:futures", "//third-party/rust:sqlx", + "//third-party/rust:sqlxmq", "//third-party/rust:mongodb", "//third-party/rust:rand", "//third-party/rust:uuid", @@ -72,6 +74,7 @@ galoy_rust_bin( extra_tests = [ "//lib/tracing-rs:tracing", "//lib/es-entity-rs:es-entity", + "//lib/job-executor-rs:job-executor", ], protos = ["proto/notifications.proto"], env = { diff --git a/core/notifications/migrations/20210316025847_setup.down.sql b/core/notifications/migrations/20210316025847_setup.down.sql new file mode 100644 index 0000000000..1aa472e8ba --- /dev/null +++ b/core/notifications/migrations/20210316025847_setup.down.sql @@ -0,0 +1,12 @@ +DROP FUNCTION mq_checkpoint; +DROP FUNCTION mq_keep_alive; +DROP FUNCTION mq_delete; +DROP FUNCTION mq_commit; +DROP FUNCTION mq_insert; +DROP FUNCTION mq_poll; +DROP FUNCTION mq_active_channels; +DROP FUNCTION mq_latest_message; +DROP TABLE mq_payloads; +DROP TABLE mq_msgs; +DROP FUNCTION mq_uuid_exists; +DROP TYPE mq_new_t; diff --git a/core/notifications/migrations/20210316025847_setup.up.sql b/core/notifications/migrations/20210316025847_setup.up.sql new file mode 100644 index 0000000000..bf7f8f859d --- /dev/null +++ b/core/notifications/migrations/20210316025847_setup.up.sql @@ -0,0 +1,289 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +-- The UDT for creating messages +CREATE TYPE mq_new_t AS ( + -- Unique message ID + id UUID, + -- Delay before message is processed + delay INTERVAL, + -- Number of retries if initial processing fails + retries INT, + -- Initial backoff between retries + retry_backoff INTERVAL, + -- Name of channel + channel_name TEXT, + -- Arguments to channel + channel_args TEXT, + -- Interval for two-phase commit (or NULL to disable two-phase commit) + commit_interval INTERVAL, + -- Whether this message should be processed in order with respect to other + -- ordered messages. + ordered BOOLEAN, + -- Name of message + name TEXT, + -- JSON payload + payload_json TEXT, + -- Binary payload + payload_bytes BYTEA +); + +-- Small, frequently updated table of messages +CREATE TABLE mq_msgs ( + id UUID PRIMARY KEY, + created_at TIMESTAMPTZ DEFAULT NOW(), + attempt_at TIMESTAMPTZ DEFAULT NOW(), + attempts INT NOT NULL DEFAULT 5, + retry_backoff INTERVAL NOT NULL DEFAULT INTERVAL '1 second', + channel_name TEXT NOT NULL, + channel_args TEXT NOT NULL, + commit_interval INTERVAL, + after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT +); + +-- Insert dummy message so that the 'nil' UUID can be referenced +INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL); + +-- Internal helper function to check that a UUID is neither NULL nor NIL +CREATE FUNCTION mq_uuid_exists( + id UUID +) RETURNS BOOLEAN AS $$ + SELECT id IS NOT NULL AND id != uuid_nil() +$$ LANGUAGE SQL IMMUTABLE; + +-- Index for polling +CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id); +-- Index for adding messages +CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; + +-- Index for ensuring strict message order +CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id); + + +-- Large, less frequently updated table of message payloads +CREATE TABLE mq_payloads( + id UUID PRIMARY KEY, + name TEXT NOT NULL, + payload_json JSONB, + payload_bytes BYTEA +); + +-- Internal helper function to return the most recently added message in a queue. +CREATE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT) +RETURNS UUID AS $$ + SELECT COALESCE( + ( + SELECT id FROM mq_msgs + WHERE channel_name = from_channel_name + AND channel_args = from_channel_args + AND after_message_id IS NOT NULL + AND id != uuid_nil() + ORDER BY created_at DESC, id DESC + LIMIT 1 + ), + uuid_nil() + ) +$$ LANGUAGE SQL STABLE; + +-- Internal helper function to randomly select a set of channels with "ready" messages. +CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT) +RETURNS TABLE(name TEXT, args TEXT) AS $$ + SELECT channel_name, channel_args + FROM mq_msgs + WHERE id != uuid_nil() + AND attempt_at <= NOW() + AND (channel_names IS NULL OR channel_name = ANY(channel_names)) + AND NOT mq_uuid_exists(after_message_id) + GROUP BY channel_name, channel_args + ORDER BY RANDOM() + LIMIT batch_size +$$ LANGUAGE SQL STABLE; + +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT * FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql; + +-- Creates new messages +CREATE FUNCTION mq_insert(new_messages mq_new_t[]) +RETURNS VOID AS $$ +BEGIN + PERFORM pg_notify(CONCAT('mq_', channel_name), '') + FROM unnest(new_messages) AS new_msgs + GROUP BY channel_name; + + IF FOUND THEN + PERFORM pg_notify('mq', ''); + END IF; + + INSERT INTO mq_payloads ( + id, + name, + payload_json, + payload_bytes + ) SELECT + id, + name, + payload_json::JSONB, + payload_bytes + FROM UNNEST(new_messages); + + INSERT INTO mq_msgs ( + id, + attempt_at, + attempts, + retry_backoff, + channel_name, + channel_args, + commit_interval, + after_message_id + ) + SELECT + id, + NOW() + delay + COALESCE(commit_interval, INTERVAL '0'), + retries + 1, + retry_backoff, + channel_name, + channel_args, + commit_interval, + CASE WHEN ordered + THEN + LAG(id, 1, mq_latest_message(channel_name, channel_args)) + OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id) + ELSE + NULL + END + FROM UNNEST(new_messages); +END; +$$ LANGUAGE plpgsql; + +-- Commits messages previously created with a non-NULL commit interval. +CREATE FUNCTION mq_commit(msg_ids UUID[]) +RETURNS VOID AS $$ +BEGIN + UPDATE mq_msgs + SET + attempt_at = attempt_at - commit_interval, + commit_interval = NULL + WHERE id = ANY(msg_ids) + AND commit_interval IS NOT NULL; +END; +$$ LANGUAGE plpgsql; + + +-- Deletes messages from the queue. This occurs when a message has been +-- processed, or when it expires without being processed. +CREATE FUNCTION mq_delete(msg_ids UUID[]) +RETURNS VOID AS $$ +BEGIN + PERFORM pg_notify(CONCAT('mq_', channel_name), '') + FROM mq_msgs + WHERE id = ANY(msg_ids) + AND after_message_id = uuid_nil() + GROUP BY channel_name; + + IF FOUND THEN + PERFORM pg_notify('mq', ''); + END IF; + + DELETE FROM mq_msgs WHERE id = ANY(msg_ids); + DELETE FROM mq_payloads WHERE id = ANY(msg_ids); +END; +$$ LANGUAGE plpgsql; + + +-- Can be called during the initial commit interval, or when processing +-- a message. Indicates that the caller is still active and will prevent either +-- the commit interval elapsing or the message being retried for the specified +-- interval. +CREATE FUNCTION mq_keep_alive(msg_ids UUID[], duration INTERVAL) +RETURNS VOID AS $$ + UPDATE mq_msgs + SET + attempt_at = NOW() + duration, + commit_interval = commit_interval + ((NOW() + duration) - attempt_at) + WHERE id = ANY(msg_ids) + AND attempt_at < NOW() + duration; +$$ LANGUAGE SQL; + + +-- Called during lengthy processing of a message to checkpoint the progress. +-- As well as behaving like `mq_keep_alive`, the message payload can be +-- updated. +CREATE FUNCTION mq_checkpoint( + msg_id UUID, + duration INTERVAL, + new_payload_json TEXT, + new_payload_bytes BYTEA, + extra_retries INT +) +RETURNS VOID AS $$ + UPDATE mq_msgs + SET + attempt_at = GREATEST(attempt_at, NOW() + duration), + attempts = attempts + COALESCE(extra_retries, 0) + WHERE id = msg_id; + + UPDATE mq_payloads + SET + payload_json = COALESCE(new_payload_json::JSONB, payload_json), + payload_bytes = COALESCE(new_payload_bytes, payload_bytes) + WHERE + id = msg_id; +$$ LANGUAGE SQL; + diff --git a/core/notifications/migrations/20210921115907_clear.down.sql b/core/notifications/migrations/20210921115907_clear.down.sql new file mode 100644 index 0000000000..e15638db2c --- /dev/null +++ b/core/notifications/migrations/20210921115907_clear.down.sql @@ -0,0 +1,2 @@ +DROP FUNCTION mq_clear; +DROP FUNCTION mq_clear_all; diff --git a/core/notifications/migrations/20210921115907_clear.up.sql b/core/notifications/migrations/20210921115907_clear.up.sql new file mode 100644 index 0000000000..bd1c1f6078 --- /dev/null +++ b/core/notifications/migrations/20210921115907_clear.up.sql @@ -0,0 +1,21 @@ +-- Deletes all messages from a list of channel names. +CREATE FUNCTION mq_clear(channel_names TEXT[]) +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; + +-- Deletes all messages. +CREATE FUNCTION mq_clear_all() +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; diff --git a/core/notifications/migrations/20211013151757_fix_mq_latest_message.down.sql b/core/notifications/migrations/20211013151757_fix_mq_latest_message.down.sql new file mode 100644 index 0000000000..d09bd4afd6 --- /dev/null +++ b/core/notifications/migrations/20211013151757_fix_mq_latest_message.down.sql @@ -0,0 +1,15 @@ +CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT) +RETURNS UUID AS $$ + SELECT COALESCE( + ( + SELECT id FROM mq_msgs + WHERE channel_name = from_channel_name + AND channel_args = from_channel_args + AND after_message_id IS NOT NULL + AND id != uuid_nil() + ORDER BY created_at DESC, id DESC + LIMIT 1 + ), + uuid_nil() + ) +$$ LANGUAGE SQL STABLE; diff --git a/core/notifications/migrations/20211013151757_fix_mq_latest_message.up.sql b/core/notifications/migrations/20211013151757_fix_mq_latest_message.up.sql new file mode 100644 index 0000000000..b987c5e1e8 --- /dev/null +++ b/core/notifications/migrations/20211013151757_fix_mq_latest_message.up.sql @@ -0,0 +1,19 @@ +CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT) +RETURNS UUID AS $$ + SELECT COALESCE( + ( + SELECT id FROM mq_msgs + WHERE channel_name = from_channel_name + AND channel_args = from_channel_args + AND after_message_id IS NOT NULL + AND id != uuid_nil() + AND NOT EXISTS( + SELECT * FROM mq_msgs AS mq_msgs2 + WHERE mq_msgs2.after_message_id = mq_msgs.id + ) + ORDER BY created_at DESC + LIMIT 1 + ), + uuid_nil() + ) +$$ LANGUAGE SQL STABLE; \ No newline at end of file diff --git a/core/notifications/migrations/20220208120856_fix_concurrent_poll.down.sql b/core/notifications/migrations/20220208120856_fix_concurrent_poll.down.sql new file mode 100644 index 0000000000..6cd2d21ebb --- /dev/null +++ b/core/notifications/migrations/20220208120856_fix_concurrent_poll.down.sql @@ -0,0 +1,60 @@ +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT * FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/core/notifications/migrations/20220208120856_fix_concurrent_poll.up.sql b/core/notifications/migrations/20220208120856_fix_concurrent_poll.up.sql new file mode 100644 index 0000000000..cae6151ed1 --- /dev/null +++ b/core/notifications/migrations/20220208120856_fix_concurrent_poll.up.sql @@ -0,0 +1,62 @@ + +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT mq_msgs.id FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + AND mq_msgs.attempt_at <= NOW() + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql; diff --git a/core/notifications/migrations/20220713122907_fix-clear_all-keep-nil-message.down.sql b/core/notifications/migrations/20220713122907_fix-clear_all-keep-nil-message.down.sql new file mode 100644 index 0000000000..d2f607c5b8 --- /dev/null +++ b/core/notifications/migrations/20220713122907_fix-clear_all-keep-nil-message.down.sql @@ -0,0 +1 @@ +-- Add down migration script here diff --git a/core/notifications/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql b/core/notifications/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql new file mode 100644 index 0000000000..4dd1f0b3a6 --- /dev/null +++ b/core/notifications/migrations/20220713122907_fix-clear_all-keep-nil-message.up.sql @@ -0,0 +1,29 @@ +CREATE OR REPLACE FUNCTION mq_clear(channel_names TEXT[]) +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs + WHERE channel_name = ANY(channel_names) + AND id != uuid_nil() + RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; +COMMENT ON FUNCTION mq_clear IS + 'Deletes all messages with corresponding payloads from a list of channel names'; + + +CREATE OR REPLACE FUNCTION mq_clear_all() +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs + WHERE id != uuid_nil() + RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; +COMMENT ON FUNCTION mq_clear_all IS + 'Deletes all messages with corresponding payloads'; diff --git a/lib/job-executor-rs/.env b/lib/job-executor-rs/.env new file mode 100644 index 0000000000..0b160180cb --- /dev/null +++ b/lib/job-executor-rs/.env @@ -0,0 +1 @@ +DATABASE_URL=postgres://user:password@localhost:5433/pg diff --git a/lib/job-executor-rs/.sqlx/query-7bc0ab72b1fe05b1f06d1df8af1b4a2db342927ea97f19a44c145f24a44012a3.json b/lib/job-executor-rs/.sqlx/query-7bc0ab72b1fe05b1f06d1df8af1b4a2db342927ea97f19a44c145f24a44012a3.json new file mode 100644 index 0000000000..8189035647 --- /dev/null +++ b/lib/job-executor-rs/.sqlx/query-7bc0ab72b1fe05b1f06d1df8af1b4a2db342927ea97f19a44c145f24a44012a3.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE mq_msgs SET retry_backoff = $1 WHERE id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Interval", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "7bc0ab72b1fe05b1f06d1df8af1b4a2db342927ea97f19a44c145f24a44012a3" +} diff --git a/lib/job-executor-rs/BUCK b/lib/job-executor-rs/BUCK new file mode 100644 index 0000000000..d20688dee3 --- /dev/null +++ b/lib/job-executor-rs/BUCK @@ -0,0 +1,25 @@ +load("@toolchains//rust:macros.bzl", "galoy_rust_lib") + +galoy_rust_lib( + name = "job-executor", + deps = [ + "//lib/tracing-rs:tracing", + "//third-party/rust:derive_builder", + "//third-party/rust:serde", + "//third-party/rust:serde_json", + "//third-party/rust:serde_with", + "//third-party/rust:sqlx", + "//third-party/rust:sqlxmq", + "//third-party/rust:tokio", + ], + srcs = glob([ + "src/**/*.rs", + ".sqlx/*" + ]), + env = { + "CARGO_MANIFEST_DIR": ".", + "SQLX_OFFLINE": "true", + "CARGO_PKG_NAME": "job-executor" , + "CARGO_PKG_VERSION": "0.1.0" , + }, +) diff --git a/lib/job-executor-rs/Cargo.toml b/lib/job-executor-rs/Cargo.toml new file mode 100644 index 0000000000..dd615cae31 --- /dev/null +++ b/lib/job-executor-rs/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "job-executor" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tracing = { path = "../../lib/tracing-rs" } + +derive_builder = { workspace = true } +serde = { workspace = true } +serde_with = { workspace = true } +serde_json = { workspace = true } +sqlx = { workspace = true } +sqlxmq = { workspace = true } +tokio = { workspace = true } diff --git a/lib/job-executor-rs/src/lib.rs b/lib/job-executor-rs/src/lib.rs new file mode 100644 index 0000000000..8405a5b2a7 --- /dev/null +++ b/lib/job-executor-rs/src/lib.rs @@ -0,0 +1,264 @@ +use derive_builder::Builder; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use sqlxmq::CurrentJob; +use std::{collections::HashMap, time::Duration}; +use tokio::task::JoinHandle; +use tracing::{extract_tracing_data, inject_tracing_data, instrument, Span}; + +pub trait JobExecutionError: std::fmt::Display + From {} + +pub struct KeepAliveHandle(Option>); +impl KeepAliveHandle { + pub fn new(inner: JoinHandle<()>) -> Self { + Self(Some(inner)) + } + pub fn into_inner(mut self) -> JoinHandle<()> { + self.0.take().expect("Only consumed once") + } + pub async fn stop(self) { + let handle = self.into_inner(); + handle.abort(); + let _ = handle.await; + } +} +impl Drop for KeepAliveHandle { + fn drop(&mut self) { + if let Some(handle) = self.0.take() { + handle.abort(); + } + } +} + +#[derive(Builder)] +#[builder(pattern = "owned")] +pub struct JobExecutor<'a> { + job: &'a mut CurrentJob, + #[builder(default = "4")] + warn_retries: u32, + #[builder(default = "5")] + max_attempts: u32, + #[builder(default = "Duration::from_secs(1)")] + initial_retry_delay: Duration, + #[builder(default = "Duration::from_secs(60)")] + max_retry_delay: Duration, +} + +impl<'a> JobExecutor<'a> { + pub fn builder(job: &'a mut CurrentJob) -> JobExecutorBuilder<'a> { + JobExecutorBuilder::default().job(job) + } + + #[instrument(name = "execute_job", skip_all, fields( + job_id, job_name, checkpoint_json, attempt, last_attempt, + error, error.level, error.message + ), err)] + pub async fn execute(mut self, func: F) -> Result + where + T: DeserializeOwned + Serialize, + E: JobExecutionError, + R: std::future::Future>, + F: FnOnce(Option) -> R, + { + let mut data = JobData::::from_raw_payload(self.job.raw_json()).unwrap(); + let keep_alive_handle = self.spawn_keep_alive(data.job_meta.wait_till_next_attempt); + + let completed = self.checkpoint_attempt(&mut data).await?; + let result = func(data.data).await; + + keep_alive_handle.stop().await; + + if let Err(ref e) = result { + self.handle_error(data.job_meta, e).await; + } else if !completed { + self.job.complete().await?; + } + result + } + + fn spawn_keep_alive(&self, mut interval: Duration) -> KeepAliveHandle { + let pool = self.job.pool().clone(); + let id = self.job.id(); + let max_interval = self.max_retry_delay; + let handle = tokio::spawn(async move { + loop { + tokio::time::sleep(interval / 2).await; + interval = max_interval.min(interval * 2); + if let Err(e) = sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)") + .bind(id) + .bind(interval) + .execute(&pool) + .await + { + tracing::error!("Failed to keep job {id} alive: {e}"); + break; + } + } + }); + KeepAliveHandle::new(handle) + } + + async fn handle_error(&mut self, meta: JobMeta, error: &E) { + Span::current().record("error", &tracing::field::display("true")); + Span::current().record("error.message", &tracing::field::display(&error)); + if meta.attempts <= self.warn_retries { + Span::current().record( + "error.level", + &tracing::field::display(tracing::Level::WARN), + ); + } else { + Span::current().record( + "error.level", + &tracing::field::display(tracing::Level::ERROR), + ); + } + } + + async fn checkpoint_attempt( + &mut self, + data: &mut JobData, + ) -> Result { + let span = Span::current(); + + if let Some(tracing_data) = data.job_meta.tracing_data.as_ref() { + inject_tracing_data(&span, tracing_data); + } else { + inject_tracing_data(&span, &data.tracing_data); + } + if data.job_meta.attempts == 0 { + data.job_meta.wait_till_next_attempt = self.initial_retry_delay; + } + + data.job_meta.attempts += 1; + data.job_meta.tracing_data = Some(extract_tracing_data()); + + span.record("job_id", &tracing::field::display(self.job.id())); + span.record("job_name", &tracing::field::display(self.job.name())); + span.record("attempt", &tracing::field::display(data.job_meta.attempts)); + span.record( + "checkpoint_json", + &tracing::field::display(serde_json::to_string(&data).expect("Couldn't checkpoint")), + ); + + let mut checkpoint = + sqlxmq::Checkpoint::new_keep_alive(data.job_meta.wait_till_next_attempt); + + data.job_meta.wait_till_next_attempt = self + .max_retry_delay + .min(data.job_meta.wait_till_next_attempt * 2); + if data.job_meta.attempts < self.max_attempts { + checkpoint.set_extra_retries(1); + } + + checkpoint.set_json(&data).expect("Couldn't update tracker"); + let mut tx = self.job.pool().begin().await?; + if let Ok(interval) = + sqlx::postgres::types::PgInterval::try_from(data.job_meta.wait_till_next_attempt) + { + sqlx::query!( + "UPDATE mq_msgs SET retry_backoff = $1 WHERE id = $2", + interval, + self.job.id() + ) + .execute(&mut *tx) + .await?; + } + self.job + .checkpoint_with_transaction(tx, &checkpoint) + .await?; + + if data.job_meta.attempts >= self.max_attempts { + span.record("last_attempt", &tracing::field::display(true)); + self.job.complete().await?; + Ok(true) + } else { + span.record("last_attempt", &tracing::field::display(false)); + Ok(false) + } + } +} + +#[derive(Deserialize, Serialize)] +struct JobData { + #[serde(rename = "_job_meta", default)] + job_meta: JobMeta, + #[serde(flatten)] + data: Option, + #[serde(flatten)] + tracing_data: HashMap, +} + +impl<'a, T: Deserialize<'a>> JobData { + pub fn from_raw_payload(payload: Option<&'a str>) -> Result { + if let Some(payload) = payload { + serde_json::from_str(payload) + } else { + Ok(Self { + job_meta: JobMeta::default(), + data: None, + tracing_data: HashMap::new(), + }) + } + } +} + +#[serde_with::serde_as] +#[derive(Serialize, Deserialize)] +struct JobMeta { + attempts: u32, + #[serde_as(as = "serde_with::DurationSeconds")] + wait_till_next_attempt: Duration, + tracing_data: Option>, +} +impl Default for JobMeta { + fn default() -> Self { + Self { + attempts: 0, + wait_till_next_attempt: Duration::from_secs(1), + tracing_data: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Deserialize)] + struct DummyData { + value: String, + } + + #[test] + fn from_raw() { + let json = r#"{ + "_job_meta": { + "attempts": 1, + "wait_till_next_attempt": 1 + } + }"#; + let job_data: JobData = JobData::from_raw_payload(Some(json)).unwrap(); + assert!(job_data.job_meta.attempts == 1); + assert!(job_data.data.is_none()); + assert!(job_data.tracing_data.is_empty()); + + let json = r#"{ + "value": "test" + }"#; + let job_data: JobData = JobData::from_raw_payload(Some(json)).unwrap(); + assert!(job_data.job_meta.attempts == 0); + assert_eq!(job_data.data.unwrap().value, "test"); + assert!(job_data.tracing_data.is_empty()); + + let json = r#"{ + "_job_meta": { + "attempts": 2, + "wait_till_next_attempt": 1 + }, + "header": "value" + }"#; + let job_data: JobData = JobData::from_raw_payload(Some(json)).unwrap(); + assert!(job_data.job_meta.attempts == 2); + assert!(job_data.data.is_none()); + assert_eq!(job_data.tracing_data.get("header").unwrap(), "value"); + } +} diff --git a/lib/job-executor-rs/src/main.rs b/lib/job-executor-rs/src/main.rs new file mode 100644 index 0000000000..e7a11a969c --- /dev/null +++ b/lib/job-executor-rs/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/lib/tracing-rs/src/lib.rs b/lib/tracing-rs/src/lib.rs index f4fb26c2ca..9850b1111d 100644 --- a/lib/tracing-rs/src/lib.rs +++ b/lib/tracing-rs/src/lib.rs @@ -1,5 +1,6 @@ use opentelemetry::{ global, + propagation::TextMapPropagator, sdk::{ propagation::TraceContextPropagator, resource::{EnvResourceDetector, OsResourceDetector, ProcessResourceDetector}, @@ -9,11 +10,12 @@ use opentelemetry::{ use opentelemetry_otlp::WithExportConfig; use opentelemetry_semantic_conventions::resource; use serde::{Deserialize, Serialize}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::{filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; pub use tracing::*; -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TracingConfig { @@ -65,13 +67,30 @@ fn telemetry_resource(config: &TracingConfig) -> Resource { ])) } -pub fn extract_tracing(headers: &http::HeaderMap) { - use opentelemetry_http::HeaderExtractor; - use tracing_opentelemetry::OpenTelemetrySpanExt; - let extractor = HeaderExtractor(headers); - let ctx = - opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)); - tracing::Span::current().set_parent(ctx) +pub fn extract_tracing_data() -> HashMap { + let mut tracing_data = HashMap::new(); + let propagator = TraceContextPropagator::new(); + let context = Span::current().context(); + propagator.inject_context(&context, &mut tracing_data); + tracing_data +} + +pub fn inject_tracing_data(span: &Span, tracing_data: &HashMap) { + let propagator = TraceContextPropagator::new(); + let context = propagator.extract(tracing_data); + span.set_parent(context); +} + +pub mod http { + pub fn extract_tracing(headers: &http::HeaderMap) { + use opentelemetry_http::HeaderExtractor; + use tracing_opentelemetry::OpenTelemetrySpanExt; + let extractor = HeaderExtractor(headers); + let ctx = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor) + }); + tracing::Span::current().set_parent(ctx) + } } pub mod grpc { diff --git a/third-party/rust/BUCK b/third-party/rust/BUCK index 79a8a88e9d..4b83516a09 100644 --- a/third-party/rust/BUCK +++ b/third-party/rust/BUCK @@ -278,6 +278,23 @@ cargo.rust_library( visibility = [], ) +http_archive( + name = "anymap2-0.13.0.crate", + sha256 = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c", + strip_prefix = "anymap2-0.13.0", + urls = ["https://crates.io/api/v1/crates/anymap2/0.13.0/download"], + visibility = [], +) + +cargo.rust_library( + name = "anymap2-0.13.0", + srcs = [":anymap2-0.13.0.crate"], + crate = "anymap2", + crate_root = "anymap2-0.13.0.crate/src/lib.rs", + edition = "2018", + visibility = [], +) + http_archive( name = "arc-swap-1.6.0.crate", sha256 = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6", @@ -334,7 +351,7 @@ cargo.rust_library( ":async-trait-0.1.74", ":base64-0.13.1", ":bytes-1.5.0", - ":chrono-0.4.32", + ":chrono-0.4.34", ":fnv-1.0.7", ":futures-util-0.3.30", ":http-0.2.11", @@ -382,7 +399,7 @@ cargo.rust_library( ":bytes-1.5.0", ":futures-util-0.3.30", ":serde_json-1.0.111", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-stream-0.1.14", ":tokio-util-0.7.10", ":tower-service-0.3.2", @@ -630,7 +647,7 @@ cargo.rust_library( ":serde_urlencoded-0.7.1", ":sha1-0.10.6", ":sync_wrapper-0.1.2", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-tungstenite-0.20.1", ":tower-0.4.13", ":tower-layer-0.3.2", @@ -990,32 +1007,37 @@ cargo.rust_library( alias( name = "chrono", - actual = ":chrono-0.4.32", + actual = ":chrono-0.4.34", visibility = ["PUBLIC"], ) http_archive( - name = "chrono-0.4.32.crate", - sha256 = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a", - strip_prefix = "chrono-0.4.32", - urls = ["https://crates.io/api/v1/crates/chrono/0.4.32/download"], + name = "chrono-0.4.34.crate", + sha256 = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b", + strip_prefix = "chrono-0.4.34", + urls = ["https://crates.io/api/v1/crates/chrono/0.4.34/download"], visibility = [], ) cargo.rust_library( - name = "chrono-0.4.32", - srcs = [":chrono-0.4.32.crate"], + name = "chrono-0.4.34", + srcs = [":chrono-0.4.34.crate"], crate = "chrono", - crate_root = "chrono-0.4.32.crate/src/lib.rs", + crate_root = "chrono-0.4.34.crate/src/lib.rs", edition = "2021", features = [ "alloc", "android-tzdata", "clock", + "default", "iana-time-zone", + "js-sys", "now", + "oldtime", "serde", "std", + "wasm-bindgen", + "wasmbind", "winapi", "windows-targets", ], @@ -1048,23 +1070,23 @@ cargo.rust_library( alias( name = "clap", - actual = ":clap-4.4.18", + actual = ":clap-4.5.0", visibility = ["PUBLIC"], ) http_archive( - name = "clap-4.4.18.crate", - sha256 = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c", - strip_prefix = "clap-4.4.18", - urls = ["https://crates.io/api/v1/crates/clap/4.4.18/download"], + name = "clap-4.5.0.crate", + sha256 = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f", + strip_prefix = "clap-4.5.0", + urls = ["https://crates.io/api/v1/crates/clap/4.5.0/download"], visibility = [], ) cargo.rust_library( - name = "clap-4.4.18", - srcs = [":clap-4.4.18.crate"], + name = "clap-4.5.0", + srcs = [":clap-4.5.0.crate"], crate = "clap", - crate_root = "clap-4.4.18.crate/src/lib.rs", + crate_root = "clap-4.5.0.crate/src/lib.rs", edition = "2021", features = [ "color", @@ -1079,24 +1101,24 @@ cargo.rust_library( ], visibility = [], deps = [ - ":clap_builder-4.4.18", - ":clap_derive-4.4.7", + ":clap_builder-4.5.0", + ":clap_derive-4.5.0", ], ) http_archive( - name = "clap_builder-4.4.18.crate", - sha256 = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7", - strip_prefix = "clap_builder-4.4.18", - urls = ["https://crates.io/api/v1/crates/clap_builder/4.4.18/download"], + name = "clap_builder-4.5.0.crate", + sha256 = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99", + strip_prefix = "clap_builder-4.5.0", + urls = ["https://crates.io/api/v1/crates/clap_builder/4.5.0/download"], visibility = [], ) cargo.rust_library( - name = "clap_builder-4.4.18", - srcs = [":clap_builder-4.4.18.crate"], + name = "clap_builder-4.5.0", + srcs = [":clap_builder-4.5.0.crate"], crate = "clap_builder", - crate_root = "clap_builder-4.4.18.crate/src/lib.rs", + crate_root = "clap_builder-4.5.0.crate/src/lib.rs", edition = "2021", features = [ "color", @@ -1111,24 +1133,24 @@ cargo.rust_library( deps = [ ":anstream-0.6.8", ":anstyle-1.0.4", - ":clap_lex-0.6.0", - ":strsim-0.10.0", + ":clap_lex-0.7.0", + ":strsim-0.11.0", ], ) http_archive( - name = "clap_derive-4.4.7.crate", - sha256 = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442", - strip_prefix = "clap_derive-4.4.7", - urls = ["https://crates.io/api/v1/crates/clap_derive/4.4.7/download"], + name = "clap_derive-4.5.0.crate", + sha256 = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47", + strip_prefix = "clap_derive-4.5.0", + urls = ["https://crates.io/api/v1/crates/clap_derive/4.5.0/download"], visibility = [], ) cargo.rust_library( - name = "clap_derive-4.4.7", - srcs = [":clap_derive-4.4.7.crate"], + name = "clap_derive-4.5.0", + srcs = [":clap_derive-4.5.0.crate"], crate = "clap_derive", - crate_root = "clap_derive-4.4.7.crate/src/lib.rs", + crate_root = "clap_derive-4.5.0.crate/src/lib.rs", edition = "2021", features = ["default"], proc_macro = True, @@ -1142,18 +1164,18 @@ cargo.rust_library( ) http_archive( - name = "clap_lex-0.6.0.crate", - sha256 = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1", - strip_prefix = "clap_lex-0.6.0", - urls = ["https://crates.io/api/v1/crates/clap_lex/0.6.0/download"], + name = "clap_lex-0.7.0.crate", + sha256 = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce", + strip_prefix = "clap_lex-0.7.0", + urls = ["https://crates.io/api/v1/crates/clap_lex/0.7.0/download"], visibility = [], ) cargo.rust_library( - name = "clap_lex-0.6.0", - srcs = [":clap_lex-0.6.0.crate"], + name = "clap_lex-0.7.0", + srcs = [":clap_lex-0.7.0.crate"], crate = "clap_lex", - crate_root = "clap_lex-0.6.0.crate/src/lib.rs", + crate_root = "clap_lex-0.7.0.crate/src/lib.rs", edition = "2021", visibility = [], ) @@ -1810,46 +1832,47 @@ cargo.rust_library( alias( name = "derive_builder", - actual = ":derive_builder-0.12.0", + actual = ":derive_builder-0.13.0", visibility = ["PUBLIC"], ) http_archive( - name = "derive_builder-0.12.0.crate", - sha256 = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8", - strip_prefix = "derive_builder-0.12.0", - urls = ["https://crates.io/api/v1/crates/derive_builder/0.12.0/download"], + name = "derive_builder-0.13.0.crate", + sha256 = "660047478bc508c0fde22c868991eec0c40a63e48d610befef466d48e2bee574", + strip_prefix = "derive_builder-0.13.0", + urls = ["https://crates.io/api/v1/crates/derive_builder/0.13.0/download"], visibility = [], ) cargo.rust_library( - name = "derive_builder-0.12.0", - srcs = [":derive_builder-0.12.0.crate"], + name = "derive_builder-0.13.0", + srcs = [":derive_builder-0.13.0.crate"], crate = "derive_builder", - crate_root = "derive_builder-0.12.0.crate/src/lib.rs", + crate_root = "derive_builder-0.13.0.crate/src/lib.rs", edition = "2015", features = [ "default", "std", ], visibility = [], - deps = [":derive_builder_macro-0.12.0"], + deps = [":derive_builder_macro-0.13.0"], ) http_archive( - name = "derive_builder_core-0.12.0.crate", - sha256 = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f", - strip_prefix = "derive_builder_core-0.12.0", - urls = ["https://crates.io/api/v1/crates/derive_builder_core/0.12.0/download"], + name = "derive_builder_core-0.13.0.crate", + sha256 = "9b217e6dd1011a54d12f3b920a411b5abd44b1716ecfe94f5f2f2f7b52e08ab7", + strip_prefix = "derive_builder_core-0.13.0", + urls = ["https://crates.io/api/v1/crates/derive_builder_core/0.13.0/download"], visibility = [], ) cargo.rust_library( - name = "derive_builder_core-0.12.0", - srcs = [":derive_builder_core-0.12.0.crate"], + name = "derive_builder_core-0.13.0", + srcs = [":derive_builder_core-0.13.0.crate"], crate = "derive_builder_core", - crate_root = "derive_builder_core-0.12.0.crate/src/lib.rs", + crate_root = "derive_builder_core-0.13.0.crate/src/lib.rs", edition = "2015", + features = ["lib_has_std"], visibility = [], deps = [ ":darling-0.14.4", @@ -1860,23 +1883,24 @@ cargo.rust_library( ) http_archive( - name = "derive_builder_macro-0.12.0.crate", - sha256 = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e", - strip_prefix = "derive_builder_macro-0.12.0", - urls = ["https://crates.io/api/v1/crates/derive_builder_macro/0.12.0/download"], + name = "derive_builder_macro-0.13.0.crate", + sha256 = "7a5f77d7e20ac9153428f7ca14a88aba652adfc7a0ef0a06d654386310ef663b", + strip_prefix = "derive_builder_macro-0.13.0", + urls = ["https://crates.io/api/v1/crates/derive_builder_macro/0.13.0/download"], visibility = [], ) cargo.rust_library( - name = "derive_builder_macro-0.12.0", - srcs = [":derive_builder_macro-0.12.0.crate"], + name = "derive_builder_macro-0.13.0", + srcs = [":derive_builder_macro-0.13.0.crate"], crate = "derive_builder_macro", - crate_root = "derive_builder_macro-0.12.0.crate/src/lib.rs", + crate_root = "derive_builder_macro-0.13.0.crate/src/lib.rs", edition = "2015", + features = ["lib_has_std"], proc_macro = True, visibility = [], deps = [ - ":derive_builder_core-0.12.0", + ":derive_builder_core-0.13.0", ":syn-1.0.109", ], ) @@ -1967,6 +1991,23 @@ cargo.rust_library( ], ) +http_archive( + name = "dotenv-0.15.0.crate", + sha256 = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f", + strip_prefix = "dotenv-0.15.0", + urls = ["https://crates.io/api/v1/crates/dotenv/0.15.0/download"], + visibility = [], +) + +cargo.rust_library( + name = "dotenv-0.15.0", + srcs = [":dotenv-0.15.0.crate"], + crate = "dotenv", + crate_root = "dotenv-0.15.0.crate/src/lib.rs", + edition = "2018", + visibility = [], +) + http_archive( name = "dotenvy-0.15.7.crate", sha256 = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b", @@ -2768,7 +2809,7 @@ cargo.rust_library( visibility = [], deps = [ ":base64-0.13.1", - ":chrono-0.4.32", + ":chrono-0.4.34", ":http-0.2.11", ":hyper-0.14.27", ":itertools-0.10.5", @@ -2776,7 +2817,7 @@ cargo.rust_library( ":serde-1.0.196", ":serde_json-1.0.111", ":serde_with-2.3.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tower-service-0.3.2", ":url-1.7.2", ":yup-oauth2-8.3.2", @@ -2818,7 +2859,7 @@ cargo.rust_library( ":mime-0.3.17", ":serde-1.0.196", ":serde_json-1.0.111", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tower-service-0.3.2", ":url-1.7.2", ], @@ -2848,7 +2889,7 @@ cargo.rust_library( ":http-0.2.11", ":indexmap-2.2.1", ":slab-0.4.9", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-util-0.7.10", ":tracing-0.1.40", ], @@ -3236,7 +3277,7 @@ cargo.rust_library( ":itoa-1.0.9", ":pin-project-lite-0.2.13", ":socket2-0.4.10", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tower-service-0.3.2", ":tracing-0.1.40", ":want-0.3.1", @@ -3277,7 +3318,7 @@ cargo.rust_library( ":log-0.4.20", ":rustls-0.21.8", ":rustls-native-certs-0.6.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ], ) @@ -3300,7 +3341,7 @@ cargo.rust_library( deps = [ ":hyper-0.14.27", ":pin-project-lite-0.2.13", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-io-timeout-1.2.0", ], ) @@ -4198,7 +4239,7 @@ cargo.rust_library( ":base64-0.13.1", ":bitflags-1.3.2", ":bson-2.8.1", - ":chrono-0.4.32", + ":chrono-0.4.34", ":derivative-2.2.0", ":derive_more-0.99.17", ":futures-core-0.3.30", @@ -4225,7 +4266,7 @@ cargo.rust_library( ":strsim-0.10.0", ":take_mut-0.2.2", ":thiserror-1.0.56", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ":tokio-util-0.7.10", ":trust-dns-proto-0.21.2", @@ -4760,7 +4801,7 @@ cargo.rust_library( ":bytes-1.5.0", ":http-0.2.11", ":opentelemetry_api-0.20.0", - ":reqwest-0.11.23", + ":reqwest-0.11.24", ], ) @@ -4820,9 +4861,9 @@ cargo.rust_library( ":opentelemetry_api-0.20.0", ":opentelemetry_sdk-0.20.0", ":prost-0.11.9", - ":reqwest-0.11.23", + ":reqwest-0.11.24", ":thiserror-1.0.56", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tonic-0.9.2", ], ) @@ -4970,7 +5011,7 @@ cargo.rust_library( ":regex-1.10.2", ":serde_json-1.0.111", ":thiserror-1.0.56", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-stream-0.1.14", ], ) @@ -6007,23 +6048,23 @@ cargo.rust_library( alias( name = "reqwest", - actual = ":reqwest-0.11.23", + actual = ":reqwest-0.11.24", visibility = ["PUBLIC"], ) http_archive( - name = "reqwest-0.11.23.crate", - sha256 = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41", - strip_prefix = "reqwest-0.11.23", - urls = ["https://crates.io/api/v1/crates/reqwest/0.11.23/download"], + name = "reqwest-0.11.24.crate", + sha256 = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251", + strip_prefix = "reqwest-0.11.24", + urls = ["https://crates.io/api/v1/crates/reqwest/0.11.24/download"], visibility = [], ) cargo.rust_library( - name = "reqwest-0.11.23", - srcs = [":reqwest-0.11.23.crate"], + name = "reqwest-0.11.24", + srcs = [":reqwest-0.11.24.crate"], crate = "reqwest", - crate_root = "reqwest-0.11.23.crate/src/lib.rs", + crate_root = "reqwest-0.11.24.crate/src/lib.rs", edition = "2018", features = [ "__rustls", @@ -6032,7 +6073,6 @@ cargo.rust_library( "hyper-rustls", "json", "rustls", - "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", @@ -6055,7 +6095,7 @@ cargo.rust_library( ":pin-project-lite-0.2.13", ":rustls-0.21.8", ":rustls-pemfile-1.0.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ":webpki-roots-0.25.2", ], @@ -6075,7 +6115,7 @@ cargo.rust_library( ":pin-project-lite-0.2.13", ":rustls-0.21.8", ":rustls-pemfile-1.0.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ":webpki-roots-0.25.2", ], @@ -6096,7 +6136,7 @@ cargo.rust_library( ":rustls-0.21.8", ":rustls-pemfile-1.0.3", ":system-configuration-0.5.1", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ":webpki-roots-0.25.2", ], @@ -6117,7 +6157,7 @@ cargo.rust_library( ":rustls-0.21.8", ":rustls-pemfile-1.0.3", ":system-configuration-0.5.1", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ":webpki-roots-0.25.2", ], @@ -6137,7 +6177,7 @@ cargo.rust_library( ":pin-project-lite-0.2.13", ":rustls-0.21.8", ":rustls-pemfile-1.0.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ":webpki-roots-0.25.2", ":winreg-0.50.0", @@ -6158,7 +6198,7 @@ cargo.rust_library( ":pin-project-lite-0.2.13", ":rustls-0.21.8", ":rustls-pemfile-1.0.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-rustls-0.24.1", ":webpki-roots-0.25.2", ":winreg-0.50.0", @@ -6175,6 +6215,7 @@ cargo.rust_library( ":serde-1.0.196", ":serde_json-1.0.111", ":serde_urlencoded-0.7.1", + ":sync_wrapper-0.1.2", ":tower-service-0.3.2", ":url-2.4.1", ], @@ -7775,7 +7816,7 @@ cargo.rust_library( "std", ], named_deps = { - "chrono_0_4": ":chrono-0.4.32", + "chrono_0_4": ":chrono-0.4.34", "indexmap_1": ":indexmap-1.9.3", "time_0_3": ":time-0.3.30", }, @@ -7791,23 +7832,23 @@ cargo.rust_library( alias( name = "serde_with", - actual = ":serde_with-3.5.0", + actual = ":serde_with-3.6.1", visibility = ["PUBLIC"], ) http_archive( - name = "serde_with-3.5.0.crate", - sha256 = "f58c3a1b3e418f61c25b2aeb43fc6c95eaa252b8cecdda67f401943e9e08d33f", - strip_prefix = "serde_with-3.5.0", - urls = ["https://crates.io/api/v1/crates/serde_with/3.5.0/download"], + name = "serde_with-3.6.1.crate", + sha256 = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270", + strip_prefix = "serde_with-3.6.1", + urls = ["https://crates.io/api/v1/crates/serde_with/3.6.1/download"], visibility = [], ) cargo.rust_library( - name = "serde_with-3.5.0", - srcs = [":serde_with-3.5.0.crate"], + name = "serde_with-3.6.1", + srcs = [":serde_with-3.6.1.crate"], crate = "serde_with", - crate_root = "serde_with-3.5.0.crate/src/lib.rs", + crate_root = "serde_with-3.6.1.crate/src/lib.rs", edition = "2021", features = [ "alloc", @@ -7816,7 +7857,7 @@ cargo.rust_library( "std", ], named_deps = { - "chrono_0_4": ":chrono-0.4.32", + "chrono_0_4": ":chrono-0.4.34", "indexmap_1": ":indexmap-1.9.3", "indexmap_2": ":indexmap-2.2.1", "time_0_3": ":time-0.3.30", @@ -7826,8 +7867,9 @@ cargo.rust_library( ":base64-0.21.5", ":hex-0.4.3", ":serde-1.0.196", + ":serde_derive-1.0.196", ":serde_json-1.0.111", - ":serde_with_macros-3.5.0", + ":serde_with_macros-3.6.1", ], ) @@ -7880,18 +7922,18 @@ cargo.rust_library( ) http_archive( - name = "serde_with_macros-3.5.0.crate", - sha256 = "d2068b437a31fc68f25dd7edc296b078f04b45145c199d8eed9866e45f1ff274", - strip_prefix = "serde_with_macros-3.5.0", - urls = ["https://crates.io/api/v1/crates/serde_with_macros/3.5.0/download"], + name = "serde_with_macros-3.6.1.crate", + sha256 = "865f9743393e638991566a8b7a479043c2c8da94a33e0a31f18214c9cae0a64d", + strip_prefix = "serde_with_macros-3.6.1", + urls = ["https://crates.io/api/v1/crates/serde_with_macros/3.6.1/download"], visibility = [], ) cargo.rust_library( - name = "serde_with_macros-3.5.0", - srcs = [":serde_with_macros-3.5.0.crate"], + name = "serde_with_macros-3.6.1", + srcs = [":serde_with_macros-3.6.1.crate"], crate = "serde_with_macros", - crate_root = "serde_with_macros-3.5.0.crate/src/lib.rs", + crate_root = "serde_with_macros-3.6.1.crate/src/lib.rs", edition = "2021", proc_macro = True, visibility = [], @@ -8473,7 +8515,7 @@ cargo.rust_library( ":atoi-2.0.0", ":byteorder-1.5.0", ":bytes-1.5.0", - ":chrono-0.4.32", + ":chrono-0.4.34", ":crc-3.0.1", ":crossbeam-queue-0.3.8", ":dotenvy-0.15.7", @@ -8500,7 +8542,7 @@ cargo.rust_library( ":smallvec-1.13.1", ":sqlformat-0.2.2", ":thiserror-1.0.56", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-stream-0.1.14", ":tracing-0.1.40", ":url-2.4.1", @@ -8588,7 +8630,7 @@ cargo.rust_library( ":sqlx-sqlite-0.7.2", ":syn-1.0.109", ":tempfile-3.8.1", - ":tokio-1.33.0", + ":tokio-1.36.0", ":url-2.4.1", ], ) @@ -8623,7 +8665,7 @@ cargo.rust_library( ":bitflags-2.4.1", ":byteorder-1.5.0", ":bytes-1.5.0", - ":chrono-0.4.32", + ":chrono-0.4.34", ":crc-3.0.1", ":digest-0.10.7", ":dotenvy-0.15.7", @@ -8693,7 +8735,7 @@ cargo.rust_library( ":base64-0.21.5", ":bitflags-2.4.1", ":byteorder-1.5.0", - ":chrono-0.4.32", + ":chrono-0.4.34", ":crc-3.0.1", ":dotenvy-0.15.7", ":futures-channel-0.3.30", @@ -8750,7 +8792,7 @@ cargo.rust_library( visibility = [], deps = [ ":atoi-2.0.0", - ":chrono-0.4.32", + ":chrono-0.4.34", ":flume-0.11.0", ":futures-channel-0.3.30", ":futures-core-0.3.30", @@ -8768,6 +8810,65 @@ cargo.rust_library( ], ) +alias( + name = "sqlxmq", + actual = ":sqlxmq-0.5.0", + visibility = ["PUBLIC"], +) + +http_archive( + name = "sqlxmq-0.5.0.crate", + sha256 = "e914521071581f0413516de0e7931086ebe4a22d719c1100de29eb339c712311", + strip_prefix = "sqlxmq-0.5.0", + urls = ["https://crates.io/api/v1/crates/sqlxmq/0.5.0/download"], + visibility = [], +) + +cargo.rust_library( + name = "sqlxmq-0.5.0", + srcs = [":sqlxmq-0.5.0.crate"], + crate = "sqlxmq", + crate_root = "sqlxmq-0.5.0.crate/src/lib.rs", + edition = "2018", + features = ["runtime-tokio-rustls"], + visibility = [], + deps = [ + ":anymap2-0.13.0", + ":chrono-0.4.34", + ":dotenv-0.15.0", + ":log-0.4.20", + ":serde-1.0.196", + ":serde_json-1.0.111", + ":sqlx-0.7.2", + ":sqlxmq_macros-0.5.0", + ":tokio-1.36.0", + ":uuid-1.7.0", + ], +) + +http_archive( + name = "sqlxmq_macros-0.5.0.crate", + sha256 = "bd00667edb120f18e14e2b4a71cddb308dc3605171ed7afda3056129d08b9f4f", + strip_prefix = "sqlxmq_macros-0.5.0", + urls = ["https://crates.io/api/v1/crates/sqlxmq_macros/0.5.0/download"], + visibility = [], +) + +cargo.rust_library( + name = "sqlxmq_macros-0.5.0", + srcs = [":sqlxmq_macros-0.5.0.crate"], + crate = "sqlxmq_macros", + crate_root = "sqlxmq_macros-0.5.0.crate/src/lib.rs", + edition = "2018", + proc_macro = True, + visibility = [], + deps = [ + ":proc-macro2-1.0.76", + ":quote-1.0.35", + ":syn-1.0.109", + ], +) + http_archive( name = "stable_deref_trait-1.2.0.crate", sha256 = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3", @@ -8841,6 +8942,23 @@ cargo.rust_library( visibility = [], ) +http_archive( + name = "strsim-0.11.0.crate", + sha256 = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01", + strip_prefix = "strsim-0.11.0", + urls = ["https://crates.io/api/v1/crates/strsim/0.11.0/download"], + visibility = [], +) + +cargo.rust_library( + name = "strsim-0.11.0", + srcs = [":strsim-0.11.0.crate"], + crate = "strsim", + crate_root = "strsim-0.11.0.crate/src/lib.rs", + edition = "2015", + visibility = [], +) + http_archive( name = "strum-0.25.0.crate", sha256 = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125", @@ -9123,9 +9241,9 @@ cargo.rust_binary( ":async-graphql-6.0.11", ":async-graphql-axum-6.0.11", ":axum-0.6.20", - ":chrono-0.4.32", - ":clap-4.4.18", - ":derive_builder-0.12.0", + ":chrono-0.4.34", + ":clap-4.5.0", + ":derive_builder-0.13.0", ":futures-0.3.30", ":google-fcm1-5.0.3+20230106", ":http-0.2.11", @@ -9137,15 +9255,16 @@ cargo.rust_binary( ":opentelemetry-semantic-conventions-0.12.0", ":prost-0.12.3", ":rand-0.8.5", - ":reqwest-0.11.23", + ":reqwest-0.11.24", ":rust-i18n-3.0.1", ":serde-1.0.196", ":serde_json-1.0.111", - ":serde_with-3.5.0", + ":serde_with-3.6.1", ":serde_yaml-0.9.31", ":sqlx-0.7.2", + ":sqlxmq-0.5.0", ":thiserror-1.0.56", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tonic-0.10.2", ":tonic-build-0.10.2", ":tonic-health-0.10.2", @@ -9372,23 +9491,23 @@ cargo.rust_library( alias( name = "tokio", - actual = ":tokio-1.33.0", + actual = ":tokio-1.36.0", visibility = ["PUBLIC"], ) http_archive( - name = "tokio-1.33.0.crate", - sha256 = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653", - strip_prefix = "tokio-1.33.0", - urls = ["https://crates.io/api/v1/crates/tokio/1.33.0/download"], + name = "tokio-1.36.0.crate", + sha256 = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931", + strip_prefix = "tokio-1.36.0", + urls = ["https://crates.io/api/v1/crates/tokio/1.36.0/download"], visibility = [], ) cargo.rust_library( - name = "tokio-1.33.0", - srcs = [":tokio-1.33.0.crate"], + name = "tokio-1.36.0", + srcs = [":tokio-1.36.0.crate"], crate = "tokio", - crate_root = "tokio-1.33.0.crate/src/lib.rs", + crate_root = "tokio-1.36.0.crate/src/lib.rs", edition = "2021", features = [ "bytes", @@ -9463,7 +9582,7 @@ cargo.rust_library( ":num_cpus-1.16.0", ":parking_lot-0.12.1", ":pin-project-lite-0.2.13", - ":tokio-macros-2.1.0", + ":tokio-macros-2.2.0", ], ) @@ -9484,24 +9603,24 @@ cargo.rust_library( visibility = [], deps = [ ":pin-project-lite-0.2.13", - ":tokio-1.33.0", + ":tokio-1.36.0", ], ) http_archive( - name = "tokio-macros-2.1.0.crate", - sha256 = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e", - strip_prefix = "tokio-macros-2.1.0", - urls = ["https://crates.io/api/v1/crates/tokio-macros/2.1.0/download"], + name = "tokio-macros-2.2.0.crate", + sha256 = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b", + strip_prefix = "tokio-macros-2.2.0", + urls = ["https://crates.io/api/v1/crates/tokio-macros/2.2.0/download"], visibility = [], ) cargo.rust_library( - name = "tokio-macros-2.1.0", - srcs = [":tokio-macros-2.1.0.crate"], + name = "tokio-macros-2.2.0", + srcs = [":tokio-macros-2.2.0.crate"], crate = "tokio_macros", - crate_root = "tokio-macros-2.1.0.crate/src/lib.rs", - edition = "2018", + crate_root = "tokio-macros-2.2.0.crate/src/lib.rs", + edition = "2021", proc_macro = True, visibility = [], deps = [ @@ -9534,7 +9653,7 @@ cargo.rust_library( visibility = [], deps = [ ":rustls-0.21.8", - ":tokio-1.33.0", + ":tokio-1.36.0", ], ) @@ -9561,7 +9680,7 @@ cargo.rust_library( deps = [ ":futures-core-0.3.30", ":pin-project-lite-0.2.13", - ":tokio-1.33.0", + ":tokio-1.36.0", ], ) @@ -9589,7 +9708,7 @@ cargo.rust_library( deps = [ ":futures-util-0.3.30", ":log-0.4.20", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tungstenite-0.20.1", ], ) @@ -9623,7 +9742,7 @@ cargo.rust_library( ":futures-io-0.3.30", ":futures-sink-0.3.30", ":pin-project-lite-0.2.13", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tracing-0.1.40", ], ) @@ -9756,7 +9875,7 @@ cargo.rust_library( ":percent-encoding-2.3.0", ":pin-project-1.1.3", ":prost-0.12.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-stream-0.1.14", ":tower-0.4.13", ":tower-layer-0.3.2", @@ -9813,7 +9932,7 @@ cargo.rust_library( ":percent-encoding-2.3.0", ":pin-project-1.1.3", ":prost-0.11.9", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-stream-0.1.14", ":tower-0.4.13", ":tower-layer-0.3.2", @@ -9886,7 +10005,7 @@ cargo.rust_library( deps = [ ":async-stream-0.3.5", ":prost-0.12.3", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-stream-0.1.14", ":tonic-0.10.2", ], @@ -9938,7 +10057,7 @@ cargo.rust_library( ":pin-project-lite-0.2.13", ":rand-0.8.5", ":slab-0.4.9", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tokio-util-0.7.10", ":tower-layer-0.3.2", ":tower-service-0.3.2", @@ -10336,7 +10455,7 @@ cargo.rust_library( ":smallvec-1.13.1", ":thiserror-1.0.56", ":tinyvec-1.6.0", - ":tokio-1.33.0", + ":tokio-1.36.0", ":url-2.4.1", ], ) @@ -10393,7 +10512,7 @@ cargo.rust_library( ":resolv-conf-0.7.0", ":smallvec-1.13.1", ":thiserror-1.0.56", - ":tokio-1.33.0", + ":tokio-1.36.0", ":trust-dns-proto-0.21.2", ], ) @@ -11495,7 +11614,7 @@ cargo.rust_library( ":serde-1.0.196", ":serde_json-1.0.111", ":time-0.3.30", - ":tokio-1.33.0", + ":tokio-1.36.0", ":tower-service-0.3.2", ":url-2.4.1", ], diff --git a/third-party/rust/Cargo.lock b/third-party/rust/Cargo.lock index 4cd522e48d..4dd87dcd49 100644 --- a/third-party/rust/Cargo.lock +++ b/third-party/rust/Cargo.lock @@ -124,6 +124,12 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +[[package]] +name = "anymap2" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" + [[package]] name = "arc-swap" version = "1.6.0" @@ -474,22 +480,24 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.32" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.52.0", ] [[package]] name = "clap" -version = "4.4.18" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" +checksum = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f" dependencies = [ "clap_builder", "clap_derive", @@ -497,21 +505,21 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.18" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" +checksum = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99" dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.0", ] [[package]] name = "clap_derive" -version = "4.4.7" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" +checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" dependencies = [ "heck", "proc-macro2", @@ -521,9 +529,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "colorchoice" @@ -678,7 +686,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 1.0.109", ] @@ -692,7 +700,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 1.0.109", ] @@ -706,7 +714,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 2.0.48", ] @@ -783,18 +791,18 @@ dependencies = [ [[package]] name = "derive_builder" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" +checksum = "660047478bc508c0fde22c868991eec0c40a63e48d610befef466d48e2bee574" dependencies = [ "derive_builder_macro", ] [[package]] name = "derive_builder_core" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +checksum = "9b217e6dd1011a54d12f3b920a411b5abd44b1716ecfe94f5f2f2f7b52e08ab7" dependencies = [ "darling 0.14.4", "proc-macro2", @@ -804,9 +812,9 @@ dependencies = [ [[package]] name = "derive_builder_macro" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" +checksum = "7a5f77d7e20ac9153428f7ca14a88aba652adfc7a0ef0a06d654386310ef663b" dependencies = [ "derive_builder_core", "syn 1.0.109", @@ -837,6 +845,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dotenvy" version = "0.15.7" @@ -1721,7 +1735,7 @@ dependencies = [ "sha2", "socket2 0.4.10", "stringprep", - "strsim", + "strsim 0.10.0", "take_mut", "thiserror", "tokio", @@ -2378,9 +2392,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64 0.21.5", "bytes", @@ -2404,6 +2418,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-rustls", @@ -2828,9 +2843,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.5.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f58c3a1b3e418f61c25b2aeb43fc6c95eaa252b8cecdda67f401943e9e08d33f" +checksum = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270" dependencies = [ "base64 0.21.5", "chrono", @@ -2838,8 +2853,9 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.2.1", "serde", + "serde_derive", "serde_json", - "serde_with_macros 3.5.0", + "serde_with_macros 3.6.1", "time", ] @@ -2869,9 +2885,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.5.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2068b437a31fc68f25dd7edc296b078f04b45145c199d8eed9866e45f1ff274" +checksum = "865f9743393e638991566a8b7a479043c2c8da94a33e0a31f18214c9cae0a64d" dependencies = [ "darling 0.20.3", "proc-macro2", @@ -3254,6 +3270,35 @@ dependencies = [ "uuid", ] +[[package]] +name = "sqlxmq" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e914521071581f0413516de0e7931086ebe4a22d719c1100de29eb339c712311" +dependencies = [ + "anymap2", + "chrono", + "dotenv", + "log", + "serde", + "serde_json", + "sqlx", + "sqlxmq_macros", + "tokio", + "uuid", +] + +[[package]] +name = "sqlxmq_macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd00667edb120f18e14e2b4a71cddb308dc3605171ed7afda3056129d08b9f4f" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3283,6 +3328,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" + [[package]] name = "strum" version = "0.25.0" @@ -3411,9 +3462,10 @@ dependencies = [ "rust-i18n", "serde", "serde_json", - "serde_with 3.5.0", + "serde_with 3.6.1", "serde_yaml 0.9.31", "sqlx", + "sqlxmq", "thiserror", "tokio", "tonic 0.10.2", @@ -3504,9 +3556,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -3533,9 +3585,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", diff --git a/third-party/rust/Cargo.toml b/third-party/rust/Cargo.toml index 185fb303a1..5a2d05b957 100644 --- a/third-party/rust/Cargo.toml +++ b/third-party/rust/Cargo.toml @@ -24,21 +24,22 @@ async-graphql = { version = "6.0.11", default-features = false, features = ["tra async-graphql-axum = "6.0.11" axum = { version = "0.6.20", features = ["headers", "macros"] } jsonwebtoken = "9.2.0" -clap = { version = "4.4.18", features = ["derive", "env"] } -derive_builder = "0.12.0" -tokio = { version = "1.33.0", features = ["full"] } +clap = { version = "4.5", features = ["derive", "env"] } +derive_builder = "0.13.0" +tokio = { version = "1.36", features = ["full"] } serde = { version = "1.0.196", features = ["derive"] } -reqwest = { version = "0.11.23", default-features = false, features = ["json", "rustls-tls"] } +reqwest = { version = "0.11.24", default-features = false, features = ["json", "rustls-tls"] } thiserror = "1.0.56" serde_yaml = "0.9.31" serde_json = "1.0.111" -chrono = { version = "0.4.32", features = ["clock", "serde"], default-features = false } +chrono = { version = "0.4.33", features = ["clock", "serde"], default-features = false } futures = "0.3.30" sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono"] } +sqlxmq = { version = "0.5", default-features = false, features = ["runtime-tokio-rustls"] } mongodb = "2.8.0" rand = "0.8.5" uuid = { version = "1.7.0", features = ["serde", "v4"] } -serde_with = "3.5.0" +serde_with = "3.6.0" tracing = "0.1.37" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } tracing-opentelemetry = "0.20.0"