From 693e8de83af8160b46dd20109ed23e552cba257a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 17 Sep 2023 04:05:28 -0500 Subject: [PATCH] feat: scope spawned task with trace id (#2419) * feat: scope spawned task with trace id Signed-off-by: Ruihang Xia * fix test Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/common/meta/src/ddl/alter_table.rs | 7 +++++-- src/datanode/src/region_server.rs | 19 ++++++++++--------- src/servers/src/grpc/flight.rs | 13 +++++++++---- src/servers/src/grpc/flight/stream.rs | 13 ++++++------- src/servers/src/grpc/greptime_handler.rs | 7 ++++--- src/servers/src/grpc/region_server.rs | 14 ++++++++++---- 6 files changed, 44 insertions(+), 29 deletions(-) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 7cefa0f5668a..8641d5d7b2a6 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -17,7 +17,7 @@ use std::vec; use api::v1::alter_expr::Kind; use api::v1::region::{ alter_request, region_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, - RegionColumnDef, RegionRequest, + RegionColumnDef, RegionRequest, RegionRequestHeader, }; use api::v1::{AlterExpr, RenameTable}; use async_trait::async_trait; @@ -201,7 +201,10 @@ impl AlterTableProcedure { let region_id = RegionId::new(table_id, region); let request = self.create_alter_region_request(region_id)?; let request = RegionRequest { - header: None, + header: Some(RegionRequestHeader { + trace_id: common_telemetry::trace_id().unwrap_or_default(), + ..Default::default() + }), body: Some(region_request::Body::Alter(request)), }; debug!("Submitting {request:?} to {datanode}"); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 040c4a595768..601ae1f28874 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -135,23 +135,19 @@ impl RegionServerHandler for RegionServer { .context(ExecuteGrpcRequestSnafu)?; let join_tasks = requests.into_iter().map(|(region_id, req)| { let self_to_move = self.clone(); - self.inner - .runtime - .spawn(async move { self_to_move.handle_request(region_id, req).await }) + async move { self_to_move.handle_request(region_id, req).await } }); let results = try_join_all(join_tasks) .await - .context(servers_error::JoinTaskSnafu)?; + .map_err(BoxedError::new) + .context(ExecuteGrpcRequestSnafu)?; // merge results by simply sum up affected rows. // only insert/delete will have multiple results. let mut affected_rows = 0; for result in results { - match result - .map_err(BoxedError::new) - .context(ExecuteGrpcRequestSnafu)? - { + match result { Output::AffectedRows(rows) => affected_rows += rows, Output::Stream(_) | Output::RecordBatches(_) => { // TODO: change the output type to only contains `affected_rows` @@ -181,10 +177,15 @@ impl FlightCraft for RegionServer { let ticket = request.into_inner().ticket; let request = QueryRequest::decode(ticket.as_ref()) .context(servers_error::InvalidFlightTicketSnafu)?; + let trace_id = request + .header + .as_ref() + .map(|h| h.trace_id) + .unwrap_or_default(); let result = self.handle_read(request).await?; - let stream = Box::pin(FlightRecordBatchStream::new(result)); + let stream = Box::pin(FlightRecordBatchStream::new(result, trace_id)); Ok(Response::new(stream)) } } diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 6f4f7cbca721..cea21a8a00c6 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -150,23 +150,28 @@ impl FlightCraft for GreptimeRequestHandler { let ticket = request.into_inner().ticket; let request = GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?; + let trace_id = request + .header + .as_ref() + .map(|h| h.trace_id) + .unwrap_or_default(); let output = self.handle_request(request).await?; let stream: Pin> + Send + Sync>> = - to_flight_data_stream(output); + to_flight_data_stream(output, trace_id); Ok(Response::new(stream)) } } -fn to_flight_data_stream(output: Output) -> TonicStream { +fn to_flight_data_stream(output: Output, trace_id: u64) -> TonicStream { match output { Output::Stream(stream) => { - let stream = FlightRecordBatchStream::new(stream); + let stream = FlightRecordBatchStream::new(stream, trace_id); Box::pin(stream) as _ } Output::RecordBatches(x) => { - let stream = FlightRecordBatchStream::new(x.as_stream()); + let stream = FlightRecordBatchStream::new(x.as_stream(), trace_id); Box::pin(stream) as _ } Output::AffectedRows(rows) => { diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index 5ff570608e33..542b031df887 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -18,7 +18,7 @@ use std::task::{Context, Poll}; use arrow_flight::FlightData; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::warn; +use common_telemetry::{warn, TRACE_ID}; use futures::channel::mpsc; use futures::channel::mpsc::Sender; use futures::{SinkExt, Stream, StreamExt}; @@ -39,12 +39,11 @@ pub struct FlightRecordBatchStream { } impl FlightRecordBatchStream { - pub fn new(recordbatches: SendableRecordBatchStream) -> Self { + pub fn new(recordbatches: SendableRecordBatchStream, trace_id: u64) -> Self { let (tx, rx) = mpsc::channel::>(1); - let join_handle = - common_runtime::spawn_read( - async move { Self::flight_data_stream(recordbatches, tx).await }, - ); + let join_handle = common_runtime::spawn_read(TRACE_ID.scope(trace_id, async move { + Self::flight_data_stream(recordbatches, tx).await + })); Self { rx, join_handle, @@ -146,7 +145,7 @@ mod test { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]) .unwrap() .as_stream(); - let mut stream = FlightRecordBatchStream::new(recordbatches); + let mut stream = FlightRecordBatchStream::new(recordbatches, 0); let mut raw_data = Vec::with_capacity(2); raw_data.push(stream.next().await.unwrap().unwrap()); diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index a1af9e8d6547..f8dbe3010163 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -27,7 +27,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; use common_runtime::Runtime; -use common_telemetry::logging; +use common_telemetry::{logging, TRACE_ID}; use metrics::{histogram, increment_counter}; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; @@ -74,6 +74,7 @@ impl GreptimeRequestHandler { let request_type = request_type(&query).to_string(); let db = query_ctx.get_db_string(); let timer = RequestTimer::new(db.clone(), request_type); + let trace_id = query_ctx.trace_id(); // Executes requests in another runtime to // 1. prevent the execution from being cancelled unexpected by Tonic runtime; @@ -82,7 +83,7 @@ impl GreptimeRequestHandler { // - Obtaining a `JoinHandle` to get the panic message (if there's any). // From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped. // 2. avoid the handler blocks the gRPC runtime incidentally. - let handle = self.runtime.spawn(async move { + let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move { handler.do_query(query, query_ctx).await.map_err(|e| { if e.status_code().should_log_error() { logging::error!(e; "Failed to handle request"); @@ -92,7 +93,7 @@ impl GreptimeRequestHandler { } e }) - }); + })); handle.await.context(JoinTaskSnafu).map_err(|e| { timer.record(e.status_code()); diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs index a93686b026aa..19d075c356f0 100644 --- a/src/servers/src/grpc/region_server.rs +++ b/src/servers/src/grpc/region_server.rs @@ -19,7 +19,7 @@ use api::v1::region::{region_request, RegionRequest, RegionResponse}; use async_trait::async_trait; use common_error::ext::ErrorExt; use common_runtime::Runtime; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, TRACE_ID}; use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; @@ -45,8 +45,14 @@ impl RegionServerRequestHandler { } async fn handle(&self, request: RegionRequest) -> Result { + let trace_id = request + .header + .context(InvalidQuerySnafu { + reason: "Expecting non-empty region request header.", + })? + .trace_id; let query = request.body.context(InvalidQuerySnafu { - reason: "Expecting non-empty GreptimeRequest.", + reason: "Expecting non-empty region request body.", })?; let handler = self.handler.clone(); @@ -58,7 +64,7 @@ impl RegionServerRequestHandler { // - Obtaining a `JoinHandle` to get the panic message (if there's any). // From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped. // 2. avoid the handler blocks the gRPC runtime incidentally. - let handle = self.runtime.spawn(async move { + let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move { handler.handle(query).await.map_err(|e| { if e.status_code().should_log_error() { error!(e; "Failed to handle request"); @@ -68,7 +74,7 @@ impl RegionServerRequestHandler { } e }) - }); + })); handle.await.context(JoinTaskSnafu)? }