Skip to content

Commit

Permalink
fix: snowflake bug fixes (#2040)
Browse files Browse the repository at this point in the history
* fix: handle network error in snowflake

Handle a network error that was discovered in snowflake.

* fix: return good connection to the connection pool

Connections have to returned manually because the connection is just an opaque handle which doesn't know if it had errored.

* fix: increment the cursor position

Once a row is delivered, we should increment the offset.

* fix: break out of an infinite loop
  • Loading branch information
abcpro1 authored Sep 15, 2023
1 parent 1a2cdd4 commit 820759b
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 27 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion dozer-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ geozero = { version = "0.10.0", default-features = false, features = [
] }
bytes = "1.4.0"
genawaiter = { version = "0.99.1", optional = true }
memchr = { version = "2.6.3", optional = true }

[dev-dependencies]
criterion = { version = "0.4.0", features = ["html_reports"] }
Expand All @@ -83,7 +84,7 @@ dozer-cli = { path = "../dozer-cli" }

[features]
# Defines a feature named `odbc` that does not enable any other features.
snowflake = ["dep:odbc", "dep:include_dir", "dep:genawaiter"]
snowflake = ["dep:odbc", "dep:include_dir", "dep:genawaiter", "dep:memchr"]
ethereum = ["dep:web3"]
kafka = ["dep:rdkafka", "dep:schema_registry_converter"]
mongodb = ["dep:mongodb"]
Expand Down
55 changes: 32 additions & 23 deletions dozer-ingestion/src/connectors/snowflake/connection/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ fn get_fields_from_cursor(

fn exec_drop(pool: &Pool, query: &str) -> Result<(), Box<DiagnosticRecord>> {
let conn = pool.get_conn()?;
let _result = exec_helper(&conn, query)?;
{
let _result = exec_helper(&conn, query)?;
}
conn.return_();
Ok(())
}

Expand All @@ -469,6 +472,7 @@ fn exec_first_exists(pool: &Pool, query: &str) -> Result<bool, Box<DiagnosticRec
Some(mut data) => retry!(data.fetch())?.is_some(),
None => false,
};
conn.return_();
break Ok(result);
}
}
Expand All @@ -483,34 +487,39 @@ fn exec_iter(pool: Pool, query: String) -> ExecIter {
let schema_ref = schema.clone();

let mut generator: Gen<Vec<Field>, (), _> = gen!({
let cursor_position = 0u64;
let mut cursor_position = 0u64;
'retry: loop {
let conn = pool.get_conn().map_err(QueryError)?;
let mut data = match exec_helper(&conn, &add_query_offset(&query, cursor_position)?)
.map_err(QueryError)?
{
Some(data) => data,
None => break,
};
let cols = data.num_result_cols().map_err(|e| QueryError(e.into()))?;
let mut vec = Vec::new();
for i in 1..(cols + 1) {
let value = i.try_into();
let column_descriptor = match value {
Ok(v) => data.describe_col(v).map_err(|e| QueryError(e.into()))?,
Err(e) => Err(SchemaConversionError(e))?,
let mut data = match exec_helper(&conn, &add_query_offset(&query, cursor_position)?)
.map_err(QueryError)?
{
Some(data) => data,
None => break,
};
vec.push(column_descriptor)
}
schema.borrow_mut().replace(vec);
let cols = data.num_result_cols().map_err(|e| QueryError(e.into()))?;
let mut vec = Vec::new();
for i in 1..(cols + 1) {
let value = i.try_into();
let column_descriptor = match value {
Ok(v) => data.describe_col(v).map_err(|e| QueryError(e.into()))?,
Err(e) => Err(SchemaConversionError(e))?,
};
vec.push(column_descriptor)
}
schema.borrow_mut().replace(vec);

while let Some(cursor) =
retry!(data.fetch(),'retry).map_err(|e| QueryError(e.into()))?
{
let fields =
get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?;
yield_!(fields);
while let Some(cursor) =
retry!(data.fetch(),'retry).map_err(|e| QueryError(e.into()))?
{
let fields =
get_fields_from_cursor(cursor, cols, schema.borrow().as_deref().unwrap())?;
yield_!(fields);
cursor_position += 1;
}
}
conn.return_();
break;
}
Ok::<(), SnowflakeError>(())
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub fn is_network_failure(err: &odbc::DiagnosticRecord) -> bool {
b"01002" /* Disconnect error */ |
b"08001" /* Client unable to establish connection */ |
b"08007" /* Connection failure during transaction */ |
b"08S01" /* Communication link failure */
b"08S01" /* Communication link failure */ |
b"HY000" /* General error */
if memchr::memmem::find(err.get_raw_message(), b"Timeout was reached").is_some()
)
}

0 comments on commit 820759b

Please sign in to comment.