From 1b97624e908ab0e160d58ecad8cfa269b8253bb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 15 Jan 2024 15:22:44 +0000 Subject: [PATCH] pgwire 0.19 (#1081) Changes necessary because of 1. https://github.com/sunng87/pgwire/pull/144 2. https://github.com/sunng87/pgwire/pull/147 Tests were failing due to hanging in 0.19.0, 0.19.1 fixed hang: https://github.com/sunng87/pgwire/pull/148 --- nexus/Cargo.lock | 18 +------- nexus/Cargo.toml | 3 ++ nexus/parser/Cargo.toml | 2 +- nexus/peer-bigquery/Cargo.toml | 2 +- nexus/peer-cursor/Cargo.toml | 2 +- nexus/peer-postgres/Cargo.toml | 2 +- nexus/peer-snowflake/Cargo.toml | 2 +- nexus/server/Cargo.toml | 2 +- nexus/server/src/main.rs | 73 +++++++++++++-------------------- nexus/value/Cargo.toml | 2 +- 10 files changed, 40 insertions(+), 68 deletions(-) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index d541062eab..76629b157c 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -1063,18 +1063,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "getset" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "gimli" version = "0.28.1" @@ -1953,9 +1941,9 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.18.0" +version = "0.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b277432819ee6b76bf56de5e91eae578d6b332bd6f05f963ee81fc788bc886f" +checksum = "17780c93587822c191c3f4d43fa5f6bc6df1e51b9f58a0be0cd1b7fd6e80d9e6" dependencies = [ "async-trait", "base64", @@ -1963,7 +1951,6 @@ dependencies = [ "chrono", "derive-new", "futures", - "getset", "hex", "log", "md5", @@ -2198,7 +2185,6 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn 1.0.109", "version_check", ] diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 2aaa406ac5..10a697d065 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -16,3 +16,6 @@ members = [ ] resolver = "2" + +[workspace.dependencies] +pgwire = "0.19" diff --git a/nexus/parser/Cargo.toml b/nexus/parser/Cargo.toml index 0b1eae1678..7cb79e5820 100644 --- a/nexus/parser/Cargo.toml +++ b/nexus/parser/Cargo.toml @@ -10,7 +10,7 @@ anyhow = "1" async-trait = "0.1" catalog = { path = "../catalog" } futures = { version = "0.3.28", features = ["executor"] } -pgwire = "0.18" +pgwire.workspace = true pt = { path = "../pt" } rand = "0.8" sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } diff --git a/nexus/peer-bigquery/Cargo.toml b/nexus/peer-bigquery/Cargo.toml index a7b570f9d3..5e48c3a1ac 100644 --- a/nexus/peer-bigquery/Cargo.toml +++ b/nexus/peer-bigquery/Cargo.toml @@ -13,7 +13,7 @@ dashmap = "5.0" futures = { version = "0.3.28", features = ["executor"] } peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } -pgwire = "0.18" +pgwire.workspace = true pt = { path = "../pt" } rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] } serde = { version = "1.0", features = ["derive"] } diff --git a/nexus/peer-cursor/Cargo.toml b/nexus/peer-cursor/Cargo.toml index 7623f8f007..5ce558e055 100644 --- a/nexus/peer-cursor/Cargo.toml +++ b/nexus/peer-cursor/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" anyhow = "1.0" async-trait = "0.1" futures = "0.3" -pgwire = "0.18" +pgwire.workspace = true sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } tokio = { version = "1.0", features = ["full"] } value = { path = "../value" } diff --git a/nexus/peer-postgres/Cargo.toml b/nexus/peer-postgres/Cargo.toml index 9cc37b8fd5..f1ae2c7aac 100644 --- a/nexus/peer-postgres/Cargo.toml +++ b/nexus/peer-postgres/Cargo.toml @@ -14,7 +14,7 @@ chrono = { version = "0.4", features = ["serde"] } futures = "0.3" peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } -pgwire = "0.18" +pgwire.workspace = true postgres-connection = { path = "../postgres-connection" } pt = { path = "../pt" } serde = { version = "1.0", features = ["derive"] } diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml index 7017ced427..cbebba6427 100644 --- a/nexus/peer-snowflake/Cargo.toml +++ b/nexus/peer-snowflake/Cargo.toml @@ -16,7 +16,7 @@ async-trait = "0.1.57" jsonwebtoken = { version = "9.0", features = ["use_pem"] } base64 = "0.21" dashmap = "5.0" -pgwire = "0.18" +pgwire.workspace = true sha2 = "0.10" pt = { path = "../pt" } rsa = { version = "0.9.2", features = ["pem", "pkcs5"] } diff --git a/nexus/server/Cargo.toml b/nexus/server/Cargo.toml index 33d6d14920..9915852316 100644 --- a/nexus/server/Cargo.toml +++ b/nexus/server/Cargo.toml @@ -47,7 +47,7 @@ peer-cursor = { path = "../peer-cursor" } peer-postgres = { path = "../peer-postgres" } peer-snowflake = { path = "../peer-snowflake" } peerdb-parser = { path = "../parser" } -pgwire = "0.18" +pgwire.workspace = true prost = "0.12" pt = { path = "../pt" } sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", features = ["visitor"] } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 103e2b0537..4a1eebe7e1 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -67,7 +67,7 @@ impl AuthSource for FixedPasswordAuthSource { let salt = rand::thread_rng().gen::<[u8; 4]>(); let password = &self.password; let hash_password = hash_md5_password( - login_info.user().map(|s| s.as_str()).unwrap_or(""), + login_info.user().unwrap_or(""), password, &salt, ); @@ -117,7 +117,7 @@ impl NexusBackend { let res = executor.execute(stmt).await?; match res { QueryOutput::AffectedRows(rows) => Ok(vec![Response::Execution( - Tag::new_for_execution("OK", Some(rows)), + Tag::new("OK").with_rows(rows), )]), QueryOutput::Stream(rows) => { let schema = rows.schema(); @@ -134,18 +134,16 @@ impl NexusBackend { match cm { peer_cursor::CursorModification::Created(cursor_name) => { peer_cursors.add_cursor(cursor_name, peer_holder.unwrap()); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( "DECLARE CURSOR", - None, ))]) } peer_cursor::CursorModification::Closed(cursors) => { for cursor_name in cursors { peer_cursors.remove_cursor(&cursor_name); } - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( "CLOSE CURSOR", - None, ))]) } } @@ -189,9 +187,8 @@ impl NexusBackend { ) -> PgWireResult>> { if if_not_exists { let existing_mirror_success = "MIRROR ALREADY EXISTS"; - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( existing_mirror_success, - None, ))]) } else { Err(PgWireError::UserError(Box::new(ErrorInfo::new( @@ -291,15 +288,13 @@ impl NexusBackend { ) })?; let drop_mirror_success = format!("DROP MIRROR {}", flow_job_name); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &drop_mirror_success, - None, ))]) } else if *if_exists { let no_mirror_success = "NO SUCH MIRROR"; - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( no_mirror_success, - None, ))]) } else { Err(PgWireError::UserError(Box::new(ErrorInfo::new( @@ -352,17 +347,15 @@ impl NexusBackend { if qrep_flow_job.disabled { let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - return Ok(vec![Response::Execution(Tag::new_for_execution( + return Ok(vec![Response::Execution(Tag::new( &create_mirror_success, - None, ))]); } let _workflow_id = self.run_qrep_mirror(qrep_flow_job).await?; let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &create_mirror_success, - None, ))]) } else { Self::handle_mirror_existence(*if_not_exists, &qrep_flow_job.name) @@ -403,8 +396,8 @@ impl NexusBackend { e.to_string(), ))) })?; - Ok(vec![Response::Execution(Tag::new_for_execution( - "OK", None, + Ok(vec![Response::Execution(Tag::new( + "OK", ))]) } PeerDDL::CreateMirrorForCDC { @@ -484,9 +477,8 @@ impl NexusBackend { })?; let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &create_mirror_success, - None, ))]) } else { Self::handle_mirror_existence(*if_not_exists, &flow_job.name) @@ -514,9 +506,8 @@ impl NexusBackend { } { let workflow_id = self.run_qrep_mirror(&job).await?; let create_mirror_success = format!("STARTED WORKFLOW {}", workflow_id); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &create_mirror_success, - None, ))]) } else { Err(PgWireError::UserError(Box::new(ErrorInfo::new( @@ -559,15 +550,13 @@ impl NexusBackend { PgWireError::ApiError(format!("unable to drop peer: {:?}", err).into()) })?; let drop_peer_success = format!("DROP PEER {}", peer_name); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &drop_peer_success, - None, ))]) } else if *if_exists { let no_peer_success = "NO SUCH PEER"; - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( no_peer_success, - None, ))]) } else { Err(PgWireError::UserError(Box::new(ErrorInfo::new( @@ -649,16 +638,14 @@ impl NexusBackend { })?; let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &resync_mirror_success, - None, ))]) } None => { let no_peer_success = "NO SUCH QREP MIRROR"; - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( no_peer_success, - None, ))]) } } @@ -708,15 +695,13 @@ impl NexusBackend { ) })?; let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &drop_mirror_success, - None, ))]) } else if *if_exists { let no_mirror_success = "NO SUCH MIRROR"; - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( no_mirror_success, - None, ))]) } else { Err(PgWireError::UserError(Box::new(ErrorInfo::new( @@ -771,15 +756,13 @@ impl NexusBackend { ) })?; let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name); - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( &drop_mirror_success, - None, ))]) } else if *if_exists { let no_mirror_success = "NO SUCH MIRROR"; - Ok(vec![Response::Execution(Tag::new_for_execution( + Ok(vec![Response::Execution(Tag::new( no_mirror_success, - None, ))]) } else { Err(PgWireError::UserError(Box::new(ErrorInfo::new( @@ -934,7 +917,7 @@ impl SimpleQueryHandler for NexusBackend { fn parameter_to_string(portal: &Portal, idx: usize) -> PgWireResult { // the index is managed from portal's parameters count so it's safe to // unwrap here. - let param_type = portal.statement().parameter_types().get(idx).unwrap(); + let param_type = portal.statement.parameter_types.get(idx).unwrap(); match param_type { &Type::VARCHAR | &Type::TEXT => Ok(format!( "'{}'", @@ -990,7 +973,7 @@ impl ExtendedQueryHandler for NexusBackend { where C: ClientInfo + Unpin + Send + Sync, { - let stmt = portal.statement().statement(); + let stmt = &portal.statement.statement; tracing::info!("[eqp] do_query: {}", stmt.query); // manually replace variables in prepared statement @@ -1019,13 +1002,13 @@ impl ExtendedQueryHandler for NexusBackend { { let (param_types, stmt, _format) = match target { StatementOrPortal::Statement(stmt) => { - let param_types = Some(stmt.parameter_types().clone()); - (param_types, stmt.statement(), &Format::UnifiedBinary) + let param_types = Some(&stmt.parameter_types); + (param_types, &stmt.statement, &Format::UnifiedBinary) } StatementOrPortal::Portal(portal) => ( None, - portal.statement().statement(), - portal.result_column_format(), + &portal.statement.statement, + &portal.result_column_format, ), }; @@ -1085,7 +1068,7 @@ impl ExtendedQueryHandler for NexusBackend { Ok(DescribeResponse::no_data()) } else { Ok(DescribeResponse::new( - param_types, + param_types.cloned(), (*described_schema).clone(), )) } diff --git a/nexus/value/Cargo.toml b/nexus/value/Cargo.toml index d4fdf39cbf..c3a8078e14 100644 --- a/nexus/value/Cargo.toml +++ b/nexus/value/Cargo.toml @@ -14,7 +14,7 @@ serde_json = "1.0" postgres-inet = "0.19.0" chrono = { version = "0.4", features = ["serde"] } hex = "0.4" -pgwire = "0.18" +pgwire.workspace = true postgres = { version = "0.19", features = ["with-chrono-0_4"] } postgres-types = { version = "0.2.5", features = ["array-impls"] } uuid = { version = "1.0", features = ["serde", "v4"] }