Skip to content

Commit

Permalink
chore: patch for cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiyisong committed Jul 20, 2023
1 parent 2ef0d06 commit 85eb889
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 21 deletions.
3 changes: 2 additions & 1 deletion src/catalog/src/remote/region_alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ impl HeartbeatResponseHandler for RegionAliveKeepers {

let Some(keeper) = self.keepers.lock().await.get(&table_ident).cloned() else {
// Alive keeper could be affected by lagging msg, just warn and ignore.
warn!("Alive keeper for table {table_ident} is not found!");
// remove warn since we disable alive keeper on cloud
// warn!("Alive keeper for table {table_ident} is not found!");
continue;
};

Expand Down
6 changes: 3 additions & 3 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use common_telemetry::{debug, error, info, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -118,7 +118,7 @@ impl HeartbeatTask {
ctx: HeartbeatResponseHandlerContext,
handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Result<()> {
trace!("heartbeat response: {:?}", ctx.response);
debug!("heartbeat response: {:?}", ctx.response);
handler_executor
.handle(ctx)
.await
Expand All @@ -141,7 +141,7 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");

self.region_alive_keepers.start().await;
// self.region_alive_keepers.start().await;

let meta_client = self.meta_client.clone();
let catalog_manager_clone = self.catalog_manager.clone();
Expand Down
8 changes: 6 additions & 2 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
use tonic::Code;

use crate::auth;
use crate::{auth, source_error_str};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
Expand Down Expand Up @@ -472,7 +472,11 @@ impl From<Error> for tonic::Status {
}

let metadata = MetadataMap::from_headers(headers);
tonic::Status::with_metadata(status_to_tonic_code(status_code), err.to_string(), metadata)
tonic::Status::with_metadata(
status_to_tonic_code(status_code),
source_error_str(err),
metadata,
)
}
}

Expand Down
15 changes: 5 additions & 10 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use crate::query_handler::{
PromStoreProtocolHandlerRef, ScriptHandlerRef,
};
use crate::server::Server;
use crate::source_error_str;

/// create query context from database name information, catalog and schema are
/// resolved from the name
Expand All @@ -96,7 +97,7 @@ pub(crate) async fn query_context_from_db(
StatusCode::DatabaseNotFound,
)),
Err(e) => Err(JsonResponse::with_error(
format!("Error checking database: {db}, {e}"),
source_error_str(&e),
StatusCode::Internal,
)),
}
Expand Down Expand Up @@ -321,10 +322,7 @@ impl JsonResponse {
},

Err(e) => {
return Self::with_error(
format!("Recordbatch error: {e}"),
e.status_code(),
);
return Self::with_error(source_error_str(&e), e.status_code());
}
}
}
Expand All @@ -337,10 +335,7 @@ impl JsonResponse {
}
},
Err(e) => {
return Self::with_error(
format!("Query engine output error: {e}"),
e.status_code(),
);
return Self::with_error(source_error_str(&e), e.status_code());
}
}
}
Expand Down Expand Up @@ -767,7 +762,7 @@ async fn handle_error(err: BoxError) -> Json<JsonResponse> {
logging::error!("Unhandled internal error: {}", err);

Json(JsonResponse::with_error(
format!("Unhandled internal error: {err}"),
source_error_str(&*err),
StatusCode::Unexpected,
))
}
Expand Down
8 changes: 8 additions & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ pub fn parse_catalog_and_schema_from_client_database_name(db: &str) -> (&str, &s
}
}

pub fn source_error_str(error: impl std::error::Error) -> String {
if let Some(source_error) = error.source() {
source_error_str(source_error)
} else {
error.to_string()
}
}

/// Cached SQL and logical plan for database interfaces
#[derive(Clone)]
pub struct SqlPlan {
Expand Down
3 changes: 2 additions & 1 deletion src/servers/src/mysql/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tokio::io::AsyncWrite;

use crate::error::{self, Error, Result};
use crate::metrics::*;
use crate::source_error_str;

/// Try to write multiple output to the writer if possible.
pub async fn write_output<'a, W: AsyncWrite + Send + Sync + Unpin>(
Expand Down Expand Up @@ -211,7 +212,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
);

let kind = ErrorKind::ER_INTERNAL_ERROR;
w.error(kind, error.to_string().as_bytes()).await?;
w.error(kind, source_error_str(error).as_bytes()).await?;
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/postgres/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use super::types::*;
use super::PostgresServerHandler;
use crate::error::Result;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::SqlPlan;
use crate::{source_error_str, SqlPlan};

#[async_trait]
impl SimpleQueryHandler for PostgresServerHandler {
Expand Down Expand Up @@ -95,7 +95,7 @@ fn output_to_query_response<'a>(
Err(e) => Ok(Response::Error(Box::new(ErrorInfo::new(
"ERROR".to_string(),
"XX000".to_string(),
e.to_string(),
source_error_str(e),
)))),
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/table/src/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,16 @@ impl Stream for StreamWithMetricWrapper {

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _timer = this.metric.elapsed_compute().timer();
// todo(shuiyisong): remove scan timer for now
// let _timer = this.metric.elapsed_compute().timer();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll {
this.metric.record_output(record_batch.num_rows());
let batch_mem_size = record_batch
.columns()
.iter()
.map(|vec_ref| vec_ref.memory_size())
.sum::<usize>();
this.metric.record_output(batch_mem_size);
}

poll
Expand Down

0 comments on commit 85eb889

Please sign in to comment.