Skip to content

Commit

Permalink
rudderstack
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Nov 15, 2024
1 parent 9627420 commit 6bbab90
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 27 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion e2e_test/webhook/check_1.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ github sha256
query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
github sha1

query TT
select data ->> 'source', data->> 'auth_algo' from rudderstack;
----
rudderstack plain
8 changes: 7 additions & 1 deletion e2e_test/webhook/check_2.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ query TT
select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
github sha1
github sha1

query TT
select data ->> 'source', data->> 'auth_algo' from rudderstack;
----
rudderstack plain
rudderstack plain
9 changes: 8 additions & 1 deletion e2e_test/webhook/check_3.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,11 @@ select data ->> 'source', data->> 'auth_algo' from github_sha1;
----
github sha1
github sha1
github sha1
github sha1

query TT
select data ->> 'source', data->> 'auth_algo' from rudderstack;
----
rudderstack plain
rudderstack plain
rudderstack plain
9 changes: 9 additions & 0 deletions e2e_test/webhook/create_table.slt.part
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
statement ok
create table rudderstack (
data JSONB
) WITH (
connector = 'webhook',
) VALIDATE SECRET test_secret AS secure_compare(
headers->>'Authorization',
test_secret
);

statement ok
create table github_sha1 (
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/webhook/drop_table.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ DROP TABLE github_sha256;
statement ok
DROP TABLE github_sha1;

statement ok
DROP TABLE rudderstack;
17 changes: 17 additions & 0 deletions e2e_test/webhook/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ def send_github_sha256(secret):
send_webhook(url, headers, payload_json)


def send_rudderstack(secret):
payload = message
payload['source'] = "rudderstack"
payload['auth_algo'] = "plain"
url = SERVER_URL + "rudderstack"

payload_json = json.dumps(payload)
signature = secret
# Webhook message headers
headers = {
"Content-Type": "application/json",
"Authorization": signature # Custom signature header
}
send_webhook(url, headers, payload_json)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simulate sending Webhook messages")
parser.add_argument("--secret", required=True, help="Secret key for generating signature")
Expand All @@ -86,3 +102,4 @@ def send_github_sha256(secret):
# send data
send_github_sha1(secret)
send_github_sha256(secret)
send_rudderstack(secret)
14 changes: 6 additions & 8 deletions src/expr/impl/src/scalar/hmac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use risingwave_expr::function;
use sha1::Sha1;
use sha2::Sha256;

#[function("hmac(varchar, bytea, varchar) -> bytea")]
pub fn hmac(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> {
#[function("hmac(bytea, bytea, varchar) -> bytea")]
pub fn hmac(secret: &[u8], payload: &[u8], sha_type: &str) -> Box<[u8]> {
if sha_type == "sha1" {
sha1_hmac(secret, payload)
} else if sha_type == "sha256" {
Expand All @@ -29,9 +29,8 @@ pub fn hmac(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> {
}
}

fn sha256_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> {
let mut mac =
Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
fn sha256_hmac(secret: &[u8], payload: &[u8]) -> Box<[u8]> {
let mut mac = Hmac::<Sha256>::new_from_slice(secret).expect("HMAC can take key of any size");

mac.update(payload);

Expand All @@ -41,9 +40,8 @@ fn sha256_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> {
computed_signature.as_bytes().into()
}

fn sha1_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> {
let mut mac =
Hmac::<Sha1>::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
fn sha1_hmac(secret: &[u8], payload: &[u8]) -> Box<[u8]> {
let mut mac = Hmac::<Sha1>::new_from_slice(secret).expect("HMAC can take key of any size");

mac.update(payload);

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl Binder {
}
} else if let Some(ctx) = self.secure_compare_context.as_ref() {
if ident.real_value() == ctx.secret_name {
Ok(InputRef::new(0, DataType::Varchar).into())
Ok(InputRef::new(0, DataType::Bytea).into())
} else if ident.real_value() == ctx.column_name {
Ok(InputRef::new(1, DataType::Bytea).into())
} else {
Expand Down
19 changes: 12 additions & 7 deletions src/frontend/src/webhook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub(super) mod handlers {
Path((user, database, schema, table)): Path<(String, String, String, String)>,
body: Bytes,
) -> Result<()> {
println!(
"WKXLOG receive something: {:?}, \ndatabase: {}, \ntable: {}, \nheaders: {:?}",
body, database, table, headers
);
let session_mgr = SESSION_MANAGER
.get()
.expect("session manager has been initialized");
Expand Down Expand Up @@ -113,6 +117,8 @@ pub(super) mod handlers {
.fill_secret(secret_ref.unwrap())
.map_err(|e| err(e, StatusCode::NOT_FOUND))?;

println!("WKXLOG header_key: {:?}", header_key);

let signature = headers
.get(header_key)
.ok_or_else(|| {
Expand All @@ -122,14 +128,18 @@ pub(super) mod handlers {
)
})?
.as_bytes();
println!("WKXLOG signature: {:?}", signature);
println!("WKXLOG secret_string: {:?}", secret_string);

let is_valid = verify_signature(
secret_string.as_str(),
secret_string.as_bytes(),
body.as_ref(),
signature_expr.unwrap(),
signature,
)
.await?;
println!("WKXLOG is_valid: {:?}", is_valid);

if !is_valid {
return Err(err(
anyhow!("Signature verification failed"),
Expand Down Expand Up @@ -162,12 +172,7 @@ pub(super) mod handlers {

let _rsp = handle(session, insert_stmt, Arc::from(""), vec![])
.await
.map_err(|e| {
err(
anyhow!(e).context("Failed to insert to table"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})?;
.map_err(|e| anyhow!("failed to insert: {:?}", e))?;

Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/webhook/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ impl IntoResponse for WebhookError {
}

pub async fn verify_signature(
secret: &str,
secret: &[u8],
payload: &[u8],
signature_expr: ExprNode,
signature: &[u8],
) -> Result<bool> {
let row = OwnedRow::new(vec![Some(secret.into()), Some(payload.into())]);
println!("WKXLOG signature_expr: {:?}", signature_expr);

let signature_expr_impl = ExprImpl::from_expr_proto(&signature_expr)
.map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))?;
println!("WKXLOG signature_expr_impl: {:?}", signature_expr_impl);

let result = signature_expr_impl
.eval_row(&row)
Expand Down

0 comments on commit 6bbab90

Please sign in to comment.