From 59be7dea6a9312e84515fe6065fdae57f2d746ce Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Fri, 15 Sep 2023 12:13:39 +0000 Subject: [PATCH 1/4] fix: handle network error in snowflake Handle a network error that was discovered in snowflake. --- Cargo.lock | 5 +++-- dozer-ingestion/Cargo.toml | 3 ++- .../src/connectors/snowflake/connection/helpers.rs | 4 +++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd56a6e850..27236e2d00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2706,6 +2706,7 @@ dependencies = [ "hex", "hex-literal", "include_dir", + "memchr", "mongodb", "mysql_async", "mysql_common", @@ -4755,9 +4756,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" diff --git a/dozer-ingestion/Cargo.toml b/dozer-ingestion/Cargo.toml index 1f4f6f8760..e71776993a 100644 --- a/dozer-ingestion/Cargo.toml +++ b/dozer-ingestion/Cargo.toml @@ -68,6 +68,7 @@ geozero = { version = "0.10.0", default-features = false, features = [ ] } bytes = "1.4.0" genawaiter = { version = "0.99.1", optional = true } +memchr = { version = "2.6.3", optional = true } [dev-dependencies] criterion = { version = "0.4.0", features = ["html_reports"] } @@ -83,7 +84,7 @@ dozer-cli = { path = "../dozer-cli" } [features] # Defines a feature named `odbc` that does not enable any other features. -snowflake = ["dep:odbc", "dep:include_dir", "dep:genawaiter"] +snowflake = ["dep:odbc", "dep:include_dir", "dep:genawaiter", "dep:memchr"] ethereum = ["dep:web3"] kafka = ["dep:rdkafka", "dep:schema_registry_converter"] mongodb = ["dep:mongodb"] diff --git a/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs b/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs index 5b971ad39e..3cf74ec66d 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/helpers.rs @@ -7,6 +7,8 @@ pub fn is_network_failure(err: &odbc::DiagnosticRecord) -> bool { b"01002" /* Disconnect error */ | b"08001" /* Client unable to establish connection */ | b"08007" /* Connection failure during transaction */ | - b"08S01" /* Communication link failure */ + b"08S01" /* Communication link failure */ | + b"HY000" /* General error */ + if memchr::memmem::find(err.get_raw_message(), b"Timeout was reached").is_some() ) } From b896658078b76b44aa6a25b8e727ac549a30cbc0 Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Fri, 15 Sep 2023 12:15:22 +0000 Subject: [PATCH 2/4] fix: return good connection to the connection pool Connections have to returned manually because the connection is just an opaque handle which doesn't know if it had errored. --- .../connectors/snowflake/connection/client.rs | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs index b51e79676b..e9ebb53aba 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs @@ -458,7 +458,10 @@ fn get_fields_from_cursor( fn exec_drop(pool: &Pool, query: &str) -> Result<(), Box> { let conn = pool.get_conn()?; - let _result = exec_helper(&conn, query)?; + { + let _result = exec_helper(&conn, query)?; + } + conn.return_(); Ok(()) } @@ -469,6 +472,7 @@ fn exec_first_exists(pool: &Pool, query: &str) -> Result retry!(data.fetch())?.is_some(), None => false, }; + conn.return_(); break Ok(result); } } @@ -486,31 +490,34 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { let cursor_position = 0u64; 'retry: loop { let conn = pool.get_conn().map_err(QueryError)?; - let mut data = match exec_helper(&conn, &add_query_offset(&query, cursor_position)?) - .map_err(QueryError)? { - Some(data) => data, - None => break, - }; - let cols = data.num_result_cols().map_err(|e| QueryError(e.into()))?; - let mut vec = Vec::new(); - for i in 1..(cols + 1) { - let value = i.try_into(); - let column_descriptor = match value { - Ok(v) => data.describe_col(v).map_err(|e| QueryError(e.into()))?, - Err(e) => Err(SchemaConversionError(e))?, + let mut data = match exec_helper(&conn, &add_query_offset(&query, cursor_position)?) + .map_err(QueryError)? + { + Some(data) => data, + None => break, }; - vec.push(column_descriptor) - } - schema.borrow_mut().replace(vec); + let cols = data.num_result_cols().map_err(|e| QueryError(e.into()))?; + let mut vec = Vec::new(); + for i in 1..(cols + 1) { + let value = i.try_into(); + let column_descriptor = match value { + Ok(v) => data.describe_col(v).map_err(|e| QueryError(e.into()))?, + Err(e) => Err(SchemaConversionError(e))?, + }; + vec.push(column_descriptor) + } + schema.borrow_mut().replace(vec); - while let Some(cursor) = - retry!(data.fetch(),'retry).map_err(|e| QueryError(e.into()))? - { - let fields = - get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?; - yield_!(fields); + while let Some(cursor) = + retry!(data.fetch(),'retry).map_err(|e| QueryError(e.into()))? + { + let fields = + get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?; + yield_!(fields); + } } + conn.return_(); } Ok::<(), SnowflakeError>(()) }); From 73ab5c4776307194d811ad87b35edf3462bf0394 Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Fri, 15 Sep 2023 12:17:02 +0000 Subject: [PATCH 3/4] fix: increment the cursor position Once a row is delivered, we should increment the offset. --- dozer-ingestion/src/connectors/snowflake/connection/client.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs index e9ebb53aba..b2837788b6 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs @@ -487,7 +487,7 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { let schema_ref = schema.clone(); let mut generator: Gen, (), _> = gen!({ - let cursor_position = 0u64; + let mut cursor_position = 0u64; 'retry: loop { let conn = pool.get_conn().map_err(QueryError)?; { @@ -515,6 +515,7 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { let fields = get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?; yield_!(fields); + cursor_position += 1; } } conn.return_(); From 4a5dd5b74d5aedee84d7119d30f4f5b77537b640 Mon Sep 17 00:00:00 2001 From: abcpro1 Date: Fri, 15 Sep 2023 12:17:59 +0000 Subject: [PATCH 4/4] fix: break out of an infinite loop --- dozer-ingestion/src/connectors/snowflake/connection/client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs index b2837788b6..dfc6538557 100644 --- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs +++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs @@ -519,6 +519,7 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter { } } conn.return_(); + break; } Ok::<(), SnowflakeError>(()) });