From 5ac5bc93a5de8a48fd8f1965401b2831d3da8920 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 6 Sep 2024 16:44:16 -0400 Subject: [PATCH] feat(source): support webhook source table --- Cargo.lock | 4 + ci/scripts/e2e-source-test.sh | 21 +- e2e_test/webhook/check_1.slt.part | 9 + e2e_test/webhook/check_2.slt.part | 11 + e2e_test/webhook/check_3.slt.part | 13 ++ e2e_test/webhook/create_table.slt.part | 21 ++ e2e_test/webhook/drop_table.slt.part | 7 + e2e_test/webhook/sender.py | 88 +++++++ e2e_test/webhook/webhook_source.slt | 27 +++ e2e_test/webhook/webhook_source_recovery.slt | 19 ++ proto/catalog.proto | 9 + proto/expr.proto | 1 + src/common/secret/src/secret_manager.rs | 22 ++ src/connector/src/source/mod.rs | 2 + src/expr/impl/Cargo.toml | 1 + src/expr/impl/src/scalar/hmac.rs | 78 +++++++ src/expr/impl/src/scalar/mod.rs | 1 + src/frontend/Cargo.toml | 7 + src/frontend/planner_test/src/lib.rs | 2 + .../binder/expr/function/builtin_scalar.rs | 1 + src/frontend/src/binder/expr/mod.rs | 11 + src/frontend/src/binder/mod.rs | 19 ++ src/frontend/src/catalog/table_catalog.rs | 8 +- src/frontend/src/expr/pure.rs | 1 + src/frontend/src/handler/create_source.rs | 5 +- src/frontend/src/handler/create_table.rs | 77 +++++- src/frontend/src/handler/create_table_as.rs | 1 + src/frontend/src/handler/explain.rs | 2 + src/frontend/src/handler/mod.rs | 2 + src/frontend/src/lib.rs | 6 + src/frontend/src/optimizer/mod.rs | 4 +- .../src/optimizer/plan_expr_visitor/strong.rs | 1 + .../optimizer/plan_node/stream_materialize.rs | 6 + src/frontend/src/optimizer/plan_node/utils.rs | 1 + .../src/scheduler/distributed/query.rs | 1 + src/frontend/src/webhook/mod.rs | 221 ++++++++++++++++++ src/frontend/src/webhook/utils.rs | 77 ++++++ src/meta/model/migration/src/lib.rs | 3 + .../src/m20241001_013810_webhook_source.rs | 35 +++ src/meta/model/src/lib.rs | 4 + src/meta/model/src/table.rs | 4 +- src/meta/src/controller/mod.rs | 1 + src/prost/build.rs | 1 + src/sqlparser/src/ast/ddl.rs | 12 
+- src/sqlparser/src/ast/mod.rs | 9 +- src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 65 +++++- src/storage/src/compaction_catalog_manager.rs | 1 + 48 files changed, 908 insertions(+), 15 deletions(-) create mode 100644 e2e_test/webhook/check_1.slt.part create mode 100644 e2e_test/webhook/check_2.slt.part create mode 100644 e2e_test/webhook/check_3.slt.part create mode 100644 e2e_test/webhook/create_table.slt.part create mode 100644 e2e_test/webhook/drop_table.slt.part create mode 100644 e2e_test/webhook/sender.py create mode 100644 e2e_test/webhook/webhook_source.slt create mode 100644 e2e_test/webhook/webhook_source_recovery.slt create mode 100644 src/expr/impl/src/scalar/hmac.rs create mode 100644 src/frontend/src/webhook/mod.rs create mode 100644 src/frontend/src/webhook/utils.rs create mode 100644 src/meta/model/migration/src/m20241001_013810_webhook_source.rs diff --git a/Cargo.lock b/Cargo.lock index 2e84e822497b4..76e1575d89e68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11291,6 +11291,7 @@ dependencies = [ "futures-util", "ginepro", "hex", + "hmac", "icelake", "itertools 0.13.0", "jsonbb", @@ -11343,6 +11344,7 @@ dependencies = [ "async-trait", "auto_enums", "auto_impl", + "axum", "base64 0.22.0", "bk-tree", "bytes", @@ -11411,6 +11413,8 @@ dependencies = [ "thiserror-ext", "tokio-postgres", "tokio-stream 0.1.15", + "tower 0.4.13", + "tower-http", "tracing", "uuid", "workspace-hack", diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 4491db5633ea8..49426f0f54b5b 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -32,7 +32,7 @@ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" -python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema +python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema apt-get -y install jq echo "--- 
e2e, inline test" @@ -154,3 +154,22 @@ risedev slt './e2e_test/source_legacy/basic/old_row_format_syntax/*.slt' echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' + +risedev ci-kill +echo "--- cluster killed " + +echo "--- starting risingwave cluster for webhook source test" +risedev ci-start ci-1cn-1fe-with-recovery +sleep 5 +# check results +risedev slt "e2e_test/webhook/webhook_source.slt" + +risedev kill + +risedev dev ci-1cn-1fe-with-recovery +echo "--- wait for cluster recovery finish" +sleep 20 +risedev slt "e2e_test/webhook/webhook_source_recovery.slt" + +risedev ci-kill +echo "--- cluster killed " \ No newline at end of file diff --git a/e2e_test/webhook/check_1.slt.part b/e2e_test/webhook/check_1.slt.part new file mode 100644 index 0000000000000..ec82b64dfc07e --- /dev/null +++ b/e2e_test/webhook/check_1.slt.part @@ -0,0 +1,9 @@ +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha256; +---- +github sha256 + +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha1; +---- +github sha1 \ No newline at end of file diff --git a/e2e_test/webhook/check_2.slt.part b/e2e_test/webhook/check_2.slt.part new file mode 100644 index 0000000000000..bdea9c8158f3b --- /dev/null +++ b/e2e_test/webhook/check_2.slt.part @@ -0,0 +1,11 @@ +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha256; +---- +github sha256 +github sha256 + +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha1; +---- +github sha1 +github sha1 \ No newline at end of file diff --git a/e2e_test/webhook/check_3.slt.part b/e2e_test/webhook/check_3.slt.part new file mode 100644 index 0000000000000..ef951a3920f42 --- /dev/null +++ b/e2e_test/webhook/check_3.slt.part @@ -0,0 +1,13 @@ +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha256; +---- +github sha256 +github sha256 +github sha256 + +query TT +select data ->> 
'source', data->> 'auth_algo' from github_sha1; +---- +github sha1 +github sha1 +github sha1 \ No newline at end of file diff --git a/e2e_test/webhook/create_table.slt.part b/e2e_test/webhook/create_table.slt.part new file mode 100644 index 0000000000000..6f65b017350b0 --- /dev/null +++ b/e2e_test/webhook/create_table.slt.part @@ -0,0 +1,21 @@ + +statement ok +create table github_sha1 ( + data JSONB +) WITH ( + connector = 'webhook', +) VALIDATE SECRET test_secret AS secure_compare( + headers->>'X-Hub-Signature', + hmac(test_secret, data, 'sha1') +); + +statement ok +create table github_sha256 ( + data JSONB +) WITH ( + connector = 'webhook', +) VALIDATE SECRET test_secret AS secure_compare( + headers->>'X-Hub-Signature-256', + hmac(test_secret, data, 'sha256') +); + diff --git a/e2e_test/webhook/drop_table.slt.part b/e2e_test/webhook/drop_table.slt.part new file mode 100644 index 0000000000000..099e27c5d77a9 --- /dev/null +++ b/e2e_test/webhook/drop_table.slt.part @@ -0,0 +1,7 @@ + +statement ok +DROP TABLE github_sha256; + +statement ok +DROP TABLE github_sha1; + diff --git a/e2e_test/webhook/sender.py b/e2e_test/webhook/sender.py new file mode 100644 index 0000000000000..920d7b77f5d5c --- /dev/null +++ b/e2e_test/webhook/sender.py @@ -0,0 +1,88 @@ +import argparse +import requests +import json +import sys +import hmac +import hashlib + +message = { + "event": "order.created", + "source": "placeholder", + "auth_algo": "placeholder", + "data": { + "order_id": 1234, + "customer_name": "Alice", + "amount": 99.99, + "currency": "USD" + }, + "timestamp": 1639581841 +} + +SERVER_URL = "http://127.0.0.1:8080/message/root/dev/public/" + + +def generate_signature_hmac(secret, payload, auth_algo): + secret_bytes = bytes(secret, 'utf-8') + payload_bytes = bytes(payload, 'utf-8') + signature = "" + if auth_algo == "sha1": + signature = "sha1=" + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha1).hexdigest() + elif auth_algo == "sha256": + signature = "sha256=" + 
hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha256).hexdigest() + else: + print("Unsupported auth type") + sys.exit(1) + return signature + + +def send_webhook(url, headers, payload_json): + response = requests.post(url, headers=headers, data=payload_json) + + # Check response status and exit on failure + if response.status_code == 200: + print("Webhook sent successfully:", response) + else: + print(f"Webhook failed to send, Status Code: {response.status_code}, Response: {response.text}") + sys.exit(1) # Exit the program with an error + + +def send_github_sha1(secret): + payload = message + payload['source'] = "github" + payload['auth_algo'] = "sha1" + url = SERVER_URL + "github_sha1" + + payload_json = json.dumps(payload) + signature = generate_signature_hmac(secret, payload_json, 'sha1') + # Webhook message headers + headers = { + "Content-Type": "application/json", + "X-Hub-Signature": signature # Custom signature header + } + send_webhook(url, headers, payload_json) + + +def send_github_sha256(secret): + payload = message + payload['source'] = "github" + payload['auth_algo'] = "sha256" + url = SERVER_URL + "github_sha256" + + payload_json = json.dumps(payload) + signature = generate_signature_hmac(secret, payload_json, 'sha256') + # Webhook message headers + headers = { + "Content-Type": "application/json", + "X-Hub-Signature-256": signature # Custom signature header + } + send_webhook(url, headers, payload_json) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Simulate sending Webhook messages") + parser.add_argument("--secret", required=True, help="Secret key for generating signature") + args = parser.parse_args() + secret = args.secret + # send data + send_github_sha1(secret) + send_github_sha256(secret) diff --git a/e2e_test/webhook/webhook_source.slt b/e2e_test/webhook/webhook_source.slt new file mode 100644 index 0000000000000..5134510c233c3 --- /dev/null +++ b/e2e_test/webhook/webhook_source.slt @@ -0,0 +1,27 @@ 
+# Simulation test for table with webhook source + +control substitution on + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +CREATE SECRET test_secret WITH ( backend = 'meta') AS 'TEST_WEBHOOK'; + +include ./create_table.slt.part + +# insert once +system ok +python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK + +sleep 3s + +include ./check_1.slt.part + +# insert again +system ok +python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK + +sleep 3s + +include ./check_2.slt.part diff --git a/e2e_test/webhook/webhook_source_recovery.slt b/e2e_test/webhook/webhook_source_recovery.slt new file mode 100644 index 0000000000000..5755c7dfe4f6c --- /dev/null +++ b/e2e_test/webhook/webhook_source_recovery.slt @@ -0,0 +1,19 @@ +# Simulation test for table with webhook source after recovery + +control substitution on + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# insert once +system ok +python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK + +sleep 3s + +include ./check_3.slt.part + +include ./drop_table.slt.part + +statement ok +DROP SECRET test_secret diff --git a/proto/catalog.proto b/proto/catalog.proto index 5383104e9c0f2..9e203b2e94fe1 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -94,6 +94,12 @@ message StreamSourceInfo { map format_encode_secret_refs = 16; } +message WebhookSourceInfo { + secret.SecretRef secret_ref = 1; + string header_key = 2; + expr.ExprNode signature_expr = 3; +} + message Source { // For shared source, this is the same as the job id. // For non-shared source and table with connector, this is a different oid. @@ -433,6 +439,9 @@ message Table { // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`. optional uint32 maybe_vnode_count = 40; + // The information used by webhook source to validate the incoming data. + optional WebhookSourceInfo webhook_info = 41; + // Per-table catalog version, used by schema change. `None` for internal // tables and tests. 
Not to be confused with the global catalog version for // notification service. diff --git a/proto/expr.proto b/proto/expr.proto index 5330843512849..595e5792b5ff1 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -197,6 +197,7 @@ message ExprNode { INET_NTOA = 329; QUOTE_LITERAL = 330; QUOTE_NULLABLE = 331; + HMAC = 332; // Unary operators NEG = 401; diff --git a/src/common/secret/src/secret_manager.rs b/src/common/secret/src/secret_manager.rs index b6a71a4c3ebe8..9fdb3ee9239ab 100644 --- a/src/common/secret/src/secret_manager.rs +++ b/src/common/secret/src/secret_manager.rs @@ -142,6 +142,28 @@ impl LocalSecretManager { Ok(options) } + pub fn fill_secret(&self, secret_ref: PbSecretRef) -> SecretResult { + let secret_guard = self.secrets.read(); + let secret_id = secret_ref.secret_id; + let pb_secret_bytes = secret_guard + .get(&secret_id) + .ok_or(SecretError::ItemNotFound(secret_id))?; + let secret_value_bytes = Self::get_secret_value(pb_secret_bytes)?; + match secret_ref.ref_as() { + RefAsType::Text => { + // We converted the secret string from sql to bytes using `as_bytes` in frontend. + // So use `from_utf8` here to convert it back to string. + Ok(String::from_utf8(secret_value_bytes.clone())?) + } + RefAsType::File => { + let path_str = + self.get_or_init_secret_file(secret_id, secret_value_bytes.clone())?; + Ok(path_str) + } + RefAsType::Unspecified => Err(SecretError::UnspecifiedRefType(secret_id)), + } + } + /// Get the secret file for the given secret id and return the path string. If the file does not exist, create it. /// WARNING: This method should be called only when the secret manager is locked. 
fn get_or_init_secret_file( diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 899fc2a2379f5..d9f1d039e7ed7 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -55,6 +55,8 @@ pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; pub use crate::source::pulsar::PULSAR_CONNECTOR; +pub const WEBHOOK_CONNECTOR: &str = "webhook"; + pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool { const PREFIXES: &[&str] = &[ "schema.registry", diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 257cf19b77e7d..cfd0ed70155ac 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -43,6 +43,7 @@ futures-async-stream = { workspace = true } futures-util = "0.3" ginepro = "0.8" hex = "0.4" +hmac = "0.12" icelake = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } diff --git a/src/expr/impl/src/scalar/hmac.rs b/src/expr/impl/src/scalar/hmac.rs new file mode 100644 index 0000000000000..1a47f11206de8 --- /dev/null +++ b/src/expr/impl/src/scalar/hmac.rs @@ -0,0 +1,78 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use hex::encode; +use hmac::{Hmac, Mac}; +use risingwave_expr::function; +use sha1::Sha1; +use sha2::Sha256; + +#[function("hmac(varchar, bytea, varchar) -> bytea")] +pub fn hmac(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> { + if sha_type == "sha1" { + sha1_hmac(secret, payload) + } else if sha_type == "sha256" { + sha256_hmac(secret, payload) + } else { + panic!("Unsupported SHA type: {}", sha_type) + } +} + +fn sha256_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> { + let mut mac = + Hmac::::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); + + mac.update(payload); + + let result = mac.finalize(); + let code_bytes = result.into_bytes(); + let computed_signature = format!("sha256={}", encode(code_bytes)); + computed_signature.as_bytes().into() +} + +fn sha1_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> { + let mut mac = + Hmac::::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); + + mac.update(payload); + + let result = mac.finalize(); + let code_bytes = result.into_bytes(); + let computed_signature = format!("sha1={}", encode(code_bytes)); + computed_signature.as_bytes().into() +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[tokio::test] + async fn test_verify_signature_hmac_sha256() -> anyhow::Result<()> { + let secret = "your_secret_key"; + let payload = b"your_webhook_payload"; + let signature = b"sha256=cef8b98a91902c492b85d97f049aa4bfc5e7e3f9b8b7bf7cb49c5f829d2dac85"; // precomputed expected signature + assert!(*sha256_hmac(secret, payload) == *signature); + Ok(()) + } + + #[tokio::test] + async fn test_verify_signature_hmac_sha1() -> anyhow::Result<()> { + let secret = "your_secret_key"; + let payload = b"your_webhook_payload"; + let signature = b"sha1=65cb920a4b8c6ab8e2eab861a096a7bc2c05d8ba"; // precomputed expected signature + assert!(*sha1_hmac(secret, payload) == *signature); + Ok(()) + } +} diff --git a/src/expr/impl/src/scalar/mod.rs b/src/expr/impl/src/scalar/mod.rs index fbf9b512ea86d..ea5ce353fac48 100644 ---
a/src/expr/impl/src/scalar/mod.rs +++ b/src/expr/impl/src/scalar/mod.rs @@ -47,6 +47,7 @@ mod extract; mod field; mod format; mod format_type; +mod hmac; mod in_; mod int256; mod jsonb_access; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 502eaeba2f685..c4cec5a5b9b31 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -21,6 +21,7 @@ async-recursion = "1.1.0" async-trait = "0.1" auto_enums = { workspace = true } auto_impl = "1" +axum = { workspace = true } base64 = "0.22" bk-tree = "0.5.0" bytes = "1" @@ -97,6 +98,12 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ tokio-postgres = "0.7" tokio-stream = { workspace = true } tonic = { workspace = true } +tower = { version = "0.4", features = ["util", "load-shed"] } +tower-http = { version = "0.6", features = [ + "add-extension", + "cors", + "compression-gzip", +] } tracing = "0.1" uuid = "1" zstd = { version = "0.13", default-features = false } diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 528fa88ef3506..d854ae6622328 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -435,6 +435,7 @@ impl TestCase { cdc_table_info, include_column_options, wildcard_idx, + webhook_info, .. 
} => { let format_encode = format_encode.map(|schema| schema.into_v2_with_warning()); @@ -453,6 +454,7 @@ impl TestCase { with_version_column, cdc_table_info, include_column_options, + webhook_info, ) .await?; } diff --git a/src/frontend/src/binder/expr/function/builtin_scalar.rs b/src/frontend/src/binder/expr/function/builtin_scalar.rs index 68b37a3fee4e0..200abf27bed7b 100644 --- a/src/frontend/src/binder/expr/function/builtin_scalar.rs +++ b/src/frontend/src/binder/expr/function/builtin_scalar.rs @@ -294,6 +294,7 @@ impl Binder { ("sha512", raw_call(ExprType::Sha512)), ("encrypt", raw_call(ExprType::Encrypt)), ("decrypt", raw_call(ExprType::Decrypt)), + ("hmac", raw_call(ExprType::Hmac)), ("left", raw_call(ExprType::Left)), ("right", raw_call(ExprType::Right)), ("inet_aton", raw_call(ExprType::InetAton)), diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 85ed93c7dc0ca..07942a379b560 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -89,6 +89,17 @@ impl Binder { .into(), ) } + } else if let Some(ctx) = self.secure_compare_context.as_ref() { + if ident.real_value() == ctx.secret_name { + Ok(InputRef::new(0, DataType::Varchar).into()) + } else if ident.real_value() == ctx.column_name { + Ok(InputRef::new(1, DataType::Bytea).into()) + } else { + Err( + ErrorCode::ItemNotFound(format!("Unknown arg: {}", ident.real_value())) + .into(), + ) + } } else { self.bind_column(&[ident]) } diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index adb7a1b9d0f2f..e0d6952a3a54a 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -127,6 +127,15 @@ pub struct Binder { /// The temporary sources that will be used during binding phase temporary_source_manager: TemporarySourceManager, + + /// information for `secure_compare` function + secure_compare_context: Option, +} + +#[derive(Default, Clone, Debug)] +pub struct SecureCompareContext { + 
pub column_name: String, + pub secret_name: String, } #[derive(Clone, Debug, Default)] @@ -326,6 +335,7 @@ impl Binder { param_types: ParameterTypes::new(param_types), udf_context: UdfContext::new(), temporary_source_manager: session.temporary_source_manager(), + secure_compare_context: None, } } @@ -348,6 +358,15 @@ impl Binder { Self::new_inner(session, BindFor::Ddl, vec![]) } + pub fn new_for_ddl_with_secure_compare( + session: &SessionImpl, + ctx: SecureCompareContext, + ) -> Binder { + let mut binder = Self::new_inner(session, BindFor::Ddl, vec![]); + binder.secure_compare_context = Some(ctx); + binder + } + pub fn new_for_system(session: &SessionImpl) -> Binder { Self::new_inner(session, BindFor::System, vec![]) } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index b533e6d956685..16833571878a6 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -25,7 +25,7 @@ use risingwave_common::hash::{VnodeCount, VnodeCountCompat}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; -use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; +use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable, PbWebhookSourceInfo}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::DefaultColumnDesc; @@ -182,6 +182,8 @@ pub struct TableCatalog { /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build pub vnode_count: VnodeCount, + + pub webhook_info: Option, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -452,6 +454,7 @@ impl TableCatalog { retention_seconds: self.retention_seconds, cdc_table_id: 
self.cdc_table_id.clone(), maybe_vnode_count: self.vnode_count.to_protobuf(), + webhook_info: self.webhook_info.clone(), } } @@ -635,6 +638,7 @@ impl From for TableCatalog { .collect_vec(), cdc_table_id: tb.cdc_table_id, vnode_count, + webhook_info: tb.webhook_info, } } } @@ -726,6 +730,7 @@ mod tests { version_column_index: None, cdc_table_id: None, maybe_vnode_count: VnodeCount::set(233).to_protobuf(), + webhook_info: None, } .into(); @@ -790,6 +795,7 @@ mod tests { version_column_index: None, cdc_table_id: None, vnode_count: VnodeCount::set(233), + webhook_info: None, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index 5e3bd968a46b1..ac05af99138cd 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -232,6 +232,7 @@ impl ExprVisitor for ImpureAnalyzer { | Type::Sha256 | Type::Sha384 | Type::Sha512 + | Type::Hmac | Type::Decrypt | Type::Encrypt | Type::Tand diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 9c37799422a59..819fa00adcbc7 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -56,7 +56,7 @@ pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_connector::source::{ ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, - POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, + POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, WEBHOOK_CONNECTOR, }; use risingwave_connector::WithPropertiesExt; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc}; @@ -1137,6 +1137,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), + WEBHOOK_CONNECTOR => hashmap!( + Format::Native => vec![Encode::Native], + ), )) }); diff --git 
a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1e5dc489c1a0c..7ef62c98b3427 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -36,22 +36,25 @@ use risingwave_connector::source::cdc::external::{ }; use risingwave_connector::{source, WithOptionsSecResolved}; use risingwave_pb::catalog::source::OptionalAssociatedTableId; -use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc}; +use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::{ AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc, }; +use risingwave_pb::secret::secret_ref::PbRefAsType; +use risingwave_pb::secret::PbSecretRef; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ - CdcTableInfo, ColumnDef, ColumnOption, DataType as AstDataType, ExplainOptions, Format, - FormatEncodeOptions, ObjectName, OnConflict, SourceWatermark, TableConstraint, + CdcTableInfo, ColumnDef, ColumnOption, DataType, DataType as AstDataType, ExplainOptions, + Format, FormatEncodeOptions, ObjectName, OnConflict, SecretRefAsType, SourceWatermark, + TableConstraint, WebhookSourceInfo, }; use risingwave_sqlparser::parser::IncludeOption; use thiserror_ext::AsReport; use super::RwPgResponse; -use crate::binder::{bind_data_type, bind_struct_field, Clause}; +use crate::binder::{bind_data_type, bind_struct_field, Clause, SecureCompareContext}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::table_catalog::TableVersion; @@ -544,6 +547,7 @@ pub(crate) fn gen_create_table_plan( append_only: bool, on_conflict: Option, with_version_column: Option, + webhook_info: Option, ) -> Result<(PlanRef, PbTable)> { let definition = 
context.normalized_sql().to_owned(); let mut columns = bind_sql_columns(&column_defs)?; @@ -568,6 +572,7 @@ pub(crate) fn gen_create_table_plan( on_conflict, with_version_column, Some(col_id_gen.into_version()), + webhook_info, ) } @@ -584,6 +589,7 @@ pub(crate) fn gen_create_table_plan_without_source( on_conflict: Option, with_version_column: Option, version: Option, + webhook_info: Option, ) -> Result<(PlanRef, PbTable)> { ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; @@ -626,6 +632,7 @@ pub(crate) fn gen_create_table_plan_without_source( None, database_id, schema_id, + webhook_info, ) } @@ -656,6 +663,7 @@ fn gen_table_plan_with_source( Some(cloned_source_catalog), database_id, schema_id, + None, ) } @@ -676,6 +684,7 @@ fn gen_table_plan_inner( source_catalog: Option, database_id: DatabaseId, schema_id: SchemaId, + webhook_info: Option, ) -> Result<(PlanRef, PbTable)> { let session = context.session_ctx().clone(); let retention_seconds = context.with_options().retention_seconds(); @@ -746,6 +755,7 @@ fn gen_table_plan_inner( is_external_source, retention_seconds, None, + webhook_info, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -871,6 +881,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( true, None, Some(cdc_table_id), + None, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -952,6 +963,7 @@ pub(super) async fn handle_create_table_plan( on_conflict: Option, with_version_column: Option, include_column_options: IncludeOption, + webhook_info: Option, ) -> Result<(PlanRef, Option, PbTable, TableJobType)> { let col_id_gen = ColumnIdGenerator::new_initial(); let format_encode = check_create_table_with_source( @@ -983,6 +995,10 @@ pub(super) async fn handle_create_table_plan( TableJobType::General, ), (None, None) => { + let webhook_info = webhook_info + .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info)) + 
.transpose()?; + let context = OptimizerContext::new(handler_args, explain_options); let (plan, table) = gen_create_table_plan( context, @@ -994,6 +1010,7 @@ pub(super) async fn handle_create_table_plan( append_only, on_conflict, with_version_column, + webhook_info, )?; ((plan, None, table), TableJobType::General) @@ -1075,6 +1092,7 @@ pub(super) async fn handle_create_table_plan( ) .into()), }; + Ok((plan, source, table, job_type)) } @@ -1243,6 +1261,7 @@ pub async fn handle_create_table( with_version_column: Option, cdc_table_info: Option, include_column_options: IncludeOption, + webhook_info: Option, ) -> Result { let session = handler_args.session.clone(); @@ -1275,6 +1294,7 @@ pub async fn handle_create_table( on_conflict, with_version_column, include_column_options, + webhook_info, ) .await?; @@ -1374,6 +1394,7 @@ pub async fn generate_stream_graph_for_replace_table( append_only, on_conflict, with_version_column, + original_catalog.webhook_info.clone(), )?; ((plan, None, table), TableJobType::General) } @@ -1486,6 +1507,54 @@ fn get_source_and_resolved_table_name( Ok((source, resolved_table_name, database_id, schema_id)) } +fn bind_webhook_info( + session: &Arc, + columns_defs: &[ColumnDef], + webhook_info: WebhookSourceInfo, +) -> Result { + if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &DataType::Jsonb { + return Err(ErrorCode::InvalidInputSyntax( + "Table with webhook source should have exactly one JSONB column".to_owned(), + ) + .into()); + } + + let WebhookSourceInfo { + secret_ref, + header_key, + signature_expr, + } = webhook_info; + + let db_name = session.database(); + let (schema_name, secret_name) = + Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?; + let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?; + let pb_secret_ref = PbSecretRef { + secret_id: secret_catalog.id.secret_id(), + ref_as: match secret_ref.ref_as { + SecretRefAsType::Text => PbRefAsType::Text, 
+ SecretRefAsType::File => PbRefAsType::File, + } + .into(), + }; + + // TODO(kexiang): use real column name + let secure_compare_context = SecureCompareContext { + column_name: columns_defs[0].name.real_value(), + secret_name: secret_name.clone(), + }; + let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context); + let expr = binder.bind_expr(signature_expr.clone())?; + + let pb_webhook_info = PbWebhookSourceInfo { + secret_ref: Some(pb_secret_ref), + header_key, + signature_expr: Some(expr.to_expr_proto()), + }; + + Ok(pb_webhook_info) +} + #[cfg(test)] mod tests { use risingwave_common::catalog::{Field, DEFAULT_DATABASE_NAME, ROWID_PREFIX}; diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 27c527969f9b2..5bffd99a08747 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -108,6 +108,7 @@ pub async fn handle_create_as( on_conflict, with_version_column, Some(col_id_gen.into_version()), + None, )?; let graph = build_graph(plan)?; diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 1740c161c3fbe..e580d71e8e7f8 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -65,6 +65,7 @@ async fn do_handle_explain( cdc_table_info, include_column_options, wildcard_idx, + webhook_info, .. 
} => { let format_encode = format_encode.map(|s| s.into_v2_with_warning()); @@ -83,6 +84,7 @@ async fn do_handle_explain( on_conflict, with_version_column, include_column_options, + webhook_info, ) .await?; let context = plan.ctx(); diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 9cf94a37c65b0..93061dc6e3e98 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -344,6 +344,7 @@ pub async fn handle( with_version_column, cdc_table_info, include_column_options, + webhook_info, } => { if or_replace { bail_not_implemented!("CREATE OR REPLACE TABLE"); @@ -379,6 +380,7 @@ pub async fn handle( with_version_column, cdc_table_info, include_column_options, + webhook_info, ) .await } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 5c006e191157e..26b84258f445b 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -69,6 +69,7 @@ pub(crate) mod error; mod meta_client; pub mod test_utils; mod user; +pub mod webhook; pub mod health_service; mod monitor; @@ -178,6 +179,7 @@ impl Default for FrontendOpts { } use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use pgwire::pg_protocol::TlsConfig; @@ -208,6 +210,10 @@ pub fn start( .collect::>(), ); + let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let webhook_service = crate::webhook::WebhookService { webhook_addr: addr }; + let _task = tokio::spawn(webhook_service.serve()); + pg_serve( &listen_addr, tcp_keepalive, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e08f6b2c4dd4a..40d6fbca1d83d 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -57,7 +57,7 @@ use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::sink::catalog::SinkFormatDesc; -use risingwave_pb::catalog::WatermarkDesc; +use 
risingwave_pb::catalog::{PbWebhookSourceInfo, WatermarkDesc}; use risingwave_pb::stream_plan::StreamScanType; use self::heuristic_optimizer::ApplyOrder; @@ -639,6 +639,7 @@ impl PlanRoot { with_external_source: bool, retention_seconds: Option, cdc_table_id: Option, + webhook_info: Option, ) -> Result { assert_eq!(self.phase, PlanPhase::Logical); assert_eq!(self.plan.convention(), Convention::Logical); @@ -873,6 +874,7 @@ impl PlanRoot { version, retention_seconds, cdc_table_id, + webhook_info, ) } diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs index 890152f00e337..0f1f75b03b63b 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs @@ -221,6 +221,7 @@ impl Strong { | ExprType::Sha256 | ExprType::Sha384 | ExprType::Sha512 + | ExprType::Hmac | ExprType::Left | ExprType::Right | ExprType::Format diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 342bfbedd1825..c8f673f11b52a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -24,6 +24,7 @@ use risingwave_common::catalog::{ use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_pb::catalog::PbWebhookSourceInfo; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::derive_columns; @@ -110,6 +111,7 @@ impl StreamMaterialize { cardinality, retention_seconds, create_type, + None, )?; Ok(Self::new(input, table)) @@ -135,6 +137,7 @@ impl StreamMaterialize { version: Option, retention_seconds: Option, cdc_table_id: Option, + webhook_info: Option, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?; @@ -153,6 +156,7 @@ impl 
StreamMaterialize { Cardinality::unknown(), // unknown cardinality for tables retention_seconds, CreateType::Foreground, + webhook_info, )?; table.cdc_table_id = cdc_table_id; @@ -227,6 +231,7 @@ impl StreamMaterialize { cardinality: Cardinality, retention_seconds: Option, create_type: CreateType, + webhook_info: Option, ) -> Result { let input = rewritten_input; @@ -285,6 +290,7 @@ impl StreamMaterialize { retention_seconds: retention_seconds.map(|i| i.into()), cdc_table_id: None, vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later + webhook_info, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 2433a659bad0a..327380088f29c 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -196,6 +196,7 @@ impl TableCatalogBuilder { retention_seconds: None, cdc_table_id: None, vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later + webhook_info: None, } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 2d40328bab400..4d5d7188e4fba 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -590,6 +590,7 @@ pub(crate) mod tests { created_at_cluster_version: None, cdc_table_id: None, vnode_count: VnodeCount::set(vnode_count), + webhook_info: None, }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/frontend/src/webhook/mod.rs b/src/frontend/src/webhook/mod.rs new file mode 100644 index 0000000000000..c2f42eef0492a --- /dev/null +++ b/src/frontend/src/webhook/mod.rs @@ -0,0 +1,221 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; + +use anyhow::{anyhow, Context}; +use axum::body::Bytes; +use axum::extract::{Extension, Path}; +use axum::http::{HeaderMap, Method, StatusCode}; +use axum::routing::post; +use axum::Router; +use pgwire::net::Address; +use pgwire::pg_server::SessionManager; +use risingwave_common::secret::LocalSecretManager; +use risingwave_sqlparser::ast::{Expr, ObjectName}; +use tokio::net::TcpListener; +use tower::ServiceBuilder; +use tower_http::add_extension::AddExtensionLayer; +use tower_http::compression::CompressionLayer; +use tower_http::cors::{self, CorsLayer}; + +use crate::handler::handle; +use crate::webhook::utils::{err, Result}; +mod utils; + +#[derive(Clone)] +pub struct WebhookService { + pub webhook_addr: SocketAddr, + // pub compute_clients: ComputeClientPool, +} +pub type Service = Arc; + +pub(super) mod handlers { + + use std::net::Ipv4Addr; + + use risingwave_pb::catalog::WebhookSourceInfo; + use risingwave_sqlparser::ast::{Query, SetExpr, Statement, Value, Values}; + use utils::verify_signature; + + use super::*; + use crate::catalog::root_catalog::SchemaPath; + use crate::session::SESSION_MANAGER; + + pub async fn handle_post_request( + Extension(_srv): Extension, + headers: HeaderMap, + Path((user, database, schema, table)): Path<(String, String, String, String)>, + body: Bytes, + ) -> Result<()> { + let session_mgr = SESSION_MANAGER + .get() + .expect("session manager has been initialized"); + + let dummy_addr = Address::Tcp(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 
0)), + 5691, // port of meta + )); + + // TODO(kexiang): optimize this + // get a session object for the corresponding user and database + let session = session_mgr + .connect(database.as_str(), user.as_str(), Arc::new(dummy_addr)) + .map_err(|e| { + err( + anyhow!(e).context(format!( + "failed to create session for database: {}, user: {}", + database, user + )), + StatusCode::UNAUTHORIZED, + ) + })?; + + let WebhookSourceInfo { + secret_ref, + header_key, + signature_expr, + } = { + let search_path = session.config().search_path(); + let user_name = user.as_str(); + let schema_path = SchemaPath::new(Some(schema.as_str()), &search_path, user_name); + + let reader = session.env().catalog_reader().read_guard(); + let (table, _schema) = reader + .get_any_table_by_name(database.as_str(), schema_path, &table) + .map_err(|e| err(e, StatusCode::NOT_FOUND))?; + + table + .webhook_info + .as_ref() + .ok_or_else(|| { + err( + anyhow!("Table {:?} is not connected to wehbook source", table), + StatusCode::METHOD_NOT_ALLOWED, + ) + })? + .clone() + }; + + let secret_string = LocalSecretManager::global() + .fill_secret(secret_ref.unwrap()) + .map_err(|e| err(e, StatusCode::NOT_FOUND))?; + + let signature = headers + .get(header_key) + .ok_or_else(|| { + err( + anyhow!("Signature not found in the header"), + StatusCode::BAD_REQUEST, + ) + })? 
+ .as_bytes(); + + let is_valid = verify_signature( + secret_string.as_str(), + body.as_ref(), + signature_expr.unwrap(), + signature, + ) + .await?; + if !is_valid { + return Err(err( + anyhow!("Signature verification failed"), + StatusCode::UNAUTHORIZED, + )); + } + + let payload = String::from_utf8(body.to_vec()).map_err(|e| { + err( + anyhow!(e).context("Failed to parse body"), + StatusCode::BAD_REQUEST, + ) + })?; + + let insert_stmt = Statement::Insert { + table_name: ObjectName::from(vec![table.as_str().into()]), + columns: vec![], + source: Box::new(Query { + with: None, + body: SetExpr::Values(Values(vec![vec![Expr::Value(Value::SingleQuotedString( + payload, + ))]])), + order_by: vec![], + limit: None, + offset: None, + fetch: None, + }), + returning: vec![], + }; + + let _rsp = handle(session, insert_stmt, Arc::from(""), vec![]) + .await + .map_err(|e| { + err( + anyhow!(e).context("Failed to insert to table"), + StatusCode::INTERNAL_SERVER_ERROR, + ) + })?; + + Ok(()) + } +} + +impl WebhookService { + pub async fn serve(self) -> anyhow::Result<()> { + use handlers::*; + let srv = Arc::new(self); + + let cors_layer = CorsLayer::new() + .allow_origin(cors::Any) + .allow_methods(vec![Method::POST]); + + let api_router = Router::new() + .route("/:user/:database/:schema/:table", post(handle_post_request)) + .layer( + ServiceBuilder::new() + .layer(AddExtensionLayer::new(srv.clone())) + .into_inner(), + ) + .layer(cors_layer); + + let app = Router::new() + .nest("/message", api_router) + .layer(CompressionLayer::new()); + + let listener = TcpListener::bind(&srv.webhook_addr) + .await + .context("failed to bind dashboard address")?; + axum::serve(listener, app) + .await + .context("failed to serve dashboard service")?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + + #[tokio::test] + #[ignore] + async fn test_exchange_client() -> anyhow::Result<()> { + let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let service = 
crate::webhook::WebhookService { webhook_addr: addr }; + service.serve().await?; + Ok(()) + } +} diff --git a/src/frontend/src/webhook/utils.rs b/src/frontend/src/webhook/utils.rs new file mode 100644 index 0000000000000..6f13fb58040e7 --- /dev/null +++ b/src/frontend/src/webhook/utils.rs @@ -0,0 +1,77 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::Json; +use risingwave_common::row::OwnedRow; +use risingwave_pb::expr::ExprNode; +use serde_json::json; +use thiserror_ext::AsReport; + +use crate::expr::ExprImpl; + +pub struct WebhookError { + err: anyhow::Error, + code: StatusCode, +} + +pub(crate) type Result = std::result::Result; + +pub(crate) fn err(err: impl Into, code: StatusCode) -> WebhookError { + WebhookError { + err: err.into(), + code, + } +} + +impl From for WebhookError { + fn from(value: anyhow::Error) -> Self { + WebhookError { + err: value, + code: StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +impl IntoResponse for WebhookError { + fn into_response(self) -> axum::response::Response { + let mut resp = Json(json!({ + "error": self.err.to_report_string(), + })) + .into_response(); + *resp.status_mut() = self.code; + resp + } +} + +pub async fn verify_signature( + secret: &str, + payload: &[u8], + signature_expr: ExprNode, + signature: &[u8], +) -> Result { + let row = OwnedRow::new(vec![Some(secret.into()), 
Some(payload.into())]); + + let signature_expr_impl = ExprImpl::from_expr_proto(&signature_expr) + .map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + let result = signature_expr_impl + .eval_row(&row) + .await + .map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))? + .unwrap(); + let computed_signature = result.as_bytea(); + Ok(**computed_signature == *signature) +} diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index b84a29891eee3..cdd2c95b5ae84 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -22,6 +22,8 @@ mod m20240726_063833_auto_schema_change; mod m20240806_143329_add_rate_limit_to_source_catalog; mod m20240820_081248_add_time_travel_per_table_epoch; mod m20240911_083152_variable_vnode_count; +mod m20241001_013810_webhook_source; + mod m20241016_065621_hummock_gc_history; mod m20241025_062548_singleton_vnode_count; mod utils; @@ -86,6 +88,7 @@ impl MigratorTrait for Migrator { Box::new(m20240911_083152_variable_vnode_count::Migration), Box::new(m20241016_065621_hummock_gc_history::Migration), Box::new(m20241025_062548_singleton_vnode_count::Migration), + Box::new(m20241001_013810_webhook_source::Migration), ] } } diff --git a/src/meta/model/migration/src/m20241001_013810_webhook_source.rs b/src/meta/model/migration/src/m20241001_013810_webhook_source.rs new file mode 100644 index 0000000000000..b46c25366faad --- /dev/null +++ b/src/meta/model/migration/src/m20241001_013810_webhook_source.rs @@ -0,0 +1,35 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .add_column(ColumnDef::new(Table::WebhookInfo).integer()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: 
&SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .drop_column(Table::WebhookInfo) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Table { + Table, + WebhookInfo, +} diff --git a/src/meta/model/src/lib.rs b/src/meta/model/src/lib.rs index 6610484a89185..9995d8482b8fb 100644 --- a/src/meta/model/src/lib.rs +++ b/src/meta/model/src/lib.rs @@ -369,6 +369,10 @@ derive_array_from_blob!( PbColumnCatalogArray ); derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo); +derive_from_blob!( + WebhookSourceInfo, + risingwave_pb::catalog::PbWebhookSourceInfo +); derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc); derive_array_from_blob!( WatermarkDescArray, diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index b8ba38d438b7c..20fae0c926b1c 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize}; use crate::{ Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId, - TableId, TableVersion, + TableId, TableVersion, WebhookSourceInfo, }; #[derive( @@ -135,6 +135,7 @@ pub struct Model { pub incoming_sinks: I32Array, pub cdc_table_id: Option, pub vnode_count: i32, + pub webhook_info: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -267,6 +268,7 @@ impl From for ActiveModel { incoming_sinks: Set(pb_table.incoming_sinks.into()), cdc_table_id: Set(pb_table.cdc_table_id), vnode_count, + webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index c7cf45daad9e7..51c75ada8a9a2 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -166,6 +166,7 @@ impl From> for PbTable { retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, 
maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(), + webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()), } } } diff --git a/src/prost/build.rs b/src/prost/build.rs index c4744e14c1b60..1dc7cd66c3456 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -116,6 +116,7 @@ fn main() -> Result<(), Box> { // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("catalog.WebhookSourceInfo", "#[derive(Eq, Hash)]") .type_attribute("secret.SecretRef", "#[derive(Eq, Hash)]") .type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]") .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index d94cf80cb9f2d..2b6eb7da83612 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -22,7 +22,8 @@ use serde::{Deserialize, Serialize}; use super::FormatEncodeOptions; use crate::ast::{ - display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SetVariableValue, + display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRef, + SetVariableValue, }; use crate::tokenizer::Token; @@ -802,3 +803,12 @@ impl fmt::Display for ReferentialAction { }) } } + +/// secure secret definition for webhook source +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct WebhookSourceInfo { + pub secret_ref: SecretRef, + pub header_key: String, + pub signature_expr: Expr, +} diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 563dc66be4780..8134605fa0fac 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -39,7 +39,7 @@ pub use self::data_type::{DataType, StructField}; pub use self::ddl::{ AlterColumnOperation, 
AlterConnectionOperation, AlterDatabaseOperation, AlterFunctionOperation, AlterSchemaOperation, AlterTableOperation, ColumnDef, ColumnOption, ColumnOptionDef, - ReferentialAction, SourceWatermark, TableConstraint, + ReferentialAction, SourceWatermark, TableConstraint, WebhookSourceInfo, }; pub use self::legacy_source::{ get_delimiter, AvroSchema, CompatibleFormatEncode, DebeziumAvroSchema, ProtobufSchema, @@ -1302,6 +1302,8 @@ pub enum Statement { cdc_table_info: Option, /// `INCLUDE a AS b INCLUDE c` include_column_options: IncludeOption, + /// `VALIDATE SECRET secure_secret_name AS secure_compare ()` + webhook_info: Option, }, /// CREATE INDEX CreateIndex { @@ -1835,6 +1837,7 @@ impl fmt::Display for Statement { query, cdc_table_info, include_column_options, + webhook_info, } => { // We want to allow the following options // Empty column list, allowed by PostgreSQL: @@ -1884,6 +1887,10 @@ impl fmt::Display for Statement { write!(f, " FROM {}", info.source_name)?; write!(f, " TABLE '{}'", info.external_table_name)?; } + if let Some(info)= webhook_info { + write!(f, " VALIDATE SECRET {}", info.secret_ref.secret_name)?; + write!(f, " AS secure_compare ()")?; + } Ok(()) } Statement::CreateIndex { diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 93e47d7a6b11a..53038fdcd08b6 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -554,6 +554,7 @@ define_keywords!( USER, USING, UUID, + VALIDATE, VALUE, VALUES, VALUE_OF, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d5582f31a64de..2125ea3763612 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -22,6 +22,7 @@ use alloc::{ }; use core::fmt; +use ddl::WebhookSourceInfo; use itertools::Itertools; use tracing::{debug, instrument}; use winnow::combinator::{alt, cut_err, dispatch, fail, opt, peek, preceded, repeat, separated}; @@ -2578,14 +2579,23 @@ impl Parser<'_> { let include_options = 
self.parse_include_options()?; // PostgreSQL supports `WITH ( options )`, before `AS` - let with_options = self.parse_with_properties()?; + let mut with_options = self.parse_with_properties()?; let option = with_options .iter() .find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY); let connector = option.map(|opt| opt.value.to_string()); - - let format_encode = if let Some(connector) = connector { + let contain_webhook = if let Some(connector) = &connector + && connector.contains("webhook") + { + with_options.clear(); + true + } else { + false + }; + let format_encode = if let Some(connector) = connector + && !contain_webhook + { Some(self.parse_format_encode_with_connector(&connector, false)?) } else { None // Table is NOT created with an external connector. @@ -2612,6 +2622,53 @@ impl Parser<'_> { None }; + let webhook_info = if contain_webhook && self.parse_keyword(Keyword::VALIDATE) { + self.expect_keyword(Keyword::SECRET)?; + let secret_ref = SecretRef { + secret_name: self.parse_object_name()?, + ref_as: SecretRefAsType::Text, + }; + if self.parse_keywords(&[Keyword::AS, Keyword::FILE]) { + parser_err!("Secret for SECURE_COMPARE() does not support AS FILE"); + }; + self.expect_keyword(Keyword::AS)?; + let function_name = self.parse_identifier()?; + if function_name.real_value().to_uppercase() != *"SECURE_COMPARE" { + parser_err!( + "SECURE_COMPARE() is the only function supported for secret validation" + ); + } + self.expect_token(&Token::LParen)?; + let headers = self.parse_identifier()?; + if headers.real_value().to_uppercase() != *"HEADERS" { + parser_err!("The first argument of SECURE_COMPARE() should be like `HEADERS ->> {{header_key}}`"); + } + self.expect_token(&Token::LongArrow)?; + let header_key = self.parse_literal_string()?; + self.expect_token(&Token::Comma)?; + let checkpoint = *self; + let signature_expr = if let Ok(ident) = self.parse_identifier() + && !matches!(self.peek_token().token, Token::LParen) + { + // secret name + 
Expr::Identifier(ident) + } else { + // function to generate signature, e.g., HMAC(secret, payload, 'sha256') + *self = checkpoint; + self.parse_function()? + }; + + self.expect_token(&Token::RParen)?; + + Some(WebhookSourceInfo { + secret_ref, + header_key, + signature_expr, + }) + } else { + None + }; + Ok(Statement::CreateTable { name: table_name, temporary, @@ -2629,6 +2686,7 @@ impl Parser<'_> { query, cdc_table_info, include_column_options: include_options, + webhook_info, }) } @@ -5081,7 +5139,6 @@ impl Parser<'_> { let source = Box::new(self.parse_query()?); let returning = self.parse_returning(Optional)?; - Ok(Statement::Insert { table_name, columns, diff --git a/src/storage/src/compaction_catalog_manager.rs b/src/storage/src/compaction_catalog_manager.rs index 3133cae023300..22ed5cf4fd0b5 100644 --- a/src/storage/src/compaction_catalog_manager.rs +++ b/src/storage/src/compaction_catalog_manager.rs @@ -568,6 +568,7 @@ mod tests { created_at_cluster_version: None, cdc_table_id: None, maybe_vnode_count: None, + webhook_info: None, } }