Skip to content

Commit

Permalink
pgwire 0.19 (#1081)
Browse files Browse the repository at this point in the history
Changes necessary because of
1. sunng87/pgwire#144
2. sunng87/pgwire#147

Tests were failing due to hanging in 0.19.0,
0.19.1 fixed hang: sunng87/pgwire#148
  • Loading branch information
serprex authored Jan 15, 2024
1 parent 42f84e9 commit 1b97624
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 68 deletions.
18 changes: 2 additions & 16 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ members = [
]

resolver = "2"

[workspace.dependencies]
pgwire = "0.19"
2 changes: 1 addition & 1 deletion nexus/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = "1"
async-trait = "0.1"
catalog = { path = "../catalog" }
futures = { version = "0.3.28", features = ["executor"] }
pgwire = "0.18"
pgwire.workspace = true
pt = { path = "../pt" }
rand = "0.8"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dashmap = "5.0"
futures = { version = "0.3.28", features = ["executor"] }
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgwire = "0.18"
pgwire.workspace = true
pt = { path = "../pt" }
rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-cursor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
pgwire = "0.18"
pgwire.workspace = true
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
tokio = { version = "1.0", features = ["full"] }
value = { path = "../value" }
2 changes: 1 addition & 1 deletion nexus/peer-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgwire = "0.18"
pgwire.workspace = true
postgres-connection = { path = "../postgres-connection" }
pt = { path = "../pt" }
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-snowflake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async-trait = "0.1.57"
jsonwebtoken = { version = "9.0", features = ["use_pem"] }
base64 = "0.21"
dashmap = "5.0"
pgwire = "0.18"
pgwire.workspace = true
sha2 = "0.10"
pt = { path = "../pt" }
rsa = { version = "0.9.2", features = ["pem", "pkcs5"] }
Expand Down
2 changes: 1 addition & 1 deletion nexus/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ peer-cursor = { path = "../peer-cursor" }
peer-postgres = { path = "../peer-postgres" }
peer-snowflake = { path = "../peer-snowflake" }
peerdb-parser = { path = "../parser" }
pgwire = "0.18"
pgwire.workspace = true
prost = "0.12"
pt = { path = "../pt" }
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", features = ["visitor"] }
Expand Down
73 changes: 28 additions & 45 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl AuthSource for FixedPasswordAuthSource {
let salt = rand::thread_rng().gen::<[u8; 4]>();
let password = &self.password;
let hash_password = hash_md5_password(
login_info.user().map(|s| s.as_str()).unwrap_or(""),
login_info.user().unwrap_or(""),
password,
&salt,
);
Expand Down Expand Up @@ -117,7 +117,7 @@ impl NexusBackend {
let res = executor.execute(stmt).await?;
match res {
QueryOutput::AffectedRows(rows) => Ok(vec![Response::Execution(
Tag::new_for_execution("OK", Some(rows)),
Tag::new("OK").with_rows(rows),
)]),
QueryOutput::Stream(rows) => {
let schema = rows.schema();
Expand All @@ -134,18 +134,16 @@ impl NexusBackend {
match cm {
peer_cursor::CursorModification::Created(cursor_name) => {
peer_cursors.add_cursor(cursor_name, peer_holder.unwrap());
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
"DECLARE CURSOR",
None,
))])
}
peer_cursor::CursorModification::Closed(cursors) => {
for cursor_name in cursors {
peer_cursors.remove_cursor(&cursor_name);
}
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
"CLOSE CURSOR",
None,
))])
}
}
Expand Down Expand Up @@ -189,9 +187,8 @@ impl NexusBackend {
) -> PgWireResult<Vec<Response<'static>>> {
if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
existing_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down Expand Up @@ -291,15 +288,13 @@ impl NexusBackend {
)
})?;
let drop_mirror_success = format!("DROP MIRROR {}", flow_job_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&drop_mirror_success,
None,
))])
} else if *if_exists {
let no_mirror_success = "NO SUCH MIRROR";
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
no_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down Expand Up @@ -352,17 +347,15 @@ impl NexusBackend {
if qrep_flow_job.disabled {
let create_mirror_success =
format!("CREATE MIRROR {}", qrep_flow_job.name);
return Ok(vec![Response::Execution(Tag::new_for_execution(
return Ok(vec![Response::Execution(Tag::new(
&create_mirror_success,
None,
))]);
}

let _workflow_id = self.run_qrep_mirror(qrep_flow_job).await?;
let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&create_mirror_success,
None,
))])
} else {
Self::handle_mirror_existence(*if_not_exists, &qrep_flow_job.name)
Expand Down Expand Up @@ -403,8 +396,8 @@ impl NexusBackend {
e.to_string(),
)))
})?;
Ok(vec![Response::Execution(Tag::new_for_execution(
"OK", None,
Ok(vec![Response::Execution(Tag::new(
"OK",
))])
}
PeerDDL::CreateMirrorForCDC {
Expand Down Expand Up @@ -484,9 +477,8 @@ impl NexusBackend {
})?;

let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&create_mirror_success,
None,
))])
} else {
Self::handle_mirror_existence(*if_not_exists, &flow_job.name)
Expand Down Expand Up @@ -514,9 +506,8 @@ impl NexusBackend {
} {
let workflow_id = self.run_qrep_mirror(&job).await?;
let create_mirror_success = format!("STARTED WORKFLOW {}", workflow_id);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&create_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down Expand Up @@ -559,15 +550,13 @@ impl NexusBackend {
PgWireError::ApiError(format!("unable to drop peer: {:?}", err).into())
})?;
let drop_peer_success = format!("DROP PEER {}", peer_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&drop_peer_success,
None,
))])
} else if *if_exists {
let no_peer_success = "NO SUCH PEER";
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
no_peer_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down Expand Up @@ -649,16 +638,14 @@ impl NexusBackend {
})?;

let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&resync_mirror_success,
None,
))])
}
None => {
let no_peer_success = "NO SUCH QREP MIRROR";
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
no_peer_success,
None,
))])
}
}
Expand Down Expand Up @@ -708,15 +695,13 @@ impl NexusBackend {
)
})?;
let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&drop_mirror_success,
None,
))])
} else if *if_exists {
let no_mirror_success = "NO SUCH MIRROR";
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
no_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down Expand Up @@ -771,15 +756,13 @@ impl NexusBackend {
)
})?;
let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
&drop_mirror_success,
None,
))])
} else if *if_exists {
let no_mirror_success = "NO SUCH MIRROR";
Ok(vec![Response::Execution(Tag::new_for_execution(
Ok(vec![Response::Execution(Tag::new(
no_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down Expand Up @@ -934,7 +917,7 @@ impl SimpleQueryHandler for NexusBackend {
fn parameter_to_string(portal: &Portal<NexusParsedStatement>, idx: usize) -> PgWireResult<String> {
// the index is managed from portal's parameters count so it's safe to
// unwrap here.
let param_type = portal.statement().parameter_types().get(idx).unwrap();
let param_type = portal.statement.parameter_types.get(idx).unwrap();
match param_type {
&Type::VARCHAR | &Type::TEXT => Ok(format!(
"'{}'",
Expand Down Expand Up @@ -990,7 +973,7 @@ impl ExtendedQueryHandler for NexusBackend {
where
C: ClientInfo + Unpin + Send + Sync,
{
let stmt = portal.statement().statement();
let stmt = &portal.statement.statement;
tracing::info!("[eqp] do_query: {}", stmt.query);

// manually replace variables in prepared statement
Expand Down Expand Up @@ -1019,13 +1002,13 @@ impl ExtendedQueryHandler for NexusBackend {
{
let (param_types, stmt, _format) = match target {
StatementOrPortal::Statement(stmt) => {
let param_types = Some(stmt.parameter_types().clone());
(param_types, stmt.statement(), &Format::UnifiedBinary)
let param_types = Some(&stmt.parameter_types);
(param_types, &stmt.statement, &Format::UnifiedBinary)
}
StatementOrPortal::Portal(portal) => (
None,
portal.statement().statement(),
portal.result_column_format(),
&portal.statement.statement,
&portal.result_column_format,
),
};

Expand Down Expand Up @@ -1085,7 +1068,7 @@ impl ExtendedQueryHandler for NexusBackend {
Ok(DescribeResponse::no_data())
} else {
Ok(DescribeResponse::new(
param_types,
param_types.cloned(),
(*described_schema).clone(),
))
}
Expand Down
2 changes: 1 addition & 1 deletion nexus/value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ serde_json = "1.0"
postgres-inet = "0.19.0"
chrono = { version = "0.4", features = ["serde"] }
hex = "0.4"
pgwire = "0.18"
pgwire.workspace = true
postgres = { version = "0.19", features = ["with-chrono-0_4"] }
postgres-types = { version = "0.2.5", features = ["array-impls"] }
uuid = { version = "1.0", features = ["serde", "v4"] }

0 comments on commit 1b97624

Please sign in to comment.