diff --git a/e2e_test/webhook/check_1.slt.part b/e2e_test/webhook/check_1.slt.part index c7f107346a843..81eb681fc4d52 100644 --- a/e2e_test/webhook/check_1.slt.part +++ b/e2e_test/webhook/check_1.slt.part @@ -12,3 +12,8 @@ query TT select data ->> 'source', data->> 'auth_algo' from rudderstack; ---- rudderstack plain + +query TT +select data ->> 'source', data->> 'auth_algo' from segment_encode_hmac; +---- +segment encode_hmac diff --git a/e2e_test/webhook/check_2.slt.part b/e2e_test/webhook/check_2.slt.part index c212f57abe779..d3f17a85299fe 100644 --- a/e2e_test/webhook/check_2.slt.part +++ b/e2e_test/webhook/check_2.slt.part @@ -15,3 +15,9 @@ select data ->> 'source', data->> 'auth_algo' from rudderstack; ---- rudderstack plain rudderstack plain + +query TT +select data ->> 'source', data->> 'auth_algo' from segment_encode_hmac; +---- +segment encode_hmac +segment encode_hmac \ No newline at end of file diff --git a/e2e_test/webhook/check_3.slt.part b/e2e_test/webhook/check_3.slt.part index c37d75c46882c..2388c5b4763fa 100644 --- a/e2e_test/webhook/check_3.slt.part +++ b/e2e_test/webhook/check_3.slt.part @@ -17,4 +17,11 @@ select data ->> 'source', data->> 'auth_algo' from rudderstack; ---- rudderstack plain rudderstack plain -rudderstack plain \ No newline at end of file +rudderstack plain + +query TT +select data ->> 'source', data->> 'auth_algo' from segment_encode_hmac; +---- +segment encode_hmac +segment encode_hmac +segment encode_hmac \ No newline at end of file diff --git a/e2e_test/webhook/create_table.slt.part b/e2e_test/webhook/create_table.slt.part index 0860e60969c6d..3475a5ff74d57 100644 --- a/e2e_test/webhook/create_table.slt.part +++ b/e2e_test/webhook/create_table.slt.part @@ -4,7 +4,7 @@ create table rudderstack ( ) WITH ( connector = 'webhook', ) VALIDATE SECRET test_secret AS secure_compare( - headers->>'Authorization', + headers->>'authorization', test_secret ); @@ -14,8 +14,8 @@ create table github_sha1 ( ) WITH ( connector = 'webhook', ) VALIDATE SECRET test_secret AS secure_compare( - headers->>'X-Hub-Signature', - hmac(test_secret, data, 'sha1') + headers->>'x-hub-signature', + hmac(test_secret, data, 'sha1', 'sha1=') ); statement ok @@ -24,7 +24,16 @@ create table github_sha256 ( ) WITH ( connector = 'webhook', ) VALIDATE SECRET test_secret AS secure_compare( - headers->>'X-Hub-Signature-256', - hmac(test_secret, data, 'sha256') + headers->>'x-hub-signature-256', + hmac(test_secret, data, 'sha256', 'sha256=') ); +statement ok +create table segment_sha1 ( + data JSONB +) WITH ( + connector = 'webhook', +) VALIDATE SECRET test_secret AS secure_compare( + headers->>'x-signature', + hmac(test_secret, data, 'sha1') +); diff --git a/e2e_test/webhook/drop_table.slt.part b/e2e_test/webhook/drop_table.slt.part index 97aa4e67387c8..c5f12020bba6f 100644 --- a/e2e_test/webhook/drop_table.slt.part +++ b/e2e_test/webhook/drop_table.slt.part @@ -1,4 +1,7 @@ +statement ok +DROP TABLE segment_encode_hmac; + statement ok DROP TABLE github_sha256; @@ -6,4 +9,4 @@ statement ok DROP TABLE github_sha1; statement ok -DROP TABLE rudderstack; \ No newline at end of file +DROP TABLE rudderstack; diff --git a/e2e_test/webhook/sender.py b/e2e_test/webhook/sender.py index 32f28cfc7f213..38b881184d272 100644 --- a/e2e_test/webhook/sender.py +++ b/e2e_test/webhook/sender.py @@ -21,14 +21,14 @@ SERVER_URL = "http://127.0.0.1:8080/message/root/dev/public/" -def generate_signature_hmac(secret, payload, auth_algo): +def generate_signature_hmac(secret, payload, auth_algo, prefix): secret_bytes = bytes(secret, 'utf-8') payload_bytes = bytes(payload, 'utf-8') signature = "" if auth_algo == "sha1": - signature = "sha1=" + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha1).hexdigest() + signature = prefix + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha1).hexdigest() elif auth_algo == "sha256": - signature = "sha256=" + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha256).hexdigest() + signature = prefix + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha256).hexdigest() else: print("Unsupported auth type") sys.exit(1) @@ -53,7 +53,7 @@ def send_github_sha1(secret): url = SERVER_URL + "github_sha1" payload_json = json.dumps(payload) - signature = generate_signature_hmac(secret, payload_json, 'sha1') + signature = generate_signature_hmac(secret, payload_json, 'sha1', "sha1=") # Webhook message headers headers = { "Content-Type": "application/json", @@ -69,7 +69,7 @@ def send_github_sha256(secret): url = SERVER_URL + "github_sha256" payload_json = json.dumps(payload) - signature = generate_signature_hmac(secret, payload_json, 'sha256') + signature = generate_signature_hmac(secret, payload_json, 'sha256', "sha256=") # Webhook message headers headers = { "Content-Type": "application/json", @@ -79,6 +79,7 @@ def send_github_sha256(secret): def send_rudderstack(secret): + # apply to both rudderstack and AWS EventBridge payload = message payload['source'] = "rudderstack" payload['auth_algo'] = "plain" @@ -94,6 +95,23 @@ def send_rudderstack(secret): send_webhook(url, headers, payload_json) +def send_segment_encode_hmac(secret): + # apply to both rudderstack and AWS EventBridge + payload = message + payload['source'] = "segment" + payload['auth_algo'] = "encode_hmac" + url = SERVER_URL + "segment_encode_hmac" + + payload_json = json.dumps(payload) + signature = generate_signature_hmac(secret, payload_json, 'sha1', '') + # Webhook message headers + headers = { + "Content-Type": "application/json", + "x-signature": signature # Custom signature header + } + send_webhook(url, headers, payload_json) + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Simulate sending Webhook messages") parser.add_argument("--secret", required=True, help="Secret key for generating signature") diff --git a/e2e_test/webhook/webhook_source.slt b/e2e_test/webhook/webhook_source.slt index 5134510c233c3..3467c99913ec6 100644 --- a/e2e_test/webhook/webhook_source.slt +++ b/e2e_test/webhook/webhook_source.slt @@ -16,6 +16,7 @@ python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK sleep 3s +# TODO(kexiang): will use a script to take place of check_1, check_2, check_3 include ./check_1.slt.part # insert again diff --git a/proto/catalog.proto b/proto/catalog.proto index 9e203b2e94fe1..cfc3bfaf4f207 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -96,8 +96,7 @@ message StreamSourceInfo { message WebhookSourceInfo { secret.SecretRef secret_ref = 1; - string header_key = 2; - expr.ExprNode signature_expr = 3; + expr.ExprNode signature_expr = 2; } message Source { diff --git a/proto/expr.proto b/proto/expr.proto index 595e5792b5ff1..393f7bfb89a8f 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -198,6 +198,7 @@ message ExprNode { QUOTE_LITERAL = 330; QUOTE_NULLABLE = 331; HMAC = 332; + SECURE_COMPARE = 333; // Unary operators NEG = 401; diff --git a/src/expr/impl/src/scalar/hmac.rs b/src/expr/impl/src/scalar/hmac.rs index b532dfeb251aa..ef79eb9db6381 100644 --- a/src/expr/impl/src/scalar/hmac.rs +++ b/src/expr/impl/src/scalar/hmac.rs @@ -20,35 +20,53 @@ use sha2::Sha256; #[function("hmac(bytea, bytea, varchar) -> bytea")] pub fn hmac(secret: &[u8], payload: &[u8], sha_type: &str) -> Box<[u8]> { + hmac_with_prefix(secret, payload, sha_type, "") +} + +#[function("hmac(bytea, bytea, varchar, varchar) -> bytea")] +pub fn hmac_with_prefix(secret: &[u8], payload: &[u8], sha_type: &str, prefix: &str) -> Box<[u8]> { if sha_type == "sha1" { - sha1_hmac(secret, payload) + sha1_hmac(secret, payload, prefix) } else if sha_type == "sha256" { - sha256_hmac(secret, payload) + sha256_hmac(secret, payload, prefix) } else { panic!("Unsupported SHA type: {}", sha_type) } } -fn sha256_hmac(secret: &[u8], payload: &[u8]) -> Box<[u8]> { +fn sha256_hmac(secret: &[u8], payload: &[u8], prefix: &str) -> Box<[u8]> { let mut mac = Hmac::::new_from_slice(secret).expect("HMAC can take key of any size"); mac.update(payload); let result = mac.finalize(); let code_bytes = result.into_bytes(); - let computed_signature = format!("sha256={}", encode(code_bytes)); - computed_signature.as_bytes().into() + if prefix.is_empty() { + code_bytes.as_slice().into() + } else { + let computed_signature = format!("{}{}", prefix, encode(code_bytes)); + computed_signature.as_bytes().into() + } } -fn sha1_hmac(secret: &[u8], payload: &[u8]) -> Box<[u8]> { +fn sha1_hmac(secret: &[u8], payload: &[u8], prefix: &str) -> Box<[u8]> { let mut mac = Hmac::::new_from_slice(secret).expect("HMAC can take key of any size"); mac.update(payload); let result = mac.finalize(); let code_bytes = result.into_bytes(); - let computed_signature = format!("sha1={}", encode(code_bytes)); - computed_signature.as_bytes().into() + if prefix.is_empty() { + code_bytes.as_slice().into() + } else { + let computed_signature = format!("{}{}", prefix, encode(code_bytes)); + computed_signature.as_bytes().into() + } +} + +#[function("secure_compare(bytea, bytea) -> boolean")] +pub fn secure_compare(left: &[u8], right: &[u8]) -> bool { + left == right } #[cfg(test)] diff --git a/src/frontend/src/binder/expr/function/builtin_scalar.rs b/src/frontend/src/binder/expr/function/builtin_scalar.rs index 200abf27bed7b..02bd9697b62e6 100644 --- a/src/frontend/src/binder/expr/function/builtin_scalar.rs +++ b/src/frontend/src/binder/expr/function/builtin_scalar.rs @@ -295,6 +295,16 @@ impl Binder { ("encrypt", raw_call(ExprType::Encrypt)), ("decrypt", raw_call(ExprType::Decrypt)), ("hmac", raw_call(ExprType::Hmac)), + ("secure_compare",guard_by_len(2, raw(|_binder, mut inputs| { + // Similar to `cast` from string, return type is set explicitly rather than inferred. + if !inputs[0].is_untyped() && inputs[0].return_type() == DataType::Varchar { + FunctionCall::cast_mut(&mut inputs[0], DataType::Bytea, CastContext::Explicit)?; + }; + if !inputs[1].is_untyped() && inputs[1].return_type() == DataType::Varchar { + FunctionCall::cast_mut(&mut inputs[1], DataType::Bytea, CastContext::Explicit)?; + }; + Ok(FunctionCall::new_unchecked(ExprType::SecureCompare , inputs, DataType::Boolean).into()) + }))), ("left", raw_call(ExprType::Left)), ("right", raw_call(ExprType::Right)), ("inet_aton", raw_call(ExprType::InetAton)), diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index be933142acfdd..65d91ef7f7a46 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -90,10 +90,12 @@ impl Binder { ) } } else if let Some(ctx) = self.secure_compare_context.as_ref() { - if ident.real_value() == ctx.secret_name { - Ok(InputRef::new(0, DataType::Bytea).into()) - } else if ident.real_value() == ctx.column_name { + if ident.real_value() == "headers".to_string() { + Ok(InputRef::new(0, DataType::Jsonb).into()) + } else if ident.real_value() == ctx.secret_name { Ok(InputRef::new(1, DataType::Bytea).into()) + } else if ident.real_value() == ctx.column_name { + Ok(InputRef::new(2, DataType::Bytea).into()) } else { Err( ErrorCode::ItemNotFound(format!("Unknown arg: {}", ident.real_value())) diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index ac05af99138cd..1a51029d576e6 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -233,6 +233,7 @@ impl ExprVisitor for ImpureAnalyzer { | Type::Sha384 | Type::Sha512 | Type::Hmac + | Type::SecureCompare | Type::Decrypt | Type::Encrypt | Type::Tand diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index d1a967f7545a5..ffb7b661dfd54 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1482,7 +1482,6 @@ fn bind_webhook_info( let WebhookSourceInfo { secret_ref, - header_key, signature_expr, } = webhook_info; @@ -1509,7 +1508,6 @@ fn bind_webhook_info( let pb_webhook_info = PbWebhookSourceInfo { secret_ref: Some(pb_secret_ref), - header_key, signature_expr: Some(expr.to_expr_proto()), }; diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs index 0f1f75b03b63b..708607107874a 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs @@ -222,6 +222,7 @@ impl Strong { | ExprType::Sha384 | ExprType::Sha512 | ExprType::Hmac + | ExprType::SecureCompare | ExprType::Left | ExprType::Right | ExprType::Format diff --git a/src/frontend/src/webhook/mod.rs b/src/frontend/src/webhook/mod.rs index 973a17e4b685b..ce3746a7891a3 100644 --- a/src/frontend/src/webhook/mod.rs +++ b/src/frontend/src/webhook/mod.rs @@ -38,17 +38,15 @@ mod utils; #[derive(Clone)] pub struct WebhookService { pub webhook_addr: SocketAddr, - // pub compute_clients: ComputeClientPool, } pub type Service = Arc; pub(super) mod handlers { - use std::net::Ipv4Addr; use risingwave_pb::catalog::WebhookSourceInfo; use risingwave_sqlparser::ast::{Query, SetExpr, Statement, Value, Values}; - use utils::verify_signature; + use utils::{header_map_to_json, verify_signature}; use super::*; use crate::catalog::root_catalog::SchemaPath; @@ -60,10 +58,6 @@ pub(super) mod handlers { Path((user, database, schema, table)): Path<(String, String, String, String)>, body: Bytes, ) -> Result<()> { - println!( - "WKXLOG receive something: {:?}, \ndatabase: {}, \ntable: {}, \nheaders: {:?}", - body, database, table, headers - ); let session_mgr = SESSION_MANAGER .get() .expect("session manager has been initialized"); @@ -80,7 +74,7 @@ pub(super) mod handlers { .map_err(|e| { err( anyhow!(e).context(format!( - "failed to create session for database: {}, user: {}", + "Failed to create session for database: {}, user: {}", database, user )), StatusCode::UNAUTHORIZED, @@ -89,7 +83,6 @@ pub(super) mod handlers { let WebhookSourceInfo { secret_ref, - header_key, signature_expr, } = { let search_path = session.config().search_path(); @@ -117,28 +110,17 @@ pub(super) mod handlers { .fill_secret(secret_ref.unwrap()) .map_err(|e| err(e, StatusCode::NOT_FOUND))?; - println!("WKXLOG header_key: {:?}", header_key); - - let signature = headers - .get(header_key) - .ok_or_else(|| { - err( - anyhow!("Signature not found in the header"), - StatusCode::BAD_REQUEST, - ) - })? - .as_bytes(); - println!("WKXLOG signature: {:?}", signature); - println!("WKXLOG secret_string: {:?}", secret_string); + // Once limitation here is that the key is no longer case-insensitive, users must user the lowercase key when defining the webhook source table. + let headers_jsonb = header_map_to_json(&headers); let is_valid = verify_signature( + headers_jsonb, secret_string.as_bytes(), body.as_ref(), signature_expr.unwrap(), - signature, + // signature, ) .await?; - println!("WKXLOG is_valid: {:?}", is_valid); if !is_valid { return Err(err( @@ -202,10 +184,10 @@ impl WebhookService { let listener = TcpListener::bind(&srv.webhook_addr) .await - .context("failed to bind dashboard address")?; + .context("Failed to bind dashboard address")?; axum::serve(listener, app) .await - .context("failed to serve dashboard service")?; + .context("Failed to serve dashboard service")?; Ok(()) } diff --git a/src/frontend/src/webhook/utils.rs b/src/frontend/src/webhook/utils.rs index 3e87dd6cfd265..a2c71bac921b6 100644 --- a/src/frontend/src/webhook/utils.rs +++ b/src/frontend/src/webhook/utils.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use axum::http::StatusCode; +use std::collections::HashMap; + +use anyhow::anyhow; +use axum::http::{HeaderMap, StatusCode}; use axum::response::IntoResponse; use axum::Json; use risingwave_common::row::OwnedRow; +use risingwave_common::types::JsonbVal; use risingwave_pb::expr::ExprNode; use serde_json::json; use thiserror_ext::AsReport; @@ -56,24 +60,46 @@ impl IntoResponse for WebhookError { } } -pub async fn verify_signature( +pub(crate) fn header_map_to_json(headers: &HeaderMap) -> JsonbVal { + let mut header_map = HashMap::new(); + + for (key, value) in headers.iter() { + let key = key.as_str().to_string(); + let value = value.to_str().unwrap_or("").to_string(); + header_map.insert(key, value); + } + + let json_value = json!(header_map); + JsonbVal::from(json_value) +} + +pub(crate) async fn verify_signature( + headers_jsonb: JsonbVal, secret: &[u8], payload: &[u8], signature_expr: ExprNode, - signature: &[u8], + // signature: &[u8], ) -> Result { - let row = OwnedRow::new(vec![Some(secret.into()), Some(payload.into())]); - println!("WKXLOG signature_expr: {:?}", signature_expr); + let row = OwnedRow::new(vec![ + Some(headers_jsonb.into()), + Some(secret.into()), + Some(payload.into()), + ]); let signature_expr_impl = ExprImpl::from_expr_proto(&signature_expr) .map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))?; - println!("WKXLOG signature_expr_impl: {:?}", signature_expr_impl); let result = signature_expr_impl .eval_row(&row) .await .map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))? - .unwrap(); - let computed_signature = result.as_bytea(); - Ok(**computed_signature == *signature) + .ok_or_else(|| { + err( + anyhow!("`SECURE_COMPARE()` failed"), + StatusCode::BAD_REQUEST, + ) + })?; + // let computed_signature = result.as_bytea(); + // Ok(**computed_signature == *signature) + Ok(*result.as_bool()) } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 2b6eb7da83612..967c1111fa685 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -809,6 +809,5 @@ impl fmt::Display for ReferentialAction { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct WebhookSourceInfo { pub secret_ref: SecretRef, - pub header_key: String, pub signature_expr: Expr, } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 8134605fa0fac..a2c8a9c08f2b3 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1889,7 +1889,7 @@ impl fmt::Display for Statement { } if let Some(info)= webhook_info { write!(f, " VALIDATE SECRET {}", info.secret_ref.secret_name)?; - write!(f, " AS secure_compare ()")?; + write!(f, " AS {}", info.signature_expr)?; } Ok(()) } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 47e20e24dc432..a65b28a949e95 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2588,6 +2588,7 @@ impl Parser<'_> { let contain_webhook = if let Some(connector) = &connector && connector.contains("webhook") { + // for webhook source, we clear all the options and use `webhook_info` to store the webhook info with_options.clear(); true } else { @@ -2622,7 +2623,10 @@ impl Parser<'_> { None }; - let webhook_info = if contain_webhook && self.parse_keyword(Keyword::VALIDATE) { + let webhook_info = if self.parse_keyword(Keyword::VALIDATE) { + if !contain_webhook { + parser_err!("VALIDATE is only supported for tables created with webhook source"); + } self.expect_keyword(Keyword::SECRET)?; let secret_ref = SecretRef { secret_name: self.parse_object_name()?, @@ -2632,37 +2636,10 @@ impl Parser<'_> { parser_err!("Secret for SECURE_COMPARE() does not support AS FILE"); }; self.expect_keyword(Keyword::AS)?; - let function_name = self.parse_identifier()?; - if function_name.real_value().to_uppercase() != *"SECURE_COMPARE" { - parser_err!( - "SECURE_COMPARE() is the only function supported for secret validation" - ); - } - self.expect_token(&Token::LParen)?; - let headers = self.parse_identifier()?; - if headers.real_value().to_uppercase() != *"HEADERS" { - parser_err!("The first argument of SECURE_COMPARE() should be like `HEADERS ->> {{header_key}}`"); - } - self.expect_token(&Token::LongArrow)?; - let header_key = self.parse_literal_string()?; - self.expect_token(&Token::Comma)?; - let checkpoint = *self; - let signature_expr = if let Ok(ident) = self.parse_identifier() - && !matches!(self.peek_token().token, Token::LParen) - { - // secret name - Expr::Identifier(ident) - } else { - // function to generate signature, e.g., HMAC(secret, payload, 'sha256') - *self = checkpoint; - self.parse_function()? - }; - - self.expect_token(&Token::RParen)?; + let signature_expr = self.parse_function()?; Some(WebhookSourceInfo { secret_ref, - header_key, signature_expr, }) } else {