Skip to content

Commit

Permalink
refactor(error): refine error reporting in pgwire (#13547)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Nov 23, 2023
1 parent 51540c0 commit 8b756ae
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 49 deletions.
3 changes: 2 additions & 1 deletion ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ cluster_stop
echo "--- e2e, $mode, error ui"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cluster_start
sqllogictest -p 4566 -d dev './e2e_test/error_ui/**/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/error_ui/simple/**/*.slt'
sqllogictest -p 4566 -d dev -e postgres-extended './e2e_test/error_ui/extended/**/*.slt'

echo "--- Kill cluster"
cluster_stop
Expand Down
3 changes: 2 additions & 1 deletion e2e_test/error_ui/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ When you find the tests in this directory failing...
- First, ensure that the changes to the error messages are expected and make them look better.
- Then, update the test cases by running:
```bash
./risedev slt './e2e_test/error_ui/**/*.slt' --override
./risedev slt './e2e_test/error_ui/simple/**/*.slt' --override
./risedev slt './e2e_test/error_ui/extended/**/*.slt' --engine postgres-extended --override
```
Please note that the minimum required version of `sqllogictest` is 0.18 or higher.
18 changes: 18 additions & 0 deletions e2e_test/error_ui/extended/main.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
query error
selet 1;
----
db error: ERROR: Failed to prepare the statement

Caused by:
sql parser error: Expected an SQL statement, found: selet at line:1, column:6
Near "selet"


query error
select 1/0;
----
db error: ERROR: Failed to execute the statement

Caused by these errors (recent errors listed first):
1: Expr error
2: Division by zero
20 changes: 15 additions & 5 deletions e2e_test/error_ui/main.slt → e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
query error
selet 1;
----
db error: ERROR: Failed to run the query

Caused by:
sql parser error: Expected an SQL statement, found: selet at line:1, column:6
Near "selet"


statement error
create function int_42() returns int as int_42 using link 'localhost:8815';
----
db error: ERROR: QueryError
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: failed to connect to UDF service
Expand All @@ -13,7 +23,7 @@ Caused by these errors (recent errors listed first):
statement error
alter system set not_exist_key to value;
----
db error: ERROR: QueryError
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
Expand All @@ -23,7 +33,7 @@ Caused by these errors (recent errors listed first):
query error
select v1 + v2 = v3;
----
db error: ERROR: QueryError
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Failed to bind expression: v1 + v2 = v3
Expand All @@ -33,7 +43,7 @@ Caused by these errors (recent errors listed first):
query error
select 1/0;
----
db error: ERROR: QueryError
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Expr error
Expand All @@ -43,7 +53,7 @@ Caused by these errors (recent errors listed first):
query error
select x/0 from generate_series(1, 3) as g(x);
----
db error: ERROR: QueryError
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Expr error
Expand Down
8 changes: 7 additions & 1 deletion src/error/src/tonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ impl std::fmt::Display for TonicStatusWrapper {
{
write!(f, " to {} service", service_name)?;
}
write!(f, " failed: {}: {}", self.0.code(), self.0.message())
write!(f, " failed: {}: ", self.0.code())?;
if let Some(source) = self.source() {
// Prefer the source chain from the `details` field.
write!(f, "{}", source)
} else {
write!(f, "{}", self.0.message())
}
}
}

Expand Down
26 changes: 13 additions & 13 deletions src/utils/pgwire/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,43 @@ pub type PsqlResult<T> = std::result::Result<T, PsqlError>;
/// Error type used in pgwire crates.
#[derive(Error, Debug)]
pub enum PsqlError {
#[error("Startup Error when connect to session: {0}")]
#[error("Failed to start a new session: {0}")]
StartupError(#[source] BoxedError),

#[error("Invalid password")]
PasswordError,

#[error("QueryError: {0}")]
QueryError(#[source] BoxedError),
#[error("Failed to run the query: {0}")]
SimpleQueryError(#[source] BoxedError),

#[error("ParseError: {0}")]
ParseError(#[source] BoxedError),
#[error("Failed to prepare the statement: {0}")]
ExtendedPrepareError(#[source] BoxedError),

#[error("ExecuteError: {0}")]
ExecuteError(#[source] BoxedError),
#[error("Failed to execute the statement: {0}")]
ExtendedExecuteError(#[source] BoxedError),

#[error(transparent)]
IoError(#[from] IoError),

/// Include error for describe, bind.
/// Uncategorized error for describe, bind.
#[error(transparent)]
Internal(BoxedError),
Uncategorized(BoxedError),

#[error("Panicked when processing: {0}
#[error("Panicked when handling the request: {0}
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml")]
Panic(String),

#[error("Unable to set up an ssl connection")]
#[error("Unable to setup an SSL connection")]
SslError(#[from] openssl::ssl::Error),
}

impl PsqlError {
pub fn no_statement() -> Self {
PsqlError::Internal("No statement found".into())
PsqlError::Uncategorized("No statement found".into())
}

pub fn no_portal() -> Self {
PsqlError::Internal("No portal found".into())
PsqlError::Uncategorized("No portal found".into())
}
}
2 changes: 1 addition & 1 deletion src/utils/pgwire/src/pg_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ where
.values_stream()
.try_next()
.await
.map_err(PsqlError::ExecuteError)?
.map_err(PsqlError::ExtendedExecuteError)?
{
rows.into_iter()
} else {
Expand Down
54 changes: 29 additions & 25 deletions src/utils/pgwire/src/pg_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ where
return None;
}

PsqlError::QueryError(_) => {
PsqlError::SimpleQueryError(_) => {
self.stream
.write_no_flush(&BeMessage::ErrorResponse(Box::new(e)))
.ok()?;
Expand All @@ -277,9 +277,9 @@ where
return None;
}

PsqlError::Internal(_)
| PsqlError::ParseError(_)
| PsqlError::ExecuteError(_) => {
PsqlError::Uncategorized(_)
| PsqlError::ExtendedPrepareError(_)
| PsqlError::ExtendedExecuteError(_) => {
self.stream
.write_no_flush(&BeMessage::ErrorResponse(Box::new(e)))
.ok()?;
Expand Down Expand Up @@ -463,7 +463,7 @@ where

async fn process_query_msg(&mut self, query_string: io::Result<&str>) -> PsqlResult<()> {
let sql: Arc<str> =
Arc::from(query_string.map_err(|err| PsqlError::QueryError(Box::new(err)))?);
Arc::from(query_string.map_err(|err| PsqlError::SimpleQueryError(Box::new(err)))?);
let start = Instant::now();
let session = self.session.clone().unwrap();
let session_id = session.id().0;
Expand Down Expand Up @@ -494,7 +494,7 @@ where
// Parse sql.
let stmts = Parser::parse_sql(&sql)
.inspect_err(|e| tracing::error!("failed to parse sql:\n{}:\n{}", sql, e))
.map_err(|err| PsqlError::QueryError(err.into()))?;
.map_err(|err| PsqlError::SimpleQueryError(err.into()))?;
if stmts.is_empty() {
self.stream.write_no_flush(&BeMessage::EmptyQueryResponse)?;
}
Expand Down Expand Up @@ -535,7 +535,7 @@ where
self.stream
.write_no_flush(&BeMessage::NoticeResponse(&notice))?;
}
let mut res = res.map_err(PsqlError::QueryError)?;
let mut res = res.map_err(PsqlError::SimpleQueryError)?;

for notice in res.notices() {
self.stream
Expand All @@ -556,7 +556,7 @@ where
let mut rows_cnt = 0;

while let Some(row_set) = res.values_stream().next().await {
let row_set = row_set.map_err(PsqlError::QueryError)?;
let row_set = row_set.map_err(PsqlError::SimpleQueryError)?;
for row in row_set {
self.stream.write_no_flush(&BeMessage::DataRow(&row))?;
rows_cnt += 1;
Expand Down Expand Up @@ -623,16 +623,18 @@ where
// prepare statement.
self.unnamed_prepare_statement.take();
} else if self.prepare_statement_store.contains_key(&statement_name) {
return Err(PsqlError::ParseError("Duplicated statement name".into()));
return Err(PsqlError::ExtendedPrepareError(
"Duplicated statement name".into(),
));
}

let stmt = {
let stmts = Parser::parse_sql(sql)
.inspect_err(|e| tracing::error!("failed to parse sql:\n{}:\n{}", sql, e))
.map_err(|err| PsqlError::ParseError(err.into()))?;
.map_err(|err| PsqlError::ExtendedPrepareError(err.into()))?;

if stmts.len() > 1 {
return Err(PsqlError::ParseError(
return Err(PsqlError::ExtendedPrepareError(
"Only one statement is allowed in extended query mode".into(),
));
}
Expand All @@ -652,11 +654,11 @@ where
}
})
.try_collect()
.map_err(|err: RwError| PsqlError::ParseError(err.into()))?;
.map_err(|err: RwError| PsqlError::ExtendedPrepareError(err.into()))?;

let prepare_statement = session
.parse(stmt, param_types)
.map_err(PsqlError::ParseError)?;
.map_err(PsqlError::ExtendedPrepareError)?;

if statement_name.is_empty() {
self.unnamed_prepare_statement.replace(prepare_statement);
Expand All @@ -680,7 +682,7 @@ where
let session = self.session.clone().unwrap();

if self.portal_store.contains_key(&portal_name) {
return Err(PsqlError::Internal("Duplicated portal name".into()));
return Err(PsqlError::Uncategorized("Duplicated portal name".into()));
}

let prepare_statement = self.get_statement(&statement_name)?;
Expand All @@ -698,7 +700,7 @@ where

let portal = session
.bind(prepare_statement, msg.params, param_formats, result_formats)
.map_err(PsqlError::Internal)?;
.map_err(PsqlError::Uncategorized)?;

if portal_name.is_empty() {
self.result_cache.remove(&portal_name);
Expand Down Expand Up @@ -753,7 +755,7 @@ where
sql = format_args!("{}", truncated_fmt::TruncatedFmt(&sql, *RW_QUERY_LOG_TRUNCATE_LEN)),
);

let pg_response = result.map_err(PsqlError::ExecuteError)?;
let pg_response = result.map_err(PsqlError::ExtendedExecuteError)?;
let mut result_cache = ResultCache::new(pg_response);
let is_consume_completed = result_cache.consume::<S>(row_max, &mut self.stream).await?;
if !is_consume_completed {
Expand All @@ -779,7 +781,7 @@ where
.clone()
.unwrap()
.describe_statement(prepare_statement)
.map_err(PsqlError::Internal)?;
.map_err(PsqlError::Uncategorized)?;

self.stream
.write_no_flush(&BeMessage::ParameterDescription(
Expand All @@ -799,7 +801,7 @@ where

let row_descriptions = session
.describe_portal(portal)
.map_err(PsqlError::Internal)?;
.map_err(PsqlError::Uncategorized)?;

if row_descriptions.is_empty() {
// According https://www.postgresql.org/docs/current/protocol-flow.html#:~:text=The%20response%20is%20a%20RowDescri[…]0a%20query%20that%20will%20return%20rows%3B,
Expand Down Expand Up @@ -850,14 +852,14 @@ where
Ok(self
.unnamed_portal
.as_ref()
.ok_or_else(|| PsqlError::Internal("unnamed portal not found".into()))?
.ok_or_else(|| PsqlError::Uncategorized("unnamed portal not found".into()))?
.clone())
} else {
Ok(self
.portal_store
.get(portal_name)
.ok_or_else(|| {
PsqlError::Internal(format!("Portal {} not found", portal_name).into())
PsqlError::Uncategorized(format!("Portal {} not found", portal_name).into())
})?
.clone())
}
Expand All @@ -871,14 +873,16 @@ where
Ok(self
.unnamed_prepare_statement
.as_ref()
.ok_or_else(|| PsqlError::Internal("unnamed prepare statement not found".into()))?
.ok_or_else(|| {
PsqlError::Uncategorized("unnamed prepare statement not found".into())
})?
.clone())
} else {
Ok(self
.prepare_statement_store
.get(statement_name)
.ok_or_else(|| {
PsqlError::Internal(
PsqlError::Uncategorized(
format!("Prepare statement {} not found", statement_name).into(),
)
})?
Expand Down Expand Up @@ -1069,13 +1073,13 @@ fn build_ssl_ctx_from_config(tls_config: &TlsConfig) -> PsqlResult<SslContext> {
// Now we set every verify to true.
acceptor
.set_private_key_file(key_path, openssl::ssl::SslFiletype::PEM)
.map_err(|e| PsqlError::Internal(e.into()))?;
.map_err(|e| PsqlError::Uncategorized(e.into()))?;
acceptor
.set_ca_file(cert_path)
.map_err(|e| PsqlError::Internal(e.into()))?;
.map_err(|e| PsqlError::Uncategorized(e.into()))?;
acceptor
.set_certificate_chain_file(cert_path)
.map_err(|e| PsqlError::Internal(e.into()))?;
.map_err(|e| PsqlError::Uncategorized(e.into()))?;
let acceptor = acceptor.build();

Ok(acceptor.into_context())
Expand Down
2 changes: 1 addition & 1 deletion src/utils/pgwire/src/pg_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ where
}

if let Some(callback) = self.callback.take() {
callback.await.map_err(PsqlError::ExecuteError)?;
callback.await.map_err(PsqlError::SimpleQueryError)?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils/pgwire/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Format {
match format_code {
0 => Ok(Format::Text),
1 => Ok(Format::Binary),
_ => Err(PsqlError::Internal(
_ => Err(PsqlError::Uncategorized(
format!("Unknown format code: {}", format_code).into(),
)),
}
Expand Down

0 comments on commit 8b756ae

Please sign in to comment.