diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 59e1bbf8b9..09159e4d29 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -781,12 +781,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "doc-comment" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" - [[package]] name = "dotenvy" version = "0.15.7" @@ -1792,7 +1786,6 @@ dependencies = [ "gcp-bigquery-client", "peer-connections", "peer-cursor", - "pgerror", "pgwire", "pt", "rust_decimal", @@ -1827,7 +1820,6 @@ dependencies = [ "anyhow", "async-trait", "futures", - "pgerror", "pgwire", "sqlparser", "tokio", @@ -1845,7 +1837,6 @@ dependencies = [ "futures", "peer-connections", "peer-cursor", - "pgerror", "pgwire", "postgres-connection", "postgres-inet", @@ -1877,7 +1868,6 @@ dependencies = [ "hex", "jsonwebtoken", "peer-cursor", - "pgerror", "pgwire", "pt", "reqwest", @@ -1932,7 +1922,6 @@ dependencies = [ "peer-postgres", "peer-snowflake", "peerdb-parser", - "pgerror", "pgwire", "postgres", "prost", @@ -1984,13 +1973,6 @@ dependencies = [ "indexmap 2.1.0", ] -[[package]] -name = "pgerror" -version = "0.1.0" -dependencies = [ - "snafu", -] - [[package]] name = "pgwire" version = "0.18.0" @@ -3015,29 +2997,6 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" -[[package]] -name = "snafu" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" -dependencies = [ - "backtrace", - "doc-comment", - "snafu-derive", -] - -[[package]] -name = "snafu-derive" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "socket2" version = "0.5.5" diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index a07c40ea46..2aaa406ac5 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -9,7 +9,6 @@ members = [ "peer-cursor", "peer-postgres", "peer-snowflake", - "pgerror", "postgres-connection", "pt", "server", diff --git a/nexus/peer-bigquery/Cargo.toml b/nexus/peer-bigquery/Cargo.toml index 9f565388fb..a7b570f9d3 100644 --- a/nexus/peer-bigquery/Cargo.toml +++ b/nexus/peer-bigquery/Cargo.toml @@ -13,7 +13,6 @@ dashmap = "5.0" futures = { version = "0.3.28", features = ["executor"] } peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } -pgerror = { path = "../pgerror" } pgwire = "0.18" pt = { path = "../pt" } rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] } diff --git a/nexus/peer-bigquery/src/lib.rs b/nexus/peer-bigquery/src/lib.rs index c338518f45..29d58fb24f 100644 --- a/nexus/peer-bigquery/src/lib.rs +++ b/nexus/peer-bigquery/src/lib.rs @@ -8,7 +8,6 @@ use gcp_bigquery_client::{ }; use peer_connections::PeerConnectionTracker; use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef}; -use pgerror::PgError; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use pt::peerdb_peers::BigqueryConfig; use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value}; @@ -76,9 +75,7 @@ impl BigQueryQueryExecutor { .await .map_err(|err| { tracing::error!("error tracking query: {}", err); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) + PgWireError::ApiError(err.into()) })?; let result_set = self @@ -88,16 +85,12 @@ impl BigQueryQueryExecutor { .await .map_err(|err| { tracing::error!("error running query: {}", err); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) + PgWireError::ApiError(err.into()) })?; token.end().await.map_err(|err| { tracing::error!("error closing tracking token: {}", err); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) + PgWireError::ApiError(err.into()) })?; Ok(result_set) @@ -116,11 +109,7 @@ impl QueryExecutor for BigQueryQueryExecutor { bq_ast .rewrite(&self.dataset_id, &mut query) .context("unable to rewrite query") - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) - })?; + .map_err(|err| PgWireError::ApiError(err.into()))?; let query = query.to_string(); tracing::info!("bq rewritten query: {}", query); @@ -170,11 +159,7 @@ impl QueryExecutor for BigQueryQueryExecutor { // If parsing the count resulted in an error, return an internal error let count = match count { Ok(c) => c, - Err(err) => { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - }))) - } + Err(err) => return Err(PgWireError::ApiError(err.into())), }; tracing::info!("fetching {} rows", count); @@ -226,11 +211,7 @@ impl QueryExecutor for BigQueryQueryExecutor { bq_ast .rewrite(&self.dataset_id, &mut query) .context("unable to rewrite query") - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) - })?; + .map_err(|err| PgWireError::ApiError(err.into()))?; // add LIMIT 0 to the root level query. // this is a workaround for the bigquery API not supporting DESCRIBE diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index 53e85c021a..a831f6818f 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -10,7 +10,6 @@ use gcp_bigquery_client::model::{ field_type::FieldType, query_response::ResultSet, table_field_schema::TableFieldSchema, }; use peer_cursor::{Record, RecordStream, Schema, SchemaRef}; -use pgerror::PgError; use pgwire::{ api::{ results::{FieldFormat, FieldInfo}, @@ -181,19 +180,13 @@ impl BqRecordStream { impl Stream for BqRecordStream { type Item = PgWireResult; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - match this.result_set.next_row() { - true => { - let record = this.convert_result_set_item(&this.result_set); - let result = record.map_err(|e| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error getting curent row: {}", e), - })) - }); - Poll::Ready(Some(result)) - } - false => Poll::Ready(None), + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.result_set.next_row() { + let record = self.convert_result_set_item(&self.result_set); + let result = record.map_err(|e| PgWireError::ApiError(e.into())); + Poll::Ready(Some(result)) + } else { + Poll::Ready(None) } } } diff --git a/nexus/peer-cursor/Cargo.toml b/nexus/peer-cursor/Cargo.toml index cc644064b1..7623f8f007 100644 --- a/nexus/peer-cursor/Cargo.toml +++ b/nexus/peer-cursor/Cargo.toml @@ -9,7 +9,6 @@ edition = "2021" anyhow = "1.0" async-trait = "0.1" futures = "0.3" -pgerror = { path = "../pgerror" } pgwire = "0.18" sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } tokio = { version = "1.0", features = ["full"] } diff --git a/nexus/peer-cursor/src/util.rs b/nexus/peer-cursor/src/util.rs index e87478d67b..e9b9d55b00 100644 --- a/nexus/peer-cursor/src/util.rs +++ b/nexus/peer-cursor/src/util.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use futures::{stream, StreamExt}; -use pgerror::PgError; use pgwire::{ api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response}, error::{PgWireError, PgWireResult}, @@ -48,14 +47,13 @@ fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()> let s = u.to_string(); builder.encode_field(&s) } - Value::Enum(_) | Value::Hstore(_) => { - Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "cannot write value {:?} in postgres protocol: unimplemented", - &value - ), - }))) - } + Value::Enum(_) | Value::Hstore(_) => Err(PgWireError::ApiError( + format!( + "cannot write value {:?} in postgres protocol: unimplemented", + &value + ) + .into(), + )), } } diff --git a/nexus/peer-postgres/Cargo.toml b/nexus/peer-postgres/Cargo.toml index 8f8ef39ead..9cc37b8fd5 100644 --- a/nexus/peer-postgres/Cargo.toml +++ b/nexus/peer-postgres/Cargo.toml @@ -14,7 +14,6 @@ chrono = { version = "0.4", features = ["serde"] } futures = "0.3" peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } -pgerror = { path = "../pgerror" } pgwire = "0.18" postgres-connection = { path = "../postgres-connection" } pt = { path = "../pt" } diff --git a/nexus/peer-postgres/src/lib.rs b/nexus/peer-postgres/src/lib.rs index 6617eeefd3..7df2da1914 100644 --- a/nexus/peer-postgres/src/lib.rs +++ b/nexus/peer-postgres/src/lib.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use peer_cursor::{QueryExecutor, QueryOutput, Schema, SchemaRef}; -use pgerror::PgError; use pgwire::{ api::results::{FieldFormat, FieldInfo}, error::{PgWireError, PgWireResult}, @@ -71,9 +70,7 @@ impl QueryExecutor for PostgresQueryExecutor { .await .map_err(|e| { tracing::error!("error getting schema: {}", e); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error getting schema: {}", e), - })) + PgWireError::ApiError(format!("error getting schema: {}", e).into()) })?; tracing::info!("[peer-postgres] rewritten query: {}", rewritten_query); @@ -86,9 +83,7 @@ impl QueryExecutor for PostgresQueryExecutor { .await .map_err(|e| { tracing::error!("error executing query: {}", e); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error executing query: {}", e), - })) + PgWireError::ApiError(format!("error executing query: {}", e).into()) })?; // log that raw query execution has completed @@ -101,9 +96,7 @@ impl QueryExecutor for PostgresQueryExecutor { let mut rewritten_stmt = stmt.clone(); ast.rewrite_statement(&mut rewritten_stmt).map_err(|e| { tracing::error!("error rewriting statement: {}", e); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error rewriting statement: {}", e), - })) + PgWireError::ApiError(format!("error rewriting statement: {}", e).into()) })?; let rewritten_query = rewritten_stmt.to_string(); tracing::info!("[peer-postgres] rewritten statement: {}", rewritten_query); @@ -113,9 +106,7 @@ impl QueryExecutor for PostgresQueryExecutor { .await .map_err(|e| { tracing::error!("error executing query: {}", e); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error executing query: {}", e), - })) + PgWireError::ApiError(format!("error executing query: {}", e).into()) })?; Ok(QueryOutput::AffectedRows(rows_affected as usize)) } @@ -130,9 +121,7 @@ impl QueryExecutor for PostgresQueryExecutor { .await .map_err(|e| { tracing::error!("error getting schema: {}", e); - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error getting schema: {}", e), - })) + PgWireError::ApiError(format!("error getting schema: {}", e).into()) })?; Ok(Some(schema)) } diff --git a/nexus/peer-postgres/src/stream.rs b/nexus/peer-postgres/src/stream.rs index e7195a2aeb..21905c1cc6 100644 --- a/nexus/peer-postgres/src/stream.rs +++ b/nexus/peer-postgres/src/stream.rs @@ -2,7 +2,6 @@ use bytes::Bytes; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use futures::Stream; use peer_cursor::{Record, RecordStream, SchemaRef}; -use pgerror::PgError; use pgwire::error::{PgWireError, PgWireResult}; use postgres_inet::MaskedIpAddr; use rust_decimal::Decimal; @@ -268,10 +267,7 @@ impl Stream for PgRecordStream { Poll::Ready(Some(Ok(record))) } Poll::Ready(Some(Err(e))) => { - let err = Box::new(PgError::Internal { - err_msg: e.to_string(), - }); - let err = PgWireError::ApiError(err); + let err = PgWireError::ApiError(Box::new(e)); Poll::Ready(Some(Err(err))) } Poll::Ready(None) => Poll::Ready(None), diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml index 913824b31d..7017ced427 100644 --- a/nexus/peer-snowflake/Cargo.toml +++ b/nexus/peer-snowflake/Cargo.toml @@ -11,7 +11,6 @@ peer-cursor = { path = "../peer-cursor" } sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" } value = { path = "../value" } tracing = "0.1" -pgerror = { path = "../pgerror" } secrecy = { version = "0.8.0" } async-trait = "0.1.57" jsonwebtoken = { version = "9.0", features = ["use_pem"] } diff --git a/nexus/peer-snowflake/src/auth.rs b/nexus/peer-snowflake/src/auth.rs index 64bb0f0458..4ec1a90823 100644 --- a/nexus/peer-snowflake/src/auth.rs +++ b/nexus/peer-snowflake/src/auth.rs @@ -6,9 +6,9 @@ use std::{ use anyhow::Context; use base64::prelude::{Engine as _, BASE64_STANDARD}; use jsonwebtoken::{encode as jwt_encode, Algorithm, EncodingKey, Header}; -use rsa::RsaPrivateKey; use rsa::pkcs1::EncodeRsaPrivateKey; use rsa::pkcs8::{DecodePrivateKey, EncodePublicKey}; +use rsa::RsaPrivateKey; use secrecy::{Secret, SecretString}; use serde::Serialize; use sha2::{Digest, Sha256}; @@ -101,9 +101,8 @@ impl SnowflakeAuth { #[tracing::instrument(name = "peer_sflake::auth_refresh_jwt", skip_all)] fn refresh_jwt(&mut self) -> anyhow::Result<()> { - let private_key_jwt: EncodingKey = EncodingKey::from_rsa_der( - self.private_key.to_pkcs1_der()?.as_bytes(), - ); + let private_key_jwt: EncodingKey = + EncodingKey::from_rsa_der(self.private_key.to_pkcs1_der()?.as_bytes()); self.last_refreshed = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); info!( "Refreshing SnowFlake JWT for account: {} and user: {} at time {}", diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index 9fe3a8536c..ac4d0154d9 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -2,7 +2,6 @@ use anyhow::Context; use async_recursion::async_recursion; use cursor::SnowflakeCursorManager; use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef}; -use pgerror::PgError; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use sqlparser::dialect::GenericDialect; use sqlparser::parser; @@ -209,11 +208,10 @@ impl SnowflakeQueryExecutor { let query_str: String = query.to_string(); info!("Processing SnowFlake query: {}", query_str); - let result_set = self.process_query(&query_str).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) - })?; + let result_set = self + .process_query(&query_str) + .await + .map_err(|err| PgWireError::ApiError(err.into()))?; Ok(result_set) } @@ -309,11 +307,7 @@ impl QueryExecutor for SnowflakeQueryExecutor { snowflake_ast .rewrite(&mut new_query) .context("unable to rewrite query") - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) - })?; + .map_err(|err| PgWireError::ApiError(err.into()))?; let result_set = self.query(&query.clone()).await?; @@ -361,11 +355,7 @@ impl QueryExecutor for SnowflakeQueryExecutor { // If parsing the count resulted in an error, return an internal error let count = match count { Ok(c) => c, - Err(err) => { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - }))) - } + Err(err) => return Err(PgWireError::ApiError(err.into())), }; tracing::info!("fetching {} rows", count); @@ -413,11 +403,7 @@ impl QueryExecutor for SnowflakeQueryExecutor { sf_ast .rewrite(&mut new_query) .context("unable to rewrite query") - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: err.to_string(), - })) - })?; + .map_err(|err| PgWireError::ApiError(err.into()))?; // new_query.limit = Some(Expr::Value(Value::Number("1".to_owned(), false))); diff --git a/nexus/peer-snowflake/src/stream.rs b/nexus/peer-snowflake/src/stream.rs index 3434b70dfa..4740270d12 100644 --- a/nexus/peer-snowflake/src/stream.rs +++ b/nexus/peer-snowflake/src/stream.rs @@ -3,7 +3,6 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; use futures::Stream; use peer_cursor::Schema; use peer_cursor::{Record, RecordStream, SchemaRef}; -use pgerror::PgError; use pgwire::{ api::{ results::{FieldFormat, FieldInfo}, @@ -236,25 +235,15 @@ impl SnowflakeRecordStream { impl Stream for SnowflakeRecordStream { type Item = PgWireResult; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - - match this.advance() { + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + match self.advance() { Ok(true) => { - let record = this.convert_result_set_item(); - let result = record.map_err(|e| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error getting current row: {}", e), - })) - }); + let record = self.convert_result_set_item(); + let result = record.map_err(|e| PgWireError::ApiError(e.into())); Poll::Ready(Some(result)) } Ok(false) => Poll::Ready(None), - Err(err) => Poll::Ready(Some(Err(PgWireError::ApiError(Box::new( - PgError::Internal { - err_msg: format!("Checking for next row in result set failed: {}", err), - }, - ))))), + Err(err) => Poll::Ready(Some(Err(PgWireError::ApiError(err.into())))), } } } diff --git a/nexus/pgerror/Cargo.toml b/nexus/pgerror/Cargo.toml deleted file mode 100644 index 2dca877fea..0000000000 --- a/nexus/pgerror/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "pgerror" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -snafu = { version = "0.7", features = ["backtraces"] } diff --git a/nexus/pgerror/src/lib.rs b/nexus/pgerror/src/lib.rs deleted file mode 100644 index bf842f7780..0000000000 --- a/nexus/pgerror/src/lib.rs +++ /dev/null @@ -1,8 +0,0 @@ -use snafu::Snafu; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum PgError { - #[snafu(display("Internal error: {}", err_msg))] - Internal { err_msg: String }, -} diff --git a/nexus/postgres-connection/src/lib.rs b/nexus/postgres-connection/src/lib.rs index 3eb6558a2d..dfa165b7dc 100644 --- a/nexus/postgres-connection/src/lib.rs +++ b/nexus/postgres-connection/src/lib.rs @@ -1,7 +1,7 @@ -use std::fmt::Write; use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use postgres_openssl::MakeTlsConnector; use pt::peerdb_peers::PostgresConfig; +use std::fmt::Write; pub fn get_pg_connection_string(config: &PostgresConfig) -> String { let mut connection_string = String::from("postgres://"); @@ -13,7 +13,12 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String { } // Add the timeout as a query parameter, sslmode changes here appear to be useless - write!(connection_string, "@{}:{}/{}?connect_timeout=15", config.host, config.port, config.database).ok(); + write!( + connection_string, + "@{}:{}/{}?connect_timeout=15", + config.host, config.port, config.database + ) + .ok(); connection_string } diff --git a/nexus/server/Cargo.toml b/nexus/server/Cargo.toml index 1bf4542c22..33d6d14920 100644 --- a/nexus/server/Cargo.toml +++ b/nexus/server/Cargo.toml @@ -60,7 +60,6 @@ tracing-appender = "0.2" tracing-subscriber = "0.3" uuid = "1.0" cargo-deb = "2.0" -pgerror = { path = "../pgerror" } [dev-dependencies] postgres = "0.19.4" diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index a3f1048bdc..9aa4dcb9a4 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -20,7 +20,6 @@ use peer_cursor::{ QueryExecutor, QueryOutput, SchemaRef, }; use peerdb_parser::{NexusParsedStatement, NexusQueryParser, NexusStatement}; -use pgerror::PgError; use pgwire::{ api::{ auth::{ @@ -169,9 +168,9 @@ impl NexusBackend { .get_workflow_details_for_flow_job(flow_name) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to query catalog for job metadata: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to query catalog for job metadata: {:?}", err).into(), + ) })?; Ok(workflow_details) } @@ -181,9 +180,7 @@ impl NexusBackend { peer_name: &str, ) -> PgWireResult { let peer = catalog.get_peer(peer_name).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err), - })) + PgWireError::ApiError(format!("unable to get peer {:?}: {:?}", peer_name, err).into()) })?; Ok(peer) } @@ -224,9 +221,7 @@ impl NexusBackend { .validate_peer(&validate_request) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to check peer validity: {:?}", err), - })) + PgWireError::ApiError(format!("unable to check peer validity: {:?}", err).into()) })?; if let PeerValidationResult::Invalid(validation_err) = validity { Err(PgWireError::UserError(Box::new(ErrorInfo::new( @@ -251,9 +246,9 @@ impl NexusBackend { flow_job_name, } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } let catalog = self.catalog.lock().await; @@ -266,12 +261,10 @@ impl NexusBackend { .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to query catalog for job metadata: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to query catalog for job metadata: {:?}", err) + .into(), + ) })?; tracing::info!( "got workflow id: {:?}", @@ -283,17 +276,17 @@ impl NexusBackend { .shutdown_flow_job(flow_job_name, workflow_details) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to shutdown flow job: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to shutdown flow job: {:?}", err).into(), + ) })?; catalog .delete_flow_job_entry(flow_job_name) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to delete job metadata: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to delete job metadata: {:?}", err).into(), + ) })?; let drop_mirror_success = format!("DROP MIRROR {}", flow_job_name); Ok(vec![Response::Execution(Tag::new_for_execution( @@ -331,9 +324,9 @@ impl NexusBackend { qrep_flow_job, } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } let mirror_details; { @@ -348,12 +341,10 @@ impl NexusBackend { .create_qrep_flow_job_entry(qrep_flow_job) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to create mirror job entry: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to create mirror job entry: {:?}", err) + .into(), + ) })?; } @@ -421,9 +412,9 @@ impl NexusBackend { flow_job, } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } let catalog = self.catalog.lock().await; let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; @@ -435,24 +426,22 @@ impl NexusBackend { let mut destinations = HashSet::with_capacity(table_mappings_count); for tm in flow_job.table_mappings.iter() { if !sources.insert(tm.source_table_identifier.as_str()) { - return Err(PgWireError::ApiError(Box::new( - PgError::Internal { - err_msg: format!( - "Duplicate source table identifier {}", - tm.source_table_identifier - ), - }, - ))); + return Err(PgWireError::ApiError( + format!( + "Duplicate source table identifier {}", + tm.source_table_identifier + ) + .into(), + )); } if !destinations.insert(tm.destination_table_identifier.as_str()) { - return Err(PgWireError::ApiError(Box::new( - PgError::Internal { - err_msg: format!( - "Duplicate destination table identifier {}", - tm.destination_table_identifier - ), - }, - ))); + return Err(PgWireError::ApiError( + format!( + "Duplicate destination table identifier {}", + tm.destination_table_identifier + ) + .into(), + )); } } } @@ -461,12 +450,9 @@ impl NexusBackend { .create_cdc_flow_job_entry(flow_job) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to create mirror job entry: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to create mirror job entry: {:?}", err).into(), + ) })?; // get source and destination peers @@ -481,18 +467,18 @@ impl NexusBackend { .start_peer_flow_job(flow_job, src_peer, dst_peer) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to submit job: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to submit job: {:?}", err).into(), + ) })?; catalog .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to save job metadata: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to save job metadata: {:?}", err).into(), + ) })?; let create_mirror_success = format!("CREATE MIRROR {}", flow_job.name); @@ -509,9 +495,9 @@ impl NexusBackend { } PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } if let Some(job) = { @@ -520,9 +506,9 @@ impl NexusBackend { .get_qrep_flow_job_by_name(flow_job_name) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get qrep flow job: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to get qrep flow job: {:?}", err).into(), + ) })? } { let workflow_id = self.run_qrep_mirror(&job).await?; @@ -545,9 +531,9 @@ impl NexusBackend { peer_name, } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } let catalog = self.catalog.lock().await; @@ -557,20 +543,15 @@ impl NexusBackend { if_exists ); let peer_exists = catalog.check_peer_entry(peer_name).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to query catalog for peer metadata: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to query catalog for peer metadata: {:?}", err).into(), + ) })?; tracing::info!("peer exist count: {}", peer_exists); if peer_exists != 0 { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler.drop_peer(peer_name).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to drop peer: {:?}", err), - })) + PgWireError::ApiError(format!("unable to drop peer: {:?}", err).into()) })?; let drop_peer_success = format!("DROP PEER {}", peer_name); Ok(vec![Response::Execution(Tag::new_for_execution( @@ -598,9 +579,9 @@ impl NexusBackend { .. } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } let qrep_config = { @@ -610,12 +591,9 @@ impl NexusBackend { .get_qrep_config_proto(mirror_name) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "error while getting QRep flow job: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("error while getting QRep flow job: {:?}", err).into(), + ) })? }; @@ -644,12 +622,10 @@ impl NexusBackend { .start_query_replication_flow(&qrep_config) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "error while starting new QRep job: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("error while starting new QRep job: {:?}", err) + .into(), + ) })?; // relock catalog, DROP MIRROR is done with it now let catalog = self.catalog.lock().await; @@ -660,12 +636,13 @@ impl NexusBackend { ) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( + PgWireError::ApiError( + format!( "unable to update workflow for flow job: {:?}", err - ), - })) + ) + .into(), + ) })?; let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name); @@ -688,9 +665,9 @@ impl NexusBackend { flow_job_name, } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } let catalog = self.catalog.lock().await; @@ -703,12 +680,10 @@ impl NexusBackend { .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to query catalog for job metadata: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to query catalog for job metadata: {:?}", err) + .into(), + ) })?; tracing::info!( "[PAUSE MIRROR] got workflow id: {:?}", @@ -721,9 +696,9 @@ impl NexusBackend { .flow_state_change(flow_job_name, &workflow_details.workflow_id, true) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to shutdown flow job: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to shutdown flow job: {:?}", err).into(), + ) })?; let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name); Ok(vec![Response::Execution(Tag::new_for_execution( @@ -749,9 +724,9 @@ impl NexusBackend { flow_job_name, } => { if self.flow_handler.is_none() { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: "flow service is not configured".to_owned(), - }))); + return Err(PgWireError::ApiError( + "flow service is not configured".into(), + )); } let catalog = self.catalog.lock().await; @@ -764,12 +739,10 @@ impl NexusBackend { .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to query catalog for job metadata: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to query catalog for job metadata: {:?}", err) + .into(), + ) })?; tracing::info!( "[RESUME MIRROR] got workflow id: {:?}", @@ -782,9 +755,9 @@ impl NexusBackend { .flow_state_change(flow_job_name, &workflow_details.workflow_id, false) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to shutdown flow job: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to shutdown flow job: {:?}", err).into(), + ) })?; let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name); Ok(vec![Response::Execution(Tag::new_for_execution( @@ -813,9 +786,9 @@ impl NexusBackend { tracing::info!("handling peer[{}] query: {}", peer.name, stmt); peer_holder = Some(peer.clone()); self.get_peer_executor(&peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer executor: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err).into(), + ) })? } QueryAssociation::Catalog => { @@ -849,9 +822,9 @@ impl NexusBackend { Arc::clone(catalog.get_executor()) } Some(peer) => self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get peer executor: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err).into(), + ) })?, } }; @@ -872,18 +845,14 @@ impl NexusBackend { .get_peer(&qrep_flow_job.source_peer) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get source peer: {:?}", err), - })) + PgWireError::ApiError(format!("unable to get source peer: {:?}", err).into()) })?; let dst_peer = catalog .get_peer(&qrep_flow_job.target_peer) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to get destination peer: {:?}", err), - })) + PgWireError::ApiError(format!("unable to get destination peer: {:?}", err).into()) })?; // make a request to the flow service to start the job. @@ -892,18 +861,16 @@ impl NexusBackend { .start_qrep_flow_job(qrep_flow_job, src_peer, dst_peer) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to submit job: {:?}", err), - })) + PgWireError::ApiError(format!("unable to submit job: {:?}", err).into()) })?; catalog .update_workflow_id_for_flow_job(&qrep_flow_job.name, &workflow_id) .await .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("unable to update workflow for flow job: {:?}", err), - })) + PgWireError::ApiError( + format!("unable to update workflow for flow job: {:?}", err).into(), + ) })?; Ok(workflow_id) @@ -1073,36 +1040,30 @@ impl ExtendedQueryHandler for NexusBackend { Some(Config::BigqueryConfig(_)) => { let executor = self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to get peer executor: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err) + .into(), + ) })?; executor.describe(stmt).await? } Some(Config::PostgresConfig(_)) => { let executor = self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to get peer executor: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err) + .into(), + ) })?; executor.describe(stmt).await? } Some(Config::SnowflakeConfig(_)) => { let executor = self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!( - "unable to get peer executor: {:?}", - err - ), - })) + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err) + .into(), + ) })?; executor.describe(stmt).await? }