From 1f3ab221eec604fbfdfb860a8fbd4fd70c1e0349 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Wed, 6 Nov 2024 17:03:34 -0500 Subject: [PATCH] add test --- Cargo.lock | 3 - ci/scripts/e2e-source-test.sh | 21 ++++- e2e_test/webhook/check.slt.part | 0 e2e_test/webhook/check_1.slt.part | 9 ++ e2e_test/webhook/check_2.slt.part | 11 +++ e2e_test/webhook/check_3.slt.part | 13 +++ e2e_test/webhook/drop_table.slt.part | 7 ++ e2e_test/webhook/sender.py | 88 +++++++++++++++++++ e2e_test/webhook/webhook_source.slt | 25 +++++- e2e_test/webhook/webhook_source_recovery.slt | 19 ++++ proto/catalog.proto | 3 +- proto/expr.proto | 47 ++++------ src/common/secret/src/secret_manager.rs | 8 +- src/expr/impl/src/scalar/hmac.rs | 2 +- src/frontend/Cargo.toml | 4 - src/frontend/src/binder/expr/function/mod.rs | 2 +- src/frontend/src/binder/expr/mod.rs | 24 +++-- src/frontend/src/binder/mod.rs | 2 +- src/frontend/src/handler/create_source.rs | 10 ++- src/frontend/src/optimizer/mod.rs | 2 +- src/frontend/src/webhook/mod.rs | 11 +-- src/frontend/src/webhook/utils.rs | 2 +- src/meta/model/src/lib.rs | 5 +- src/sqlparser/src/parser.rs | 4 +- src/storage/src/compaction_catalog_manager.rs | 1 + 25 files changed, 253 insertions(+), 70 deletions(-) delete mode 100644 e2e_test/webhook/check.slt.part create mode 100644 e2e_test/webhook/check_1.slt.part create mode 100644 e2e_test/webhook/check_2.slt.part create mode 100644 e2e_test/webhook/check_3.slt.part create mode 100644 e2e_test/webhook/drop_table.slt.part create mode 100644 e2e_test/webhook/webhook_source_recovery.slt diff --git a/Cargo.lock b/Cargo.lock index 60ba38463f408..76e1575d89e68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11361,8 +11361,6 @@ dependencies = [ "fixedbitset 0.5.0", "futures", "futures-async-stream", - "hex", - "hmac", "iana-time-zone", "iceberg", "icelake", @@ -11407,7 +11405,6 @@ dependencies = [ "rw_futures_util", "serde", "serde_json", - "sha1", "sha2", "smallvec", "speedate", diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 4491db5633ea8..49426f0f54b5b 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -32,7 +32,7 @@ mkdir ./connector-node tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" -python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema +python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema requests apt-get -y install jq echo "--- e2e, inline test" @@ -154,3 +154,22 @@ risedev slt './e2e_test/source_legacy/basic/old_row_format_syntax/*.slt' echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' + +risedev ci-kill +echo "--- cluster killed " + +echo "--- starting risingwave cluster for webhook source test" +risedev ci-start ci-1cn-1fe-with-recovery +sleep 5 +# check results +risedev slt "e2e_test/webhook/webhook_source.slt" + +risedev kill + +risedev dev ci-1cn-1fe-with-recovery +echo "--- wait for cluster recovery finish" +sleep 20 +risedev slt "e2e_test/webhook/webhook_source_recovery.slt" + +risedev ci-kill +echo "--- cluster killed " \ No newline at end of file diff --git a/e2e_test/webhook/check.slt.part b/e2e_test/webhook/check.slt.part deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/e2e_test/webhook/check_1.slt.part b/e2e_test/webhook/check_1.slt.part new file mode 100644 index 0000000000000..ec82b64dfc07e --- /dev/null +++ b/e2e_test/webhook/check_1.slt.part @@ -0,0 +1,9 @@ +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha256; +---- +github sha256 + +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha1; +---- +github sha1 \ No newline at end of file diff --git a/e2e_test/webhook/check_2.slt.part b/e2e_test/webhook/check_2.slt.part new file mode 100644 index 0000000000000..bdea9c8158f3b --- /dev/null +++ b/e2e_test/webhook/check_2.slt.part @@ -0,0 +1,11 @@ +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha256; +---- +github sha256 +github sha256 + +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha1; +---- +github sha1 +github sha1 \ No newline at end of file diff --git a/e2e_test/webhook/check_3.slt.part b/e2e_test/webhook/check_3.slt.part new file mode 100644 index 0000000000000..ef951a3920f42 --- /dev/null +++ b/e2e_test/webhook/check_3.slt.part @@ -0,0 +1,13 @@ +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha256; +---- +github sha256 +github sha256 +github sha256 + +query TT +select data ->> 'source', data->> 'auth_algo' from github_sha1; +---- +github sha1 +github sha1 +github sha1 \ No newline at end of file diff --git a/e2e_test/webhook/drop_table.slt.part b/e2e_test/webhook/drop_table.slt.part new file mode 100644 index 0000000000000..099e27c5d77a9 --- /dev/null +++ b/e2e_test/webhook/drop_table.slt.part @@ -0,0 +1,7 @@ + +statement ok +DROP TABLE github_sha256; + +statement ok +DROP TABLE github_sha1; + diff --git a/e2e_test/webhook/sender.py b/e2e_test/webhook/sender.py index e69de29bb2d1d..920d7b77f5d5c 100644 --- a/e2e_test/webhook/sender.py +++ b/e2e_test/webhook/sender.py @@ -0,0 +1,88 @@ +import argparse +import requests +import json +import sys +import hmac +import hashlib + +message = { + "event": "order.created", + "source": "placeholder", + "auth_algo": "placeholder", + "data": { + "order_id": 1234, + "customer_name": "Alice", + "amount": 99.99, + "currency": "USD" + }, + "timestamp": 1639581841 +} + +SERVER_URL = "http://127.0.0.1:8080/message/root/dev/public/" + + +def generate_signature_hmac(secret, payload, auth_algo): + secret_bytes = bytes(secret, 'utf-8') + payload_bytes = bytes(payload, 'utf-8') + signature = "" + if auth_algo == "sha1": + signature = "sha1=" + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha1).hexdigest() + elif auth_algo == "sha256": + signature = "sha256=" + hmac.new(secret_bytes, payload_bytes, digestmod=hashlib.sha256).hexdigest() + else: + print("Unsupported auth type") + sys.exit(1) + return signature + + +def send_webhook(url, headers, payload_json): + response = requests.post(url, headers=headers, data=payload_json) + + # Check response status and exit on failure + if response.status_code == 200: + print("Webhook sent successfully:", response) + else: + print(f"Webhook failed to send, Status Code: {response.status_code}, Response: {response.text}") + sys.exit(1) # Exit the program with an error + + +def send_github_sha1(secret): + payload = message + payload['source'] = "github" + payload['auth_algo'] = "sha1" + url = SERVER_URL + "github_sha1" + + payload_json = json.dumps(payload) + signature = generate_signature_hmac(secret, payload_json, 'sha1') + # Webhook message headers + headers = { + "Content-Type": "application/json", + "X-Hub-Signature": signature # Custom signature header + } + send_webhook(url, headers, payload_json) + + +def send_github_sha256(secret): + payload = message + payload['source'] = "github" + payload['auth_algo'] = "sha256" + url = SERVER_URL + "github_sha256" + + payload_json = json.dumps(payload) + signature = generate_signature_hmac(secret, payload_json, 'sha256') + # Webhook message headers + headers = { + "Content-Type": "application/json", + "X-Hub-Signature-256": signature # Custom signature header + } + send_webhook(url, headers, payload_json) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Simulate sending Webhook messages") + parser.add_argument("--secret", required=True, help="Secret key for generating signature") + args = parser.parse_args() + secret = args.secret + # send data + send_github_sha1(secret) + send_github_sha256(secret) diff --git a/e2e_test/webhook/webhook_source.slt b/e2e_test/webhook/webhook_source.slt index 984c12e93bad5..5134510c233c3 100644 --- a/e2e_test/webhook/webhook_source.slt +++ b/e2e_test/webhook/webhook_source.slt @@ -1,4 +1,27 @@ +# Simulation test for table with webhook source + +control substitution on + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + statement ok CREATE SECRET test_secret WITH ( backend = 'meta') AS 'TEST_WEBHOOK'; -include ../create_table.slt.part +include ./create_table.slt.part + +# insert once +system ok +python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK + +sleep 3s + +include ./check_1.slt.part + +# insert again +system ok +python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK + +sleep 3s + +include ./check_2.slt.part diff --git a/e2e_test/webhook/webhook_source_recovery.slt b/e2e_test/webhook/webhook_source_recovery.slt new file mode 100644 index 0000000000000..5755c7dfe4f6c --- /dev/null +++ b/e2e_test/webhook/webhook_source_recovery.slt @@ -0,0 +1,19 @@ +# Simulation test for table with webhook source after recovery + +control substitution on + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# insert once +system ok +python3 e2e_test/webhook/sender.py --secret TEST_WEBHOOK + +sleep 3s + +include ./check_3.slt.part + +include ./drop_table.slt.part + +statement ok +DROP SECRET test_secret diff --git a/proto/catalog.proto b/proto/catalog.proto index 2286b20ccf64b..9e203b2e94fe1 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -439,8 +439,9 @@ message Table { // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`. optional uint32 maybe_vnode_count = 40; + // The information used by webhook source to validate the incoming data. optional WebhookSourceInfo webhook_info = 41; - + // Per-table catalog version, used by schema change. `None` for internal // tables and tests. Not to be confused with the global catalog version for // notification service. diff --git a/proto/expr.proto b/proto/expr.proto index 3eaba5f6e55e9..595e5792b5ff1 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -12,9 +12,8 @@ message ExprNode { message NowRexNode {} // TODO: move this into `FunctionCall`. enum Type { - // `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the - // viriant of `rex_node`. Their types are therefore deprecated and should be - // `UNSPECIFIED` instead. + // `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the viriant of `rex_node`. + // Their types are therefore deprecated and should be `UNSPECIFIED` instead. reserved 1, 2, 3000; reserved "INPUT_REF", "CONSTANT_VALUE", "UDF"; @@ -128,7 +127,7 @@ message ExprNode { LPAD = 238; RPAD = 239; REVERSE = 240; - STRPOS = 241 [ deprecated = true ]; // duplicated with POSITION + STRPOS = 241 [deprecated = true]; // duplicated with POSITION TO_ASCII = 242; TO_HEX = 243; QUOTE_IDENT = 244; @@ -331,8 +330,7 @@ message ExprNode { // EXTERNAL ICEBERG_TRANSFORM = 2201; } - // Only use this field for function call. For other types of expression, it - // should be UNSPECIFIED. + // Only use this field for function call. For other types of expression, it should be UNSPECIFIED. Type function_type = 1; data.DataType return_type = 3; oneof rex_node { @@ -393,8 +391,8 @@ message Constant { // The items which can occur in the select list of `ProjectSet` operator. // -// When there are table functions in the SQL query `SELECT ...`, it will be -// planned as `ProjectSet`. Otherwise it will be planned as `Project`. +// When there are table functions in the SQL query `SELECT ...`, it will be planned as `ProjectSet`. +// Otherwise it will be planned as `Project`. // // # Examples // @@ -424,7 +422,9 @@ message ProjectSetSelectItem { } } -message FunctionCall { repeated ExprNode children = 1; } +message FunctionCall { + repeated ExprNode children = 1; +} // Aggregate Function Calls for Aggregation message AggCall { @@ -461,8 +461,7 @@ message AggCall { // user defined aggregate function USER_DEFINED = 100; - // wraps a scalar function that takes a list as input as an aggregate - // function. + // wraps a scalar function that takes a list as input as an aggregate function. WRAP_SCALAR = 101; } Kind kind = 1; @@ -496,8 +495,7 @@ message WindowFrame { enum Type { TYPE_UNSPECIFIED = 0; - TYPE_ROWS_LEGACY = 2 - [ deprecated = true ]; // Deprecated since we introduced `RANGE` frame. + TYPE_ROWS_LEGACY = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame. TYPE_ROWS = 5; TYPE_RANGE = 10; @@ -557,10 +555,8 @@ message WindowFrame { Type type = 1; - Bound start = 2 - [ deprecated = true ]; // Deprecated since we introduced `RANGE` frame. - Bound end = 3 - [ deprecated = true ]; // Deprecated since we introduced `RANGE` frame. + Bound start = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame. + Bound end = 3 [deprecated = true]; // Deprecated since we introduced `RANGE` frame. Exclusion exclusion = 4; @@ -594,9 +590,8 @@ message WindowFunction { WindowFrame frame = 5; } -// Note: due to historic reasons, UserDefinedFunction is a oneof variant -// parallel to FunctionCall, while UserDefinedFunctionMetadata is embedded as a -// field in TableFunction and AggCall. +// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall, +// while UserDefinedFunctionMetadata is embedded as a field in TableFunction and AggCall. message UserDefinedFunction { repeated ExprNode children = 1; @@ -607,10 +602,8 @@ message UserDefinedFunction { // The link to the external function service. optional string link = 5; // An unique identifier to the function. - // - If `link` is not empty, the name of the function in the external function - // service. - // - If `language` is `rust` or `wasm`, the name of the function in the wasm - // binary file. + // - If `link` is not empty, the name of the function in the external function service. + // - If `language` is `rust` or `wasm`, the name of the function in the wasm binary file. // - If `language` is `javascript`, the name of the function. optional string identifier = 6; // - If `language` is `javascript`, the source code of the function. @@ -618,11 +611,9 @@ message UserDefinedFunction { // - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary. optional bytes compressed_binary = 10; bool always_retry_on_network_error = 9; - // The runtime used when javascript is used as the language. Could be - // "quickjs" or "deno". + // The runtime used when javascript is used as the language. Could be "quickjs" or "deno". optional string runtime = 11; - // The function type, which is used to execute the function. Could be "sync", - // "async", "generator" or "async_generator" + // The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator" optional string function_type = 12; } diff --git a/src/common/secret/src/secret_manager.rs b/src/common/secret/src/secret_manager.rs index d8215b34454dd..9fdb3ee9239ab 100644 --- a/src/common/secret/src/secret_manager.rs +++ b/src/common/secret/src/secret_manager.rs @@ -153,16 +153,14 @@ impl LocalSecretManager { RefAsType::Text => { // We converted the secret string from sql to bytes using `as_bytes` in frontend. // So use `from_utf8` here to convert it back to string. - return Ok(String::from_utf8(secret_value_bytes.clone())?); + Ok(String::from_utf8(secret_value_bytes.clone())?) } RefAsType::File => { let path_str = self.get_or_init_secret_file(secret_id, secret_value_bytes.clone())?; - return Ok(path_str); - } - RefAsType::Unspecified => { - return Err(SecretError::UnspecifiedRefType(secret_id)); + Ok(path_str) } + RefAsType::Unspecified => Err(SecretError::UnspecifiedRefType(secret_id)), } } diff --git a/src/expr/impl/src/scalar/hmac.rs b/src/expr/impl/src/scalar/hmac.rs index 8d09d205e09d2..1a47f11206de8 100644 --- a/src/expr/impl/src/scalar/hmac.rs +++ b/src/expr/impl/src/scalar/hmac.rs @@ -19,7 +19,7 @@ use sha1::Sha1; use sha2::Sha256; #[function("hmac(varchar, bytea, varchar) -> bytea")] -pub fn hmac<'a>(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> { +pub fn hmac(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> { if sha_type == "sha1" { sha1_hmac(secret, payload) } else if sha_type == "sha256" { diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 072b590d1f9ab..c4cec5a5b9b31 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -37,8 +37,6 @@ fancy-regex = "0.14.0" fixedbitset = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } -hex = "0.4" -hmac = "0.12" iana-time-zone = "0.1" iceberg = { workspace = true } icelake = { workspace = true } @@ -82,7 +80,6 @@ risingwave_variables = { workspace = true } rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" -sha1 = "0.10.6" sha2 = "0.10.7" smallvec = { version = "1.13.1", features = ["serde"] } speedate = "0.15.0" @@ -111,7 +108,6 @@ tracing = "0.1" uuid = "1" zstd = { version = "0.13", default-features = false } - [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs index dfe2b5d2c99e4..f7a4007ffd467 100644 --- a/src/frontend/src/binder/expr/function/mod.rs +++ b/src/frontend/src/binder/expr/function/mod.rs @@ -146,7 +146,7 @@ impl Binder { ); return self.bind_array_transform(arg_list.args); } - + let mut args: Vec<_> = arg_list .args .iter() diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 2a0641c8f1fe6..07942a379b560 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -89,21 +89,19 @@ impl Binder { .into(), ) } - } else { - if let Some(ctx) = self.secure_compare_context.as_ref() { - if ident.real_value() == ctx.secret_name { - Ok(InputRef::new(0, DataType::Varchar).into()) - } else if ident.real_value() == ctx.column_name { - Ok(InputRef::new(1, DataType::Bytea).into()) - } else { - Err( - ErrorCode::ItemNotFound(format!("Unknown arg: {}", ident.real_value())) - .into(), - ) - } + } else if let Some(ctx) = self.secure_compare_context.as_ref() { + if ident.real_value() == ctx.secret_name { + Ok(InputRef::new(0, DataType::Varchar).into()) + } else if ident.real_value() == ctx.column_name { + Ok(InputRef::new(1, DataType::Bytea).into()) } else { - self.bind_column(&[ident]) + Err( + ErrorCode::ItemNotFound(format!("Unknown arg: {}", ident.real_value())) + .into(), + ) } + } else { + self.bind_column(&[ident]) } } Expr::CompoundIdentifier(idents) => self.bind_column(&idents), diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 8ebb464b887a0..e0d6952a3a54a 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -128,7 +128,7 @@ pub struct Binder { /// The temporary sources that will be used during binding phase temporary_source_manager: TemporarySourceManager, - /// information about secure_compare + /// information for `secure_compare` function secure_compare_context: Option, } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index c5f5017beb1e9..819fa00adcbc7 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -21,6 +21,7 @@ use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; +use rand::Rng; use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ @@ -1468,6 +1469,14 @@ pub fn bind_connector_props( .to_string(), ); } + if with_properties.is_mysql_cdc_connector() { + // Generate a random server id for mysql cdc source if needed + // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication + // group (that is, different from any other server id being used by any master or slave) + with_properties + .entry("server.id".to_string()) + .or_insert(rand::thread_rng().gen_range(1..u32::MAX).to_string()); + } Ok(with_properties) } @@ -1616,7 +1625,6 @@ pub async fn bind_create_source_or_table_with_connector( } else { Some(TableId::placeholder()) }; - let source = SourceCatalog { id: TableId::placeholder().table_id, name: source_name, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 3f09d7c58dac2..40d6fbca1d83d 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -57,7 +57,7 @@ use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::sink::catalog::SinkFormatDesc; -use risingwave_pb::catalog::{WatermarkDesc, PbWebhookSourceInfo}; +use risingwave_pb::catalog::{PbWebhookSourceInfo, WatermarkDesc}; use risingwave_pb::stream_plan::StreamScanType; use self::heuristic_optimizer::ApplyOrder; diff --git a/src/frontend/src/webhook/mod.rs b/src/frontend/src/webhook/mod.rs index 35f24aa726d65..c2f42eef0492a 100644 --- a/src/frontend/src/webhook/mod.rs +++ b/src/frontend/src/webhook/mod.rs @@ -60,10 +60,6 @@ pub(super) mod handlers { Path((user, database, schema, table)): Path<(String, String, String, String)>, body: Bytes, ) -> Result<()> { - println!( - "WKXLOG receive something: {:?}, database: {}, table: {}", - body, database, table - ); let session_mgr = SESSION_MANAGER .get() .expect("session manager has been initialized"); @@ -166,7 +162,12 @@ pub(super) mod handlers { let _rsp = handle(session, insert_stmt, Arc::from(""), vec![]) .await - .map_err(|e| anyhow!("failed to insert: {:?}", e))?; + .map_err(|e| { + err( + anyhow!(e).context("Failed to insert to table"), + StatusCode::INTERNAL_SERVER_ERROR, + ) + })?; Ok(()) } diff --git a/src/frontend/src/webhook/utils.rs b/src/frontend/src/webhook/utils.rs index 2cda72f69d96e..6f13fb58040e7 100644 --- a/src/frontend/src/webhook/utils.rs +++ b/src/frontend/src/webhook/utils.rs @@ -39,7 +39,7 @@ pub(crate) fn err(err: impl Into, code: StatusCode) -> WebhookErr impl From for WebhookError { fn from(value: anyhow::Error) -> Self { WebhookError { - err: value.into(), + err: value, code: StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/src/meta/model/src/lib.rs b/src/meta/model/src/lib.rs index 5ef5b50d190ba..9995d8482b8fb 100644 --- a/src/meta/model/src/lib.rs +++ b/src/meta/model/src/lib.rs @@ -369,7 +369,10 @@ derive_array_from_blob!( PbColumnCatalogArray ); derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo); -derive_from_blob!(WebhookSourceInfo, risingwave_pb::catalog::PbWebhookSourceInfo); +derive_from_blob!( + WebhookSourceInfo, + risingwave_pb::catalog::PbWebhookSourceInfo +); derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc); derive_array_from_blob!( WatermarkDescArray, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index c83ca230d2306..2125ea3763612 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2633,14 +2633,14 @@ impl Parser<'_> { }; self.expect_keyword(Keyword::AS)?; let function_name = self.parse_identifier()?; - if function_name.real_value().to_uppercase() != String::from("SECURE_COMPARE") { + if function_name.real_value().to_uppercase() != *"SECURE_COMPARE" { parser_err!( "SECURE_COMPARE() is the only function supported for secret validation" ); } self.expect_token(&Token::LParen)?; let headers = self.parse_identifier()?; - if headers.real_value().to_uppercase() != String::from("HEADERS") { + if headers.real_value().to_uppercase() != *"HEADERS" { parser_err!("The first argument of SECURE_COMPARE() should be like `HEADERS ->> {{header_key}}`"); } self.expect_token(&Token::LongArrow)?; diff --git a/src/storage/src/compaction_catalog_manager.rs b/src/storage/src/compaction_catalog_manager.rs index 3133cae023300..22ed5cf4fd0b5 100644 --- a/src/storage/src/compaction_catalog_manager.rs +++ b/src/storage/src/compaction_catalog_manager.rs @@ -568,6 +568,7 @@ mod tests { created_at_cluster_version: None, cdc_table_id: None, maybe_vnode_count: None, + webhook_info: None, } }