From c6850f87dde46f833a0550d5f86d153f28b7dae5 Mon Sep 17 00:00:00 2001 From: duonganhthu43 Date: Tue, 30 Jan 2024 14:56:44 +0700 Subject: [PATCH] feat: webhook-connector (#2355) --- Cargo.lock | 92 +++++--- dozer-ingestion/Cargo.toml | 1 + dozer-ingestion/src/lib.rs | 4 + dozer-ingestion/webhook/Cargo.toml | 15 ++ dozer-ingestion/webhook/src/connector.rs | 157 +++++++++++++ dozer-ingestion/webhook/src/lib.rs | 27 +++ dozer-ingestion/webhook/src/server.rs | 190 +++++++++++++++ dozer-ingestion/webhook/src/tests.rs | 222 ++++++++++++++++++ dozer-ingestion/webhook/src/util.rs | 141 +++++++++++ .../src/e2e_tests/runner/running_env.rs | 1 + dozer-types/src/models/connection.rs | 7 +- dozer-types/src/models/ingestion_types.rs | 82 +++++++ json_schemas/dozer.json | 132 +++++++++++ 13 files changed, 1032 insertions(+), 39 deletions(-) create mode 100644 dozer-ingestion/webhook/Cargo.toml create mode 100644 dozer-ingestion/webhook/src/connector.rs create mode 100644 dozer-ingestion/webhook/src/lib.rs create mode 100644 dozer-ingestion/webhook/src/server.rs create mode 100644 dozer-ingestion/webhook/src/tests.rs create mode 100644 dozer-ingestion/webhook/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index ca1a57a170..dae1149309 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,9 +69,9 @@ dependencies = [ [[package]] name = "actix-http" -version = "3.4.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a92ef85799cba03f76e4f7c10f533e66d87c9a7e7055f3391f09000ad8351bc9" +checksum = "129d4c88e98860e1758c5de288d1632b07970a16d59bdf7b8d66053d582bb71f" dependencies = [ "actix-codec", "actix-rt", @@ -104,7 +104,7 @@ dependencies = [ "tokio", "tokio-util 0.7.10", "tracing", - "zstd 0.12.4", + "zstd", ] [[package]] @@ -201,9 +201,9 @@ dependencies = [ [[package]] name = "actix-web" -version = "4.4.0" +version = "4.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4a5b5e29603ca8c94a77c65cf874718ceb60292c5a5c3e5f4ace041af462b9" +checksum = "e43428f3bf11dee6d166b00ec2df4e3aa8cc1606aaa0b7433c146852e2f4e03b" dependencies = [ "actix-codec", "actix-http", @@ -458,9 +458,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.4" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" dependencies = [ "anstyle", "anstyle-parse", @@ -861,8 +861,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd 0.13.0", - "zstd-safe 7.0.0", + "zstd", + "zstd-safe", ] [[package]] @@ -2664,7 +2664,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd 0.13.0", + "zstd", ] [[package]] @@ -3753,7 +3753,7 @@ dependencies = [ "dozer-storage", "dozer-tracing", "dozer-types", - "env_logger", + "env_logger 0.10.1", "futures", "itertools 0.10.5", "metrics", @@ -3859,9 +3859,10 @@ dependencies = [ "dozer-ingestion-object-store", "dozer-ingestion-postgres", "dozer-ingestion-snowflake", + "dozer-ingestion-webhook", "dozer-tracing", "dozer-utils", - "env_logger", + "env_logger 0.10.1", "futures", "hex", "parquet", @@ -4002,6 +4003,17 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "dozer-ingestion-webhook" +version = "0.1.0" +dependencies = [ + "actix-web", + "dozer-ingestion-connector", + "env_logger 0.11.1", + "reqwest", + "tokio", +] + [[package]] name = "dozer-lambda" version = "0.1.0" @@ 
-4010,7 +4022,7 @@ dependencies = [ "dozer-deno", "dozer-log", "dozer-types", - "env_logger", + "env_logger 0.10.1", ] [[package]] @@ -4027,7 +4039,7 @@ dependencies = [ "clap 4.4.8", "dozer-types", "dyn-clone", - "env_logger", + "env_logger 0.10.1", "futures-util", "nonzero_ext", "pin-project", @@ -4175,7 +4187,7 @@ dependencies = [ "dozer-tracing", "dozer-types", "dozer-utils", - "env_logger", + "env_logger 0.10.1", "futures", "libtest-mimic", "mongodb", @@ -4498,6 +4510,16 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.10.1" @@ -4511,6 +4533,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "env_logger" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e7cf40684ae96ade6232ed84582f40ce0a66efcd43a5117aef610534f8e0b8" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -7735,7 +7770,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd 0.13.0", + "zstd", ] [[package]] @@ -8998,9 +9033,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "async-compression", "base64 0.21.5", @@ -12757,32 +12792,13 @@ dependencies = [ "flate2", ] -[[package]] -name = "zstd" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" -dependencies = [ - "zstd-safe 6.0.6", -] - [[package]] name = "zstd" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" dependencies = [ - "zstd-safe 7.0.0", -] - -[[package]] -name = "zstd-safe" -version = "6.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" -dependencies = [ - "libc", - "zstd-sys", + "zstd-safe", ] [[package]] diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index fabd820b84..eac31632b8 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -19,6 +19,7 @@ dozer-ingestion-mysql = { path = "./mysql" } dozer-ingestion-object-store = { path = "./object-store" } dozer-ingestion-postgres = { path = "./postgres" } dozer-ingestion-snowflake = { path = "./snowflake", optional = true } +dozer-ingestion-webhook = { path = "./webhook" } tokio = { version = "1", features = ["full"] } futures = "0.3.28" diff --git a/dozer-ingestion/src/lib.rs b/dozer-ingestion/src/lib.rs index 6d520cc3e0..9990917484 100644 --- a/dozer-ingestion/src/lib.rs +++ b/dozer-ingestion/src/lib.rs @@ -28,6 +28,7 @@ use dozer_ingestion_postgres::{ }; #[cfg(feature = "snowflake")] use dozer_ingestion_snowflake::connector::SnowflakeConnector; +use dozer_ingestion_webhook::connector::WebhookConnector; use errors::ConnectorError; use tokio::runtime::Runtime; @@ -132,6 +133,9 @@ pub fn get_connector( ConnectionConfig::Dozer(dozer_config) => { 
Ok(Box::new(NestedDozerConnector::new(dozer_config)))
         }
+        ConnectionConfig::Webhook(webhook_config) => {
+            Ok(Box::new(WebhookConnector::new(webhook_config)))
+        }
         ConnectionConfig::JavaScript(javascript_config) => Ok(Box::new(JavaScriptConnector::new(
             runtime,
             javascript_config,
diff --git a/dozer-ingestion/webhook/Cargo.toml b/dozer-ingestion/webhook/Cargo.toml
new file mode 100644
index 0000000000..644173e053
--- /dev/null
+++ b/dozer-ingestion/webhook/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "dozer-ingestion-webhook"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+dozer-ingestion-connector = { path = "../connector" }
+actix-web = "4.4.1"
+env_logger = "0.11.1"
+
+[dev-dependencies]
+tokio = { version = "1.0", features = ["full", "test-util"] }
+reqwest = { version = "0.11.23", features = ["json", "blocking"] }
diff --git a/dozer-ingestion/webhook/src/connector.rs b/dozer-ingestion/webhook/src/connector.rs
new file mode 100644
index 0000000000..024c2d158d
--- /dev/null
+++ b/dozer-ingestion/webhook/src/connector.rs
@@ -0,0 +1,157 @@
+use crate::{server::WebhookServer, util::extract_source_schema, Error};
+use dozer_ingestion_connector::{
+    async_trait,
+    dozer_types::{
+        self, errors::internal::BoxedError, models::ingestion_types::WebhookConfig,
+        node::OpIdentifier,
+    },
+    utils::TableNotFound,
+    Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo,
+};
+use std::{collections::HashMap, sync::Arc, vec};
+
+#[derive(Debug)]
+pub struct WebhookConnector {
+    pub config: WebhookConfig,
+}
+
+impl WebhookConnector {
+    pub fn new(config: WebhookConfig) -> Self {
+        Self { config }
+    }
+
+    fn get_all_schemas(&self) -> Result<HashMap<String, SourceSchema>, Error> {
+        let mut result: HashMap<String, SourceSchema> = HashMap::new();
+        let config = &self.config;
+        for endpoint in &config.endpoints {
+            let schemas = extract_source_schema(endpoint.schema.clone());
+            for (key, value) in schemas {
+                result.insert(key, value);
+            }
+        }
+
+        Ok(result)
+    }
+}
+
+#[async_trait]
+impl Connector for WebhookConnector {
+    fn types_mapping() -> Vec<(String, Option<dozer_types::types::FieldType>)>
+    where
+        Self: Sized,
+    {
+        todo!()
+    }
+
+    async fn validate_connection(&self) -> Result<(), BoxedError> {
+        Ok(())
+    }
+
+    async fn list_tables(&self) -> Result<Vec<TableIdentifier>, BoxedError> {
+        Ok(self
+            .get_all_schemas()?
+            .into_keys()
+            .map(TableIdentifier::from_table_name)
+            .collect())
+    }
+
+    async fn validate_tables(&self, tables: &[TableIdentifier]) -> Result<(), BoxedError> {
+        let schemas = self.get_all_schemas()?;
+        for table in tables {
+            if !schemas
+                .iter()
+                .any(|(name, _)| name == &table.name && table.schema.is_none())
+            {
+                return Err(TableNotFound {
+                    schema: table.schema.clone(),
+                    name: table.name.clone(),
+                }
+                .into());
+            }
+        }
+        Ok(())
+    }
+
+    async fn list_columns(
+        &self,
+        tables: Vec<TableIdentifier>,
+    ) -> Result<Vec<TableInfo>, BoxedError> {
+        let schemas = self.get_all_schemas()?;
+        let mut result: Vec<TableInfo> = vec![];
+        for table in tables {
+            let source_schema_option = schemas.get(table.name.as_str());
+            match source_schema_option {
+                Some(source_schema) => {
+                    let column_names = source_schema
+                        .schema
+                        .fields
+                        .iter()
+                        .map(|field| field.name.clone())
+                        .collect();
+                    if result
+                        .iter()
+                        .any(|table_info| table_info.name == table.name)
+                    {
+                        continue;
+                    }
+                    result.push(TableInfo {
+                        schema: table.schema,
+                        name: table.name,
+                        column_names,
+                    })
+                }
+                None => {
+                    return Err(TableNotFound {
+                        schema: table.schema.clone(),
+                        name: table.name.clone(),
+                    }
+                    .into());
+                }
+            }
+        }
+        Ok(result)
+    }
+
+    async fn get_schemas(
+        &self,
+        table_infos: &[TableInfo],
+    ) -> Result<Vec<SourceSchemaResult>, BoxedError> {
+        let schemas = self.get_all_schemas()?;
+        let mut result = vec![];
+        for table in table_infos {
+            let table_name = table.name.clone();
+            let schema = schemas.get(table_name.as_str());
+            match schema {
+                Some(schema) => {
+                    result.push(Ok(schema.clone()));
+                }
+                None => {
+                    result.push(Err(TableNotFound {
+                        schema: table.schema.clone(),
+                        name: table.name.clone(),
+                    }
+                    .into()));
+                }
+            }
+        }
+        Ok(result)
+    }
+
+    async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError> {
+        Ok(vec![])
+    }
+
+    async fn start(
+        &self,
+        ingestor: &Ingestor,
+        tables: Vec<TableInfo>,
+        _last_checkpoint: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError> {
+        let config = self.config.clone();
+        let server = WebhookServer::new(config);
+        server
+            .start(Arc::new(ingestor.to_owned()), tables)
+            .await
+            .map_err(Into::into)
+    }
+}
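
Note on usage: a connection of this kind is declared in the application YAML via the `!Webhook` tag this patch adds to `ConnectionConfig` (see dozer-types below). A minimal sketch, assuming Dozer's usual `connections:` list with `name`/`config` keys; field names follow `WebhookConfig`/`WebhookEndpoint`, and `Inline:` is serde's external tag for `WebhookConfigSchemas`:

    connections:
      - name: webhook_source
        config: !Webhook
          host: 127.0.0.1
          port: 50059
          endpoints:
            - path: /ingest
              verbs:
                - POST
                - DELETE
              schema:
                Inline: |
                  {
                    "users": {
                      "schema": {
                        "fields": [
                          { "name": "id", "typ": "Int", "nullable": false },
                          { "name": "name", "typ": "String", "nullable": true }
                        ]
                      }
                    }
                  }
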
diff --git a/dozer-ingestion/webhook/src/lib.rs b/dozer-ingestion/webhook/src/lib.rs
new file mode 100644
index 0000000000..ee1a902034
--- /dev/null
+++ b/dozer-ingestion/webhook/src/lib.rs
@@ -0,0 +1,27 @@
+use std::{net::AddrParseError, path::PathBuf};
+
+use dozer_ingestion_connector::dozer_types::{
+    serde_json,
+    thiserror::{self, Error},
+};
+pub mod connector;
+mod server;
+#[cfg(test)]
+mod tests;
+mod util;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("cannot read file {0:?}: {1}")]
+    CannotReadFile(PathBuf, #[source] std::io::Error),
+    #[error("serde json error: {0}")]
+    SerdeJson(#[from] serde_json::Error),
+    #[error("address parse error: {0}")]
+    AddrParse(#[from] AddrParseError),
+    #[error("schema {0} not found")]
+    SchemaNotFound(String),
+    #[error("field {0} not found in schema")]
+    FieldNotFound(String),
+    #[error("actix web start error: {0}")]
+    ActixWebStartError(#[from] std::io::Error),
+}
diff --git a/dozer-ingestion/webhook/src/server.rs b/dozer-ingestion/webhook/src/server.rs
new file mode 100644
index 0000000000..b385a5db1f
--- /dev/null
+++ b/dozer-ingestion/webhook/src/server.rs
@@ -0,0 +1,190 @@
+use crate::{
+    util::{extract_source_schema, map_record},
+    Error,
+};
+use actix_web::{
+    web::{self, Data},
+    App, HttpRequest, HttpServer, Responder,
+};
+use dozer_ingestion_connector::{
+    dozer_types::{
+        models::ingestion_types::{IngestionMessage, WebhookConfig, WebhookVerb},
+        serde_json,
+        types::{Operation, Record},
+    },
+    Ingestor, SourceSchema, TableInfo,
+};
+use std::{collections::HashMap, sync::Arc};
+
+pub(crate) struct WebhookServer {
+    config: WebhookConfig,
+}
+
+impl WebhookServer {
+    pub(crate) fn new(config: WebhookConfig) -> Self {
+        Self { config }
+    }
+
+    pub(crate) async fn start(
+        &self,
+        ingestor: Arc<Ingestor>,
+        tables: Vec<TableInfo>,
+    ) -> Result<(), Error> {
+        let config = self.config.clone();
+        let host = config
+            .host
+            .clone()
+            .unwrap_or_else(|| "127.0.0.1".to_string());
+        let port = config.port.unwrap_or(8080);
+        let address = format!("{}:{}", host, port); // Format host and port into a single string
+        // Clone or extract necessary data from `self` before `config` moves into the factory closure
+        let server = HttpServer::new(move || {
+            let mut app = App::new();
+
+            for endpoint in config.endpoints.iter() {
+                let endpoint_data = endpoint.clone();
+                let source_schema_dict = extract_source_schema(endpoint_data.to_owned().schema);
+                let tables = tables.clone();
+                let mut app_resource = web::resource(endpoint_data.path)
+                    .app_data(web::Data::new(Arc::clone(&ingestor)))
+                    .app_data(web::Data::new(source_schema_dict))
+                    .app_data(web::Data::new(tables));
+                for verb in &endpoint.verbs {
+                    app_resource = match verb {
+                        WebhookVerb::POST => app_resource.route(web::post().to(Self::post_handler)),
+                        WebhookVerb::DELETE => {
+                            app_resource.route(web::delete().to(Self::delete_handler))
+                        }
+                        _ => app_resource.route(web::route().to(Self::other_handler)),
+                    };
+                }
+                app = app.service(app_resource);
+            }
+            app
+        });
+
+        let server = server.bind(address)?.run();
+        server.await.map_err(Into::into)
+    }
+
+    fn common_handler(
+        tables: Data<Vec<TableInfo>>,
+        schema_dict: Data<HashMap<String, SourceSchema>>,
+        info: web::Json<serde_json::Value>,
+    ) -> actix_web::Result<Vec<(usize, Vec<Record>)>, actix_web::error::Error> {
+        let source_schema_dict = schema_dict.get_ref();
+        let info = &info.into_inner();
+
+        let mut result: Vec<(usize, Vec<Record>)> = vec![];
+        if let serde_json::Value::Object(object) = info {
+            for (schema_name, values) in object.iter() {
+                let schema = match source_schema_dict.get(schema_name) {
+                    Some(schema) => schema,
+                    None => return Err(actix_web::error::ErrorBadRequest("Invalid schema name")),
+                };
+                let records = match values.as_array() {
+                    Some(values_arr) => values_arr
+                        .iter()
+                        .map(|value_element| {
+                            let value = value_element.as_object().ok_or_else(|| {
+                                actix_web::error::ErrorBadRequest("Invalid value")
+                            })?;
+                            map_record(value.to_owned(), &schema.schema)
+                                .map_err(actix_web::error::ErrorBadRequest)
+                        })
+                        .collect::<Result<Vec<_>, _>>(),
+                    None => {
+                        let value = values
+                            .as_object()
+                            .ok_or_else(|| actix_web::error::ErrorBadRequest("Invalid value"))?;
+                        map_record(value.to_owned(), &schema.schema)
+                            .map(|e| vec![e])
+                            .map_err(|e| actix_web::error::ErrorBadRequest(e.to_string()))
+                    }
+                }?;
+                let table_idx = tables
+                    .iter()
+                    .position(|table| table.name.as_str() == schema_name)
+                    .ok_or_else(|| actix_web::error::ErrorBadRequest("Invalid table name"))?;
+                result.push((table_idx, records));
+            }
+        } else {
+            return Err(actix_web::error::ErrorBadRequest("Invalid JSON"));
+        }
+        Ok(result)
+    }
+
+    async fn post_handler(
+        ingestor: Data<Arc<Ingestor>>,
+        schema_dict: Data<HashMap<String, SourceSchema>>,
+        tables: Data<Vec<TableInfo>>,
+        info: web::Json<serde_json::Value>,
+    ) -> actix_web::Result<impl Responder> {
+        let ingestor = ingestor.get_ref();
+        let records = Self::common_handler(tables, schema_dict, info)?;
+
+        for (table_idx, records) in records {
+            let op: IngestionMessage = if records.len() == 1 {
+                IngestionMessage::OperationEvent {
+                    table_index: table_idx,
+                    op: Operation::Insert {
+                        new: records[0].clone(),
+                    },
+                    state: None,
+                }
+            } else {
+                IngestionMessage::OperationEvent {
+                    table_index: table_idx,
+                    op: Operation::BatchInsert { new: records },
+                    state: None,
+                }
+            };
+            ingestor
+                .handle_message(op)
+                .await
+                .map_err(|e| actix_web::error::ErrorInternalServerError(format!("Error: {}", e)))?;
+        }
+
+        let json_response = serde_json::json!({
+            "status": "ok"
+        });
+
+        Ok(web::Json(json_response))
+    }
+
+    async fn delete_handler(
+        ingestor: Data<Arc<Ingestor>>,
+        schema_dict: Data<HashMap<String, SourceSchema>>,
+        tables: Data<Vec<TableInfo>>,
+        info: web::Json<serde_json::Value>,
+    ) -> actix_web::Result<impl Responder> {
+        let ingestor = ingestor.get_ref();
+        let records = Self::common_handler(tables, schema_dict, info)?;
+        for (table_idx, records) in records {
+            for record in records {
+                let op: IngestionMessage = IngestionMessage::OperationEvent {
+                    table_index: table_idx,
+                    op: Operation::Delete { old: record },
+                    state: None,
+                };
+                ingestor.handle_message(op).await.map_err(|e| {
+                    actix_web::error::ErrorInternalServerError(format!("Error: {}", e))
+                })?;
+            }
+        }
+
+        let json_response = serde_json::json!({
+            "status": "ok"
+        });
+
+        Ok(web::Json(json_response))
+    }
+    async fn other_handler(req: HttpRequest) -> actix_web::Result<impl Responder> {
+        // get VERB from request
+        let verb = req.method().as_str();
+        let json_response = serde_json::json!({
+            "status": format!("{} not supported", verb)
+        });
+        Ok(web::Json(json_response))
+    }
+}
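
A note on the request shapes the handlers above accept: the body must be a JSON object keyed by table name as declared in the endpoint's schema. For POST, an array with more than one element becomes a single `Operation::BatchInsert`; a lone object, or a one-element array, is emitted as `Operation::Insert`. A sketch using the `users` schema from the tests below (values are illustrative):

    use dozer_ingestion_connector::dozer_types::serde_json::json;

    // Array with more than one element => one Operation::BatchInsert.
    let batch_body = json!({
        "users": [
            { "id": 1, "name": "John Doe" },
            { "id": 2, "name": "Jane Doe" }
        ]
    });

    // A lone object (or a one-element array) => Operation::Insert.
    let single_body = json!({
        "users": { "id": 3, "name": "Jim Doe" }
    });

Each key is resolved against that endpoint's own schema map, so one request may mix several tables only if the endpoint's schema declares them all.
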
diff --git a/dozer-ingestion/webhook/src/tests.rs b/dozer-ingestion/webhook/src/tests.rs
new file mode 100644
index 0000000000..67cab54092
--- /dev/null
+++ b/dozer-ingestion/webhook/src/tests.rs
@@ -0,0 +1,222 @@
+use crate::connector::WebhookConnector;
+use dozer_ingestion_connector::{
+    dozer_types::{
+        json_types::json_from_str,
+        models::ingestion_types::{
+            IngestionMessage, WebhookConfig, WebhookConfigSchemas, WebhookEndpoint, WebhookVerb,
+        },
+        serde_json::{self, json},
+        types::{Field, Record},
+    },
+    test_util::{create_test_runtime, spawn_connector_all_tables},
+    tokio::runtime::Runtime,
+    IngestionIterator,
+};
+use std::sync::Arc;
+
+fn ingest_webhook(
+    runtime: Arc<Runtime>,
+    port: u32,
+) -> (
+    IngestionIterator,
+    dozer_ingestion_connector::futures::future::AbortHandle,
+) {
+    let user_schema = r#"
+    {
+        "users": {
+            "schema": {
+                "fields": [
+                    {
+                        "name": "id",
+                        "typ": "Int",
+                        "nullable": false
+                    },
+                    {
+                        "name": "name",
+                        "typ": "String",
+                        "nullable": true
+                    },
+                    {
+                        "name": "json",
+                        "typ": "Json",
+                        "nullable": true
+                    }
+                ]
+            }
+        }
+    }
+    "#;
+    let customer_schema = r#"
+    {
+        "customers": {
+            "schema": {
+                "fields": [
+                    {
+                        "name": "id",
+                        "typ": "Int",
+                        "nullable": false
+                    },
+                    {
+                        "name": "name",
+                        "typ": "String",
+                        "nullable": true
+                    },
+                    {
+                        "name": "json",
+                        "typ": "Json",
+                        "nullable": true
+                    }
+                ]
+            }
+        }
+    }
+    "#;
+    let webhook_connector = WebhookConnector::new(WebhookConfig {
+        port: Some(port),
+        host: None,
+        endpoints: vec![
+            WebhookEndpoint {
+                path: "/customers".to_string(),
+                verbs: vec![WebhookVerb::POST, WebhookVerb::DELETE],
+                schema: WebhookConfigSchemas::Inline(customer_schema.to_string()),
+            },
+            WebhookEndpoint {
+                path: "/users".to_string(),
+                verbs: vec![WebhookVerb::POST, WebhookVerb::DELETE],
+                schema: WebhookConfigSchemas::Inline(user_schema.to_string()),
+            },
+        ],
+    });
+    spawn_connector_all_tables(runtime.clone(), webhook_connector)
+}
+
+#[test]
+fn ingest_webhook_batch_insert() {
+    let runtime = create_test_runtime();
+    let port = 58883;
+    let result: (IngestionIterator, _) = ingest_webhook(runtime.clone(), port);
+    // call http request to webhook endpoint
+    let client = reqwest::blocking::Client::new();
+    let post_value = json!({
+        "users": [
+            {
+                "id": 1,
+                "name": "John Doe",
+                "json": {
+                    "key": "value"
+                }
+            },
+            {
+                "id": 2,
+                "name": "Jane Doe",
+                "json": {
+                    "key": "value"
+                }
+            }
+        ]
+    });
+    let http_result = client
+        .post(format!("http://127.0.0.1:{:}/users", port))
+        .json(&post_value)
+        .send();
+    assert!(http_result.is_ok());
+    let response = http_result.unwrap();
+    assert!(response.status().is_success());
+    let mut iterator = result.0;
+    let msg = iterator.next().unwrap();
+
+    let ivalue_str =
+        json_from_str(&serde_json::to_string(&json!({"key": "value"})).unwrap()).unwrap();
+
+    let expected_fields1: Vec<Field> = vec![
+        Field::Int(1),
+        Field::String("John Doe".to_string()),
+        Field::Json(ivalue_str.clone()),
+    ];
+    let expected_record1 = Record {
+        values: expected_fields1,
+        lifetime: None,
+    };
+
+    let expected_fields2: Vec<Field> = vec![
+        Field::Int(2),
+        Field::String("Jane Doe".to_string()),
+        Field::Json(ivalue_str),
+    ];
+    let expected_record2 = Record {
+        values: expected_fields2,
+        lifetime: None,
+    };
+
+    if let IngestionMessage::OperationEvent {
+        table_index: _,
+        op,
+        state: _,
+    } = msg
+    {
+        assert_eq!(
+            op,
+            dozer_ingestion_connector::dozer_types::types::Operation::BatchInsert {
+                new: vec![expected_record1, expected_record2],
+            }
+        );
+    } else {
+        panic!("Expected operation event");
+    }
+}
+
+#[test]
+fn ingest_webhook_delete() {
+    let runtime = create_test_runtime();
+    let port = 58884;
+    let result: (IngestionIterator, _) = ingest_webhook(runtime.clone(), port);
+    // call http request to webhook endpoint
+    let client = reqwest::blocking::Client::new();
+    let delete_value = json!({
+        "users": [
+            {
+                "id": 1,
+                "name": "John Doe",
+                "json": {
+                    "key": "value"
+                }
+            }
+        ]
+    });
+    let http_result = client
+        .delete(format!("http://127.0.0.1:{:}/users", port))
+        .json(&delete_value)
+        .send();
+    assert!(http_result.is_ok());
+    let response = http_result.unwrap();
+    assert!(response.status().is_success());
+    let mut iterator = result.0;
+    let msg = iterator.next().unwrap();
+    let ivalue_str =
+        json_from_str(&serde_json::to_string(&json!({"key": "value"})).unwrap()).unwrap();
+
+    let expected_fields1: Vec<Field> = vec![
+        Field::Int(1),
+        Field::String("John Doe".to_string()),
+        Field::Json(ivalue_str.clone()),
+    ];
+    let expected_record1 = Record {
+        values: expected_fields1.to_owned(),
+        lifetime: None,
+    };
+
+    if let IngestionMessage::OperationEvent {
+        table_index: _,
+        op,
+        state: _,
+    } = msg
+    {
+        if let dozer_ingestion_connector::dozer_types::types::Operation::Delete { old } = op {
+            assert_eq!(old, expected_record1);
+        } else {
+            panic!("Expected delete operation");
+        }
+    } else {
+        panic!("Expected operation event");
+    }
+}
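
One behavior the tests leave implicit: `WebhookVerb::PUT` exists (intended as update, per the comment in dozer-types below) but has no dedicated handler; any configured verb other than POST and DELETE is routed to `other_handler`, which acknowledges the request without ingesting anything. A sketch, assuming an endpoint whose `verbs` include PUT (port and body are illustrative):

    let port = 58885;
    let response = reqwest::blocking::Client::new()
        .put(format!("http://127.0.0.1:{}/users", port))
        .json(&serde_json::json!({ "users": [] }))
        .send()
        .unwrap();
    // 200 OK with body {"status": "PUT not supported"} -- nothing reaches the ingestor
    assert!(response.status().is_success());
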
diff --git a/dozer-ingestion/webhook/src/util.rs b/dozer-ingestion/webhook/src/util.rs
new file mode 100644
index 0000000000..f3ddd6e461
--- /dev/null
+++ b/dozer-ingestion/webhook/src/util.rs
@@ -0,0 +1,141 @@
+use crate::Error;
+use dozer_ingestion_connector::{
+    dozer_types::{
+        chrono::{self, NaiveDate},
+        json_types::json_from_str,
+        models::ingestion_types::WebhookConfigSchemas,
+        ordered_float::OrderedFloat,
+        rust_decimal::Decimal,
+        serde_json,
+        types::{Field, FieldType, Record, Schema},
+    },
+    SourceSchema,
+};
+use std::{collections::HashMap, fs, path::Path};
+
+pub fn extract_source_schema(input: WebhookConfigSchemas) -> HashMap<String, SourceSchema> {
+    match input {
+        WebhookConfigSchemas::Inline(schema_str) => {
+            let schemas: HashMap<String, SourceSchema> =
+                serde_json::from_str(&schema_str).unwrap();
+            schemas
+        }
+        WebhookConfigSchemas::Path(path) => {
+            let path = Path::new(&path);
+            let schema_str = fs::read_to_string(path).unwrap();
+            let schemas: HashMap<String, SourceSchema> =
+                serde_json::from_str(&schema_str).unwrap();
+            schemas
+        }
+    }
+}
+
+pub fn map_record(
+    rec: serde_json::map::Map<String, serde_json::Value>,
+    schema: &Schema,
+) -> Result<Record, Error> {
+    let mut values: Vec<Field> = vec![];
+    let fields = schema.fields.clone();
+    for field in fields.into_iter() {
+        let field_name = field.name.clone();
+        let field_value = rec.get(&field_name);
+        if !field.nullable && field_value.is_none() {
+            return Err(Error::FieldNotFound(field_name));
+        }
+        match field_value {
+            Some(value) => match field.typ {
+                FieldType::String => {
+                    let str_value: String = serde_json::from_value(value.clone())?;
+                    let field = Field::String(str_value);
+                    values.push(field);
+                }
+                FieldType::Int => {
+                    let i64_value: i64 = serde_json::from_value(value.clone())?;
+                    let field = Field::Int(i64_value);
+                    values.push(field);
+                }
+                FieldType::Float => {
+                    let float_value: f64 = serde_json::from_value(value.clone())?;
+                    let field = Field::Float(OrderedFloat(float_value));
+                    values.push(field);
+                }
+                FieldType::Boolean => {
+                    let bool_value: bool = serde_json::from_value(value.clone())?;
+                    let field = Field::Boolean(bool_value);
+                    values.push(field);
+                }
+                FieldType::Timestamp => {
+                    let i64_value: i64 = serde_json::from_value(value.clone())?;
+                    let timestamp_value = chrono::NaiveDateTime::from_timestamp_millis(i64_value)
+                        .map(|t| {
+                            Field::Timestamp(
+                                chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(
+                                    t,
+                                    chrono::Utc,
+                                )
+                                .into(),
+                            )
+                        })
+                        .unwrap_or(Field::Null);
+                    values.push(timestamp_value);
+                }
+                FieldType::Date => {
+                    let date_value: NaiveDate = serde_json::from_value(value.clone())?;
+                    let field = Field::Date(date_value);
+                    values.push(field);
+                }
+                FieldType::UInt => {
+                    let u64_value: u64 = serde_json::from_value(value.clone())?;
+                    let field = Field::UInt(u64_value);
+                    values.push(field);
+                }
+                FieldType::U128 => {
+                    let u128_value: u128 = serde_json::from_value(value.clone())?;
+                    let field = Field::U128(u128_value);
+                    values.push(field);
+                }
+                FieldType::I128 => {
+                    let i128_value: i128 = serde_json::from_value(value.clone())?;
+                    let field = Field::I128(i128_value);
+                    values.push(field);
+                }
+                FieldType::Text => {
+                    let str_value: String = serde_json::from_value(value.clone())?;
+                    let field = Field::Text(str_value);
+                    values.push(field);
+                }
+                FieldType::Binary => {
+                    let str_value: String = serde_json::from_value(value.clone())?;
+                    let field = Field::Binary(str_value.into_bytes());
+                    values.push(field);
+                }
+                FieldType::Decimal => {
+                    let str_value: String = serde_json::from_value(value.clone())?;
+                    let decimal_value: Decimal =
+                        Decimal::from_str_exact(str_value.as_str()).unwrap();
+                    let field = Field::Decimal(decimal_value);
+                    values.push(field);
+                }
+                FieldType::Json => {
+                    let str_value: String = serde_json::to_string(value)?;
+                    let ivalue_str = json_from_str(str_value.as_str()).unwrap();
+                    let field = Field::Json(ivalue_str);
+                    values.push(field);
+                }
+                FieldType::Point => {
+                    values.push(Field::Null);
+                }
+                FieldType::Duration => {
+                    values.push(Field::Null);
+                }
+            },
+            None => {
+                let field = Field::Null;
+                values.push(field);
+            }
+        }
+    }
+
+    Ok(Record {
+        values,
+        lifetime: None,
+    })
+}
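
`map_record` above fixes the JSON encoding expected for each `FieldType`: epoch-millisecond integers for `Timestamp`, ISO-8601 date strings for `Date`, exact decimal strings for `Decimal`, plain strings whose UTF-8 bytes are taken verbatim for `Binary` (no base64 decoding), and arbitrary JSON for `Json`; `Point` and `Duration` are never mapped and always yield `Field::Null`, as does an absent nullable field. A record exercising these conventions (hypothetical field names; the schema would declare one field of each type):

    use dozer_ingestion_connector::dozer_types::serde_json::json;

    let record = json!({
        "created_at": 1706581004000i64,  // Timestamp: epoch milliseconds
        "birthday": "1990-01-30",        // Date: ISO-8601 date string
        "balance": "1234.56",            // Decimal: parsed with Decimal::from_str_exact
        "blob": "raw bytes as string",   // Binary: the string's UTF-8 bytes
        "payload": { "any": ["json"] }   // Json: passed through as-is
    });
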
diff --git a/dozer-tests/src/e2e_tests/runner/running_env.rs b/dozer-tests/src/e2e_tests/runner/running_env.rs
index eaca6a325e..c47765ffe0 100644
--- a/dozer-tests/src/e2e_tests/runner/running_env.rs
+++ b/dozer-tests/src/e2e_tests/runner/running_env.rs
@@ -368,6 +368,7 @@ fn write_dozer_config_for_running_in_docker_compose(
             todo!("Map dozer host and port")
         }
         ConnectionConfig::JavaScript(_) => {}
+        ConnectionConfig::Webhook(_) => {}
     }
 }
diff --git a/dozer-types/src/models/connection.rs b/dozer-types/src/models/connection.rs
index 210724af04..aec0fe5006 100644
--- a/dozer-types/src/models/connection.rs
+++ b/dozer-types/src/models/connection.rs
@@ -1,6 +1,7 @@
 use crate::models::ingestion_types::{
     DeltaLakeConfig, EthConfig, GrpcConfig, JavaScriptConfig, KafkaConfig, LocalStorage,
-    MongodbConfig, MySQLConfig, NestedDozerConfig, S3Storage, SnowflakeConfig, SECRET,
+    MongodbConfig, MySQLConfig, NestedDozerConfig, S3Storage, SnowflakeConfig, WebhookConfig,
+    SECRET,
 };
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
@@ -243,6 +244,9 @@ pub enum ConnectionConfig {
 
     /// In yaml, present as tag" `!JavaScript`
     JavaScript(JavaScriptConfig),
+
+    /// In yaml, present as tag" `!Webhook`
+    Webhook(WebhookConfig),
 }
 
 impl ConnectionConfig {
@@ -260,6 +264,7 @@ impl ConnectionConfig {
             ConnectionConfig::MySQL(_) => "mysql".to_string(),
             ConnectionConfig::Dozer(_) => "dozer".to_string(),
             ConnectionConfig::JavaScript(_) => "javascript".to_string(),
+            ConnectionConfig::Webhook(_) => "webhook".to_string(),
         }
     }
 }
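
Besides `Inline`, `WebhookConfigSchemas` (defined just below) has a `Path` variant; `extract_source_schema` in util.rs reads the same `{"table": {"schema": {"fields": [...]}}}` document from disk. A sketch with an illustrative file path; note that the current implementation unwraps on read/parse failure rather than returning `Error::CannotReadFile`/`Error::SerdeJson`:

    use dozer_types::models::ingestion_types::{
        WebhookConfigSchemas, WebhookEndpoint, WebhookVerb,
    };

    // ./schemas/users.json is hypothetical; it would hold the same schema JSON
    // shown inline in the examples above.
    let endpoint = WebhookEndpoint {
        path: "/users".to_string(),
        verbs: vec![WebhookVerb::POST, WebhookVerb::DELETE],
        schema: WebhookConfigSchemas::Path("./schemas/users.json".to_string()),
    };
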
diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs
index 6e6982b376..46f05a3fb4 100644
--- a/dozer-types/src/models/ingestion_types.rs
+++ b/dozer-types/src/models/ingestion_types.rs
@@ -543,3 +543,85 @@ pub struct JavaScriptConfig {
 pub fn default_bootstrap_path() -> String {
     String::from("src/js/bootstrap.js")
 }
+
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+pub struct WebhookConfig {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub host: Option<String>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub port: Option<u32>,
+
+    pub endpoints: Vec<WebhookEndpoint>,
+}
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+pub struct WebhookEndpoint {
+    pub path: String,
+    pub verbs: Vec<WebhookVerb>,
+    pub schema: WebhookConfigSchemas,
+}
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+#[schemars(example = "Self::example")]
+pub enum WebhookVerb {
+    POST,   // insert
+    PUT,    // update
+    DELETE, // delete
+}
+
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema)]
+pub enum WebhookConfigSchemas {
+    Inline(String),
+    Path(String),
+}
+
+impl SchemaExample for WebhookConfig {
+    fn example() -> Self {
+        Self {
+            host: Some("localhost".to_owned()),
+            port: Some(50059),
+            endpoints: vec![WebhookEndpoint::example()],
+        }
+    }
+}
+
+impl SchemaExample for WebhookEndpoint {
+    fn example() -> Self {
+        let user_schema = r#"
+        {
+            "users": {
+                "schema": {
+                    "fields": [
+                        {
+                            "name": "id",
+                            "typ": "Int",
+                            "nullable": false
+                        },
+                        {
+                            "name": "name",
+                            "typ": "String",
+                            "nullable": true
+                        },
+                        {
+                            "name": "json",
+                            "typ": "Json",
+                            "nullable": true
+                        }
+                    ]
+                }
+            }
+        }
+        "#;
+        Self {
+            path: "/ingest".to_owned(),
+            verbs: vec![WebhookVerb::POST, WebhookVerb::DELETE],
+            schema: WebhookConfigSchemas::Inline(user_schema.to_string()),
+        }
+    }
+}
+impl SchemaExample for WebhookVerb {
+    fn example() -> Self {
+        Self::POST
+    }
+}
diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json
index b9ee851aa8..aa4ded69f6 100644
--- a/json_schemas/dozer.json
+++ b/json_schemas/dozer.json
@@ -621,6 +621,19 @@
         }
       },
       "additionalProperties": false
+    },
+    {
+      "description": "In yaml, present as tag\" `!Webhook`",
+      "type": "object",
+      "required": [
+        "Webhook"
+      ],
+      "properties": {
"Webhook": { + "$ref": "#/definitions/WebhookConfig" + } + }, + "additionalProperties": false } ] }, @@ -2172,6 +2185,125 @@ "Manual" ] }, + "WebhookConfig": { + "examples": [ + { + "host": "localhost", + "port": 50059, + "endpoints": [ + { + "path": "/ingest", + "verbs": [ + "POST", + "DELETE" + ], + "schema": { + "Inline": "\n {\n \"users\": {\n \"schema\": {\n \"fields\": [\n {\n \"name\": \"id\",\n \"typ\": \"Int\",\n \"nullable\": false\n },\n {\n \"name\": \"name\",\n \"typ\": \"String\",\n \"nullable\": true\n },\n {\n \"name\": \"json\",\n \"typ\": \"Json\",\n \"nullable\": true\n }\n ]\n }\n }\n }\n " + } + } + ] + } + ], + "type": "object", + "required": [ + "endpoints" + ], + "properties": { + "endpoints": { + "type": "array", + "items": { + "$ref": "#/definitions/WebhookEndpoint" + } + }, + "host": { + "type": [ + "string", + "null" + ] + }, + "port": { + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 + } + } + }, + "WebhookConfigSchemas": { + "oneOf": [ + { + "type": "object", + "required": [ + "Inline" + ], + "properties": { + "Inline": { + "type": "string" + } + }, + "additionalProperties": false + }, + { + "type": "object", + "required": [ + "Path" + ], + "properties": { + "Path": { + "type": "string" + } + }, + "additionalProperties": false + } + ] + }, + "WebhookEndpoint": { + "examples": [ + { + "path": "/ingest", + "verbs": [ + "POST", + "DELETE" + ], + "schema": { + "Inline": "\n {\n \"users\": {\n \"schema\": {\n \"fields\": [\n {\n \"name\": \"id\",\n \"typ\": \"Int\",\n \"nullable\": false\n },\n {\n \"name\": \"name\",\n \"typ\": \"String\",\n \"nullable\": true\n },\n {\n \"name\": \"json\",\n \"typ\": \"Json\",\n \"nullable\": true\n }\n ]\n }\n }\n }\n " + } + } + ], + "type": "object", + "required": [ + "path", + "schema", + "verbs" + ], + "properties": { + "path": { + "type": "string" + }, + "schema": { + "$ref": "#/definitions/WebhookConfigSchemas" + }, + "verbs": { + "type": "array", + "items": { + "$ref": "#/definitions/WebhookVerb" + } + } + } + }, + "WebhookVerb": { + "examples": [ + "POST" + ], + "type": "string", + "enum": [ + "POST", + "PUT", + "DELETE" + ] + }, "XRayConfig": { "type": "object", "required": [