
Commit

remove metadata db for validation
iskakaushik committed Jan 4, 2024
1 parent 2c29cc1 commit f7ff9cd
Showing 6 changed files with 76 additions and 75 deletions.
42 changes: 18 additions & 24 deletions nexus/analyzer/src/lib.rs
@@ -181,7 +181,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
.exclude
.as_ref()
.map(|ss| ss.iter().map(|s| s.to_string()).collect())
.unwrap_or_default()
.unwrap_or_default(),
});
}

@@ -516,7 +516,13 @@ fn parse_db_options(
let val = match opt.value {
sqlparser::ast::Value::SingleQuotedString(ref str) => str,
sqlparser::ast::Value::Number(ref v, _) => v,
sqlparser::ast::Value::Boolean(v) => if v { "true" } else { "false" },
sqlparser::ast::Value::Boolean(v) => {
if v {
"true"
} else {
"false"
}
}
_ => panic!("invalid option type for peer"),
};
opts.insert(&opt.name.value, val);
@@ -673,11 +679,8 @@ fn parse_db_options(
Some(config)
}
DbType::Eventhub => {
let conn_str: String = opts
.get("metadata_db")
.map(|s| s.to_string())
.unwrap_or_default();
let metadata_db = parse_metadata_db_info(&conn_str)?;
let conn_str = opts.get("metadata_db").map(|s| s.to_string());
let metadata_db = parse_metadata_db_info(conn_str)?;
let subscription_id = opts
.get("subscription_id")
.map(|s| s.to_string())
@@ -721,11 +724,8 @@ fn parse_db_options(
Some(config)
}
DbType::S3 => {
let s3_conn_str: String = opts
.get("metadata_db")
.map(|s| s.to_string())
.unwrap_or_default();
let metadata_db = parse_metadata_db_info(&s3_conn_str)?;
let s3_conn_str = opts.get("metadata_db").map(|s| s.to_string());
let metadata_db = parse_metadata_db_info(s3_conn_str)?;
let s3_config = S3Config {
url: opts
.get("url")
@@ -764,16 +764,9 @@ fn parse_db_options(
Some(config)
}
DbType::EventhubGroup => {
let conn_str = opts
.get("metadata_db")
.context("no metadata db specified")?;
let conn_str = opts.get("metadata_db").map(|s| s.to_string());
let metadata_db = parse_metadata_db_info(conn_str)?;

// metadata_db is required for eventhub group
if metadata_db.is_none() {
anyhow::bail!("metadata_db is required for eventhub group");
}

// split comma separated list of columns and trim
let unnest_columns = opts
.get("unnest_columns")
@@ -823,10 +816,11 @@ fn parse_db_options(
Ok(config)
}

fn parse_metadata_db_info(conn_str: &str) -> anyhow::Result<Option<PostgresConfig>> {
if conn_str.is_empty() {
return Ok(None);
}
fn parse_metadata_db_info(conn_str_opt: Option<String>) -> anyhow::Result<Option<PostgresConfig>> {
let conn_str = match conn_str_opt {
Some(conn_str) => conn_str,
None => return Ok(None),
};

let mut metadata_db = PostgresConfig::default();
let param_pairs: Vec<&str> = conn_str.split_whitespace().collect();
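Note on the change above: parse_metadata_db_info now takes an Option<String> and returns Ok(None) when the option is absent, and the Eventhub, S3, and EventhubGroup branches all pass opts.get("metadata_db") through unchanged instead of defaulting to an empty string (EventhubGroup previously bailed with "metadata_db is required for eventhub group"). A minimal, self-contained sketch of the new flow follows; the PostgresConfig stub and the host/port handling are illustrative stand-ins, not the repo's actual types or full parser.

// Illustrative stub: the real PostgresConfig has more fields (user, password, database, ...).
#[derive(Default, Debug)]
struct PostgresConfig {
    host: String,
    port: u32,
}

fn parse_metadata_db_info(conn_str_opt: Option<String>) -> anyhow::Result<Option<PostgresConfig>> {
    // A missing "metadata_db" option now means "no metadata db", not an error.
    let conn_str = match conn_str_opt {
        Some(conn_str) => conn_str,
        None => return Ok(None),
    };
    let mut metadata_db = PostgresConfig::default();
    // Sketch of the key=value parsing over whitespace-separated pairs.
    for pair in conn_str.split_whitespace() {
        if let Some((key, value)) = pair.split_once('=') {
            match key {
                "host" => metadata_db.host = value.to_string(),
                "port" => metadata_db.port = value.parse()?,
                _ => {}
            }
        }
    }
    Ok(Some(metadata_db))
}

fn main() -> anyhow::Result<()> {
    // Call sites now follow the same pattern in all three peer branches:
    //   let conn_str = opts.get("metadata_db").map(|s| s.to_string());
    //   let metadata_db = parse_metadata_db_info(conn_str)?;
    assert!(parse_metadata_db_info(None)?.is_none());
    let cfg = parse_metadata_db_info(Some("host=localhost port=5432".to_string()))?;
    println!("{:?}", cfg);
    Ok(())
}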
6 changes: 3 additions & 3 deletions nexus/analyzer/src/qrep.rs
@@ -92,13 +92,13 @@ const QREP_OPTIONS: &[QRepOptionType] = &[
QRepOptionType::Boolean {
name: "setup_watermark_table_on_destination",
default_value: false,
required: false
required: false,
},
QRepOptionType::Boolean {
name: "dst_table_full_resync",
default_value: false,
required: false
}
required: false,
},
];

pub fn process_options(
3 changes: 2 additions & 1 deletion nexus/catalog/src/lib.rs
@@ -426,7 +426,8 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts: HashMap<String, Value> = row.get::<&str, Option<Value>>("flow_metadata")
let flow_opts: HashMap<String, Value> = row
.get::<&str, Option<Value>>("flow_metadata")
.and_then(|flow_opts| serde_json::from_value(flow_opts).ok())
.unwrap_or_default();

1 change: 0 additions & 1 deletion nexus/flow-rs/src/grpc.rs
@@ -1,4 +1,3 @@

use catalog::WorkflowDetails;
use pt::{
flow_model::{FlowJob, QRepFlowJob},
89 changes: 45 additions & 44 deletions nexus/peer-bigquery/src/ast.rs
@@ -3,10 +3,9 @@ use std::ops::ControlFlow;
use sqlparser::ast::Value::Number;

use sqlparser::ast::{
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut,
Array, ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr,
Function, FunctionArg, FunctionArgExpr, Ident,
ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, Array,
ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg,
FunctionArgExpr, Ident, ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo,
};

#[derive(Default)]
@@ -99,11 +98,7 @@ impl BigqueryAst {

visit_expressions_mut(query, |node| {
// CAST AS Text to CAST AS String
if let Expr::Cast {
data_type: dt,
..
} = node
{
if let Expr::Cast { data_type: dt, .. } = node {
if let DataType::Text = dt {
*dt = DataType::String(None);
}
@@ -177,7 +172,6 @@ impl BigqueryAst {
distinct: false,
special: false,
order_by: vec![],

})
} else if let BinaryOperator::Plus = op {
*node = Expr::Function(Function {
@@ -241,7 +235,12 @@

// flatten ANY to IN operation overall.
visit_expressions_mut(query, |node| {
if let Expr::AnyOp { left, compare_op, right } = node {
if let Expr::AnyOp {
left,
compare_op,
right,
} = node
{
if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) {
let list = self
.flatten_expr_to_in_list(right)
@@ -257,8 +256,6 @@
ControlFlow::<()>::Continue(())
});



Ok(())
}

@@ -300,7 +297,10 @@ impl BigqueryAst {
fn flatten_expr_to_in_list(&self, expr: &Expr) -> anyhow::Result<Vec<Expr>> {
let mut list = vec![];
// check if expr is of type Cast
if let Expr::Cast { expr, data_type, .. } = expr {
if let Expr::Cast {
expr, data_type, ..
} = expr
{
// assert that expr is of type SingleQuotedString
if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() {
// trim the starting and ending curly braces
@@ -309,39 +309,40 @@
let split = s.split(',');
// match on data type, and create a vector of Expr::Value
match data_type {
DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) |
DataType::Array(ArrayElemTypeDef::SquareBracket(inner))
=> match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::SingleQuotedString(
s.to_string(),
)));
DataType::Array(ArrayElemTypeDef::AngleBracket(inner))
| DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => {
match inner.as_ref() {
DataType::Text | DataType::Char(_) | DataType::Varchar(_) => {
for s in split {
list.push(Expr::Value(
sqlparser::ast::Value::SingleQuotedString(s.to_string()),
));
}
}
}
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
DataType::Integer(_)
| DataType::Float(_)
| DataType::BigInt(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedTinyInt(_)
| DataType::TinyInt(_)
| DataType::UnsignedInt(_) => {
for s in split {
list.push(Expr::Value(sqlparser::ast::Value::Number(
s.to_string(),
false,
)));
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported inner data type for IN list: {:?}",
data_type
))
}
},
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported data type for IN list: {:?}",
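For context on the flatten_expr_to_in_list code reformatted above: it takes the string payload of a casted array literal, trims the starting and ending curly braces, splits on commas, and turns each element into a quoted string for text-like types or a bare number for integer/float types (erroring on unsupported element types), so that an = ANY(...) predicate can be rewritten as an IN list. A hedged, standalone sketch of just that string handling follows; the flatten_array_literal name and signature are hypothetical, and the real code builds sqlparser Expr values rather than strings.

// Hypothetical helper mirroring the string handling shown above.
fn flatten_array_literal(s: &str, is_text: bool) -> Vec<String> {
    // trim the starting and ending curly braces (assumes s looks like "{a,b,c}")
    let inner = &s[1..s.len() - 1];
    inner
        .split(',')
        .map(|elem| {
            if is_text {
                format!("'{}'", elem) // text/char/varchar elements become quoted strings
            } else {
                elem.to_string() // numeric elements become bare numbers
            }
        })
        .collect()
}

fn main() {
    // e.g. x = ANY('{1,2,3}') can then be rewritten as x IN (1, 2, 3) downstream
    assert_eq!(flatten_array_literal("{1,2,3}", false), vec!["1", "2", "3"]);
    assert_eq!(flatten_array_literal("{a,b}", true), vec!["'a'", "'b'"]);
}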
10 changes: 8 additions & 2 deletions nexus/peer-bigquery/src/stream.rs
@@ -64,7 +64,10 @@ impl BqSchema {
.schema
.as_ref()
.expect("Schema is not present");
let fields = bq_schema.fields.as_ref().expect("Schema fields are not present");
let fields = bq_schema
.fields
.as_ref()
.expect("Schema fields are not present");

let schema = SchemaRef::new(Schema {
fields: fields
@@ -76,7 +79,10 @@
.collect(),
});

Self { schema, fields: fields.clone() }
Self {
schema,
fields: fields.clone(),
}
}

pub fn schema(&self) -> SchemaRef {
