From 820759bfa833783ac63104988adfc3b2f2ea4c92 Mon Sep 17 00:00:00 2001 From: Solomon <108011288+abcpro1@users.noreply.github.com> Date: Fri, 15 Sep 2023 15:01:57 +0000 Subject: [PATCH] fix: snowflake bug fixes (#2040) * fix: handle network error in snowflake Handle a network error that was discovered in snowflake. * fix: return good connection to the connection pool Connections have to be returned manually because the connection is just an opaque handle which doesn't know if it had errored. * fix: increment the cursor position Once a row is delivered, we should increment the offset. * fix: break out of an infinite loop --- Cargo.lock | 5 +- dozer-ingestion/Cargo.toml | 3 +- .../connectors/snowflake/connection/client.rs | 55 +++++++++++-------- .../snowflake/connection/helpers.rs | 4 +- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd56a6e850..27236e2d00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2706,6 +2706,7 @@ dependencies = [ "hex", "hex-literal", "include_dir", + "memchr", "mongodb", "mysql_async", "mysql_common", @@ -4755,9 +4756,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index 1f4f6f8760..e71776993a 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -68,6 +68,7 @@ geozero = { version = "0.10.0", default-features = false, features = [ ] } bytes = "1.4.0" genawaiter = { version = "0.99.1", optional = true } +memchr = { version = "2.6.3", optional = true } [dev-dependencies] criterion = { version = "0.4.0", features = ["html_reports"] } @@ -83,7 +84,7 @@ dozer-cli = { path = "../dozer-cli" } [features] # Defines a feature named 
`odbc` that does not enable any other features. -snowflake = ["dep:odbc", "dep:include_dir", "dep:genawaiter"] +snowflake = ["dep:odbc", "dep:include_dir", "dep:genawaiter", "dep:memchr"] ethereum = ["dep:web3"] kafka = ["dep:rdkafka", "dep:schema_registry_converter"] mongodb = ["dep:mongodb"] diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs index b51e79676b..dfc6538557 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs @@ -458,7 +458,10 @@ fn get_fields_from_cursor( fn exec_drop(pool: &Pool, query: &str) -> Result<(), Box> { let conn = pool.get_conn()?; - let _result = exec_helper(&conn, query)?; + { + let _result = exec_helper(&conn, query)?; + } + conn.return_(); Ok(()) } @@ -469,6 +472,7 @@ fn exec_first_exists(pool: &Pool, query: &str) -> Result retry!(data.fetch())?.is_some(), None => false, }; + conn.return_(); break Ok(result); } } @@ -483,34 +487,39 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { let schema_ref = schema.clone(); let mut generator: Gen, (), _> = gen!({ - let cursor_position = 0u64; + let mut cursor_position = 0u64; 'retry: loop { let conn = pool.get_conn().map_err(QueryError)?; - let mut data = match exec_helper(&conn, &add_query_offset(&query, cursor_position)?) - .map_err(QueryError)? { - Some(data) => data, - None => break, - }; - let cols = data.num_result_cols().map_err(|e| QueryError(e.into()))?; - let mut vec = Vec::new(); - for i in 1..(cols + 1) { - let value = i.try_into(); - let column_descriptor = match value { - Ok(v) => data.describe_col(v).map_err(|e| QueryError(e.into()))?, - Err(e) => Err(SchemaConversionError(e))?, + let mut data = match exec_helper(&conn, &add_query_offset(&query, cursor_position)?) + .map_err(QueryError)? 
+ { + Some(data) => data, + None => break, }; - vec.push(column_descriptor) - } - schema.borrow_mut().replace(vec); + let cols = data.num_result_cols().map_err(|e| QueryError(e.into()))?; + let mut vec = Vec::new(); + for i in 1..(cols + 1) { + let value = i.try_into(); + let column_descriptor = match value { + Ok(v) => data.describe_col(v).map_err(|e| QueryError(e.into()))?, + Err(e) => Err(SchemaConversionError(e))?, + }; + vec.push(column_descriptor) + } + schema.borrow_mut().replace(vec); - while let Some(cursor) = - retry!(data.fetch(),'retry).map_err(|e| QueryError(e.into()))? - { - let fields = - get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?; - yield_!(fields); + while let Some(cursor) = + retry!(data.fetch(),'retry).map_err(|e| QueryError(e.into()))? + { + let fields = + get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?; + yield_!(fields); + cursor_position += 1; + } } + conn.return_(); + break; } Ok::<(), SnowflakeError>(()) }); diff --git a/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs b/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs index 5b971ad39e..3cf74ec66d 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs @@ -7,6 +7,8 @@ pub fn is_network_failure(err: &odbc::DiagnosticRecord) -> bool { b"01002" /* Disconnect error */ | b"08001" /* Client unable to establish connection */ | b"08007" /* Connection failure during transaction */ | - b"08S01" /* Communication link failure */ + b"08S01" /* Communication link failure */ | + b"HY000" /* General error */ + if memchr::memmem::find(err.get_raw_message(), b"Timeout was reached").is_some() ) }