diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 0824abe2f2..b884cb08c0 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -181,7 +181,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { .exclude .as_ref() .map(|ss| ss.iter().map(|s| s.to_string()).collect()) - .unwrap_or_default() + .unwrap_or_default(), }); } @@ -516,7 +516,13 @@ fn parse_db_options( let val = match opt.value { sqlparser::ast::Value::SingleQuotedString(ref str) => str, sqlparser::ast::Value::Number(ref v, _) => v, - sqlparser::ast::Value::Boolean(v) => if v { "true" } else { "false" }, + sqlparser::ast::Value::Boolean(v) => { + if v { + "true" + } else { + "false" + } + } _ => panic!("invalid option type for peer"), }; opts.insert(&opt.name.value, val); @@ -673,11 +679,8 @@ fn parse_db_options( Some(config) } DbType::Eventhub => { - let conn_str: String = opts - .get("metadata_db") - .map(|s| s.to_string()) - .unwrap_or_default(); - let metadata_db = parse_metadata_db_info(&conn_str)?; + let conn_str = opts.get("metadata_db").map(|s| s.to_string()); + let metadata_db = parse_metadata_db_info(conn_str)?; let subscription_id = opts .get("subscription_id") .map(|s| s.to_string()) @@ -721,11 +724,8 @@ fn parse_db_options( Some(config) } DbType::S3 => { - let s3_conn_str: String = opts - .get("metadata_db") - .map(|s| s.to_string()) - .unwrap_or_default(); - let metadata_db = parse_metadata_db_info(&s3_conn_str)?; + let s3_conn_str = opts.get("metadata_db").map(|s| s.to_string()); + let metadata_db = parse_metadata_db_info(s3_conn_str)?; let s3_config = S3Config { url: opts .get("url") @@ -764,16 +764,9 @@ fn parse_db_options( Some(config) } DbType::EventhubGroup => { - let conn_str = opts - .get("metadata_db") - .context("no metadata db specified")?; + let conn_str = opts.get("metadata_db").map(|s| s.to_string()); let metadata_db = parse_metadata_db_info(conn_str)?; - // metadata_db is required for eventhub group - if metadata_db.is_none() { - anyhow::bail!("metadata_db is required for eventhub group"); - } - // split comma separated list of columns and trim let unnest_columns = opts .get("unnest_columns") @@ -823,10 +816,11 @@ fn parse_db_options( Ok(config) } -fn parse_metadata_db_info(conn_str: &str) -> anyhow::Result> { - if conn_str.is_empty() { - return Ok(None); - } +fn parse_metadata_db_info(conn_str_opt: Option) -> anyhow::Result> { + let conn_str = match conn_str_opt { + Some(conn_str) => conn_str, + None => return Ok(None), + }; let mut metadata_db = PostgresConfig::default(); let param_pairs: Vec<&str> = conn_str.split_whitespace().collect(); diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index 88af3da66b..a4f69eb1cb 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -92,13 +92,13 @@ const QREP_OPTIONS: &[QRepOptionType] = &[ QRepOptionType::Boolean { name: "setup_watermark_table_on_destination", default_value: false, - required: false + required: false, }, QRepOptionType::Boolean { name: "dst_table_full_resync", default_value: false, - required: false - } + required: false, + }, ]; pub fn process_options( diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 15fccf2233..f192568791 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -426,7 +426,8 @@ impl Catalog { .await?; let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| { - let flow_opts: HashMap = row.get::<&str, Option>("flow_metadata") + let flow_opts: HashMap = row + .get::<&str, Option>("flow_metadata") .and_then(|flow_opts| serde_json::from_value(flow_opts).ok()) .unwrap_or_default(); diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 4e09f093fb..896af4f31b 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -1,4 +1,3 @@ - use catalog::WorkflowDetails; use pt::{ flow_model::{FlowJob, QRepFlowJob}, diff --git a/nexus/peer-bigquery/src/ast.rs b/nexus/peer-bigquery/src/ast.rs index 190b6d21be..8429e0ebe1 100644 --- a/nexus/peer-bigquery/src/ast.rs +++ b/nexus/peer-bigquery/src/ast.rs @@ -3,10 +3,9 @@ use std::ops::ControlFlow; use sqlparser::ast::Value::Number; use sqlparser::ast::{ - visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, - Array, ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, - Function, FunctionArg, FunctionArgExpr, Ident, - ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo, + visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, Array, + ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg, + FunctionArgExpr, Ident, ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo, }; #[derive(Default)] @@ -99,11 +98,7 @@ impl BigqueryAst { visit_expressions_mut(query, |node| { // CAST AS Text to CAST AS String - if let Expr::Cast { - data_type: dt, - .. - } = node - { + if let Expr::Cast { data_type: dt, .. } = node { if let DataType::Text = dt { *dt = DataType::String(None); } @@ -177,7 +172,6 @@ impl BigqueryAst { distinct: false, special: false, order_by: vec![], - }) } else if let BinaryOperator::Plus = op { *node = Expr::Function(Function { @@ -241,7 +235,12 @@ impl BigqueryAst { // flatten ANY to IN operation overall. visit_expressions_mut(query, |node| { - if let Expr::AnyOp { left, compare_op, right } = node { + if let Expr::AnyOp { + left, + compare_op, + right, + } = node + { if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) { let list = self .flatten_expr_to_in_list(right) @@ -257,8 +256,6 @@ impl BigqueryAst { ControlFlow::<()>::Continue(()) }); - - Ok(()) } @@ -300,7 +297,10 @@ impl BigqueryAst { fn flatten_expr_to_in_list(&self, expr: &Expr) -> anyhow::Result> { let mut list = vec![]; // check if expr is of type Cast - if let Expr::Cast { expr, data_type, .. } = expr { + if let Expr::Cast { + expr, data_type, .. + } = expr + { // assert that expr is of type SingleQuotedString if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() { // trim the starting and ending curly braces @@ -309,39 +309,40 @@ impl BigqueryAst { let split = s.split(','); // match on data type, and create a vector of Expr::Value match data_type { - DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) | - DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) - => match inner.as_ref() { - DataType::Text | DataType::Char(_) | DataType::Varchar(_) => { - for s in split { - list.push(Expr::Value(sqlparser::ast::Value::SingleQuotedString( - s.to_string(), - ))); + DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) + | DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => { + match inner.as_ref() { + DataType::Text | DataType::Char(_) | DataType::Varchar(_) => { + for s in split { + list.push(Expr::Value( + sqlparser::ast::Value::SingleQuotedString(s.to_string()), + )); + } } - } - DataType::Integer(_) - | DataType::Float(_) - | DataType::BigInt(_) - | DataType::UnsignedBigInt(_) - | DataType::UnsignedInteger(_) - | DataType::UnsignedSmallInt(_) - | DataType::UnsignedTinyInt(_) - | DataType::TinyInt(_) - | DataType::UnsignedInt(_) => { - for s in split { - list.push(Expr::Value(sqlparser::ast::Value::Number( - s.to_string(), - false, - ))); + DataType::Integer(_) + | DataType::Float(_) + | DataType::BigInt(_) + | DataType::UnsignedBigInt(_) + | DataType::UnsignedInteger(_) + | DataType::UnsignedSmallInt(_) + | DataType::UnsignedTinyInt(_) + | DataType::TinyInt(_) + | DataType::UnsignedInt(_) => { + for s in split { + list.push(Expr::Value(sqlparser::ast::Value::Number( + s.to_string(), + false, + ))); + } + } + _ => { + return Err(anyhow::anyhow!( + "Unsupported inner data type for IN list: {:?}", + data_type + )) } } - _ => { - return Err(anyhow::anyhow!( - "Unsupported inner data type for IN list: {:?}", - data_type - )) - } - }, + } _ => { return Err(anyhow::anyhow!( "Unsupported data type for IN list: {:?}", diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index d4acce7fd0..4dd0bd9dfe 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -64,7 +64,10 @@ impl BqSchema { .schema .as_ref() .expect("Schema is not present"); - let fields = bq_schema.fields.as_ref().expect("Schema fields are not present"); + let fields = bq_schema + .fields + .as_ref() + .expect("Schema fields are not present"); let schema = SchemaRef::new(Schema { fields: fields @@ -76,7 +79,10 @@ impl BqSchema { .collect(), }); - Self { schema, fields: fields.clone() } + Self { + schema, + fields: fields.clone(), + } } pub fn schema(&self) -> SchemaRef {