Skip to content

Commit

Permalink
feat: scope spawned task with trace id (#2419)
Browse files Browse the repository at this point in the history
* feat: scope spawned task with trace id

Signed-off-by: Ruihang Xia <[email protected]>

* fix test

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Sep 17, 2023
1 parent 542e863 commit 693e8de
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 29 deletions.
7 changes: 5 additions & 2 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::vec;
use api::v1::alter_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns,
RegionColumnDef, RegionRequest,
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{AlterExpr, RenameTable};
use async_trait::async_trait;
Expand Down Expand Up @@ -201,7 +201,10 @@ impl AlterTableProcedure {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: None,
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");
Expand Down
19 changes: 10 additions & 9 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,19 @@ impl RegionServerHandler for RegionServer {
.context(ExecuteGrpcRequestSnafu)?;
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
self.inner
.runtime
.spawn(async move { self_to_move.handle_request(region_id, req).await })
async move { self_to_move.handle_request(region_id, req).await }
});

let results = try_join_all(join_tasks)
.await
.context(servers_error::JoinTaskSnafu)?;
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;

// merge results by simply sum up affected rows.
// only insert/delete will have multiple results.
let mut affected_rows = 0;
for result in results {
match result
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?
{
match result {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
// TODO: change the output type to only contains `affected_rows`
Expand Down Expand Up @@ -181,10 +177,15 @@ impl FlightCraft for RegionServer {
let ticket = request.into_inner().ticket;
let request = QueryRequest::decode(ticket.as_ref())
.context(servers_error::InvalidFlightTicketSnafu)?;
let trace_id = request
.header
.as_ref()
.map(|h| h.trace_id)
.unwrap_or_default();

let result = self.handle_read(request).await?;

let stream = Box::pin(FlightRecordBatchStream::new(result));
let stream = Box::pin(FlightRecordBatchStream::new(result, trace_id));
Ok(Response::new(stream))
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/servers/src/grpc/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,28 @@ impl FlightCraft for GreptimeRequestHandler {
let ticket = request.into_inner().ticket;
let request =
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
let trace_id = request
.header
.as_ref()
.map(|h| h.trace_id)
.unwrap_or_default();

let output = self.handle_request(request).await?;

let stream: Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync>> =
to_flight_data_stream(output);
to_flight_data_stream(output, trace_id);
Ok(Response::new(stream))
}
}

fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
fn to_flight_data_stream(output: Output, trace_id: u64) -> TonicStream<FlightData> {
match output {
Output::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream);
let stream = FlightRecordBatchStream::new(stream, trace_id);
Box::pin(stream) as _
}
Output::RecordBatches(x) => {
let stream = FlightRecordBatchStream::new(x.as_stream());
let stream = FlightRecordBatchStream::new(x.as_stream(), trace_id);
Box::pin(stream) as _
}
Output::AffectedRows(rows) => {
Expand Down
13 changes: 6 additions & 7 deletions src/servers/src/grpc/flight/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::task::{Context, Poll};
use arrow_flight::FlightData;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::warn;
use common_telemetry::{warn, TRACE_ID};
use futures::channel::mpsc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, Stream, StreamExt};
Expand All @@ -39,12 +39,11 @@ pub struct FlightRecordBatchStream {
}

impl FlightRecordBatchStream {
pub fn new(recordbatches: SendableRecordBatchStream) -> Self {
pub fn new(recordbatches: SendableRecordBatchStream, trace_id: u64) -> Self {
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
let join_handle =
common_runtime::spawn_read(
async move { Self::flight_data_stream(recordbatches, tx).await },
);
let join_handle = common_runtime::spawn_read(TRACE_ID.scope(trace_id, async move {
Self::flight_data_stream(recordbatches, tx).await
}));
Self {
rx,
join_handle,
Expand Down Expand Up @@ -146,7 +145,7 @@ mod test {
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
.unwrap()
.as_stream();
let mut stream = FlightRecordBatchStream::new(recordbatches);
let mut stream = FlightRecordBatchStream::new(recordbatches, 0);

let mut raw_data = Vec::with_capacity(2);
raw_data.push(stream.next().await.unwrap().unwrap());
Expand Down
7 changes: 4 additions & 3 deletions src/servers/src/grpc/greptime_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_runtime::Runtime;
use common_telemetry::logging;
use common_telemetry::{logging, TRACE_ID};
use metrics::{histogram, increment_counter};
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -74,6 +74,7 @@ impl GreptimeRequestHandler {
let request_type = request_type(&query).to_string();
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);
let trace_id = query_ctx.trace_id();

// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by Tonic runtime;
Expand All @@ -82,7 +83,7 @@ impl GreptimeRequestHandler {
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
let handle = self.runtime.spawn(async move {
let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move {
handler.do_query(query, query_ctx).await.map_err(|e| {
if e.status_code().should_log_error() {
logging::error!(e; "Failed to handle request");
Expand All @@ -92,7 +93,7 @@ impl GreptimeRequestHandler {
}
e
})
});
}));

handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
Expand Down
14 changes: 10 additions & 4 deletions src/servers/src/grpc/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::region::{region_request, RegionRequest, RegionResponse};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_runtime::Runtime;
use common_telemetry::{debug, error};
use common_telemetry::{debug, error, TRACE_ID};
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};

Expand All @@ -45,8 +45,14 @@ impl RegionServerRequestHandler {
}

async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
let trace_id = request
.header
.context(InvalidQuerySnafu {
reason: "Expecting non-empty region request header.",
})?
.trace_id;
let query = request.body.context(InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
reason: "Expecting non-empty region request body.",
})?;

let handler = self.handler.clone();
Expand All @@ -58,7 +64,7 @@ impl RegionServerRequestHandler {
// - Obtaining a `JoinHandle` to get the panic message (if there's any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped.
// 2. avoid the handler blocks the gRPC runtime incidentally.
let handle = self.runtime.spawn(async move {
let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move {
handler.handle(query).await.map_err(|e| {
if e.status_code().should_log_error() {
error!(e; "Failed to handle request");
Expand All @@ -68,7 +74,7 @@ impl RegionServerRequestHandler {
}
e
})
});
}));

handle.await.context(JoinTaskSnafu)?
}
Expand Down

0 comments on commit 693e8de

Please sign in to comment.