Skip to content

Commit

Permalink
feat: trace_id in query context (#2014)
Browse files Browse the repository at this point in the history
* chore: unify once_cell version

* chore: update cargo lock

* chore: add gen_trace_id

* chore: add trace_id to query_ctx

* chore: add debug log

* Revert "chore: add debug log"

This reverts commit f52ab3b.

* chore: add frontend node id option

* chore: add query ctx to query engine ctx

* chore: set trace_id to logical_plan api

* chore: add trace_id in grpc entrance

* chore: generate trace_id while creating query_ctx

* chore: fix typo

* chore: extract trace_id from grpc header

* chore: extract trace_id from grpc header

* chore: fix clippy

* chore: add `QueryContextBuilder`

* chore: change node_id in fe to string
  • Loading branch information
shuiyisong authored Jul 24, 2023
1 parent 39e74dc commit 632cb26
Show file tree
Hide file tree
Showing 25 changed files with 283 additions and 101 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "356694a72f12ad9e15008d4245a0b4fe48f982ad" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
parquet = "40.0"
paste = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion src/client/examples/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn run() {

let logical = mock_logical_plan();
event!(Level::INFO, "plan size: {:#?}", logical.len());
let result = db.logical_plan(logical).await.unwrap();
let result = db.logical_plan(logical, None).await.unwrap();

event!(Level::INFO, "result: {:#?}", result);
}
Expand Down
107 changes: 69 additions & 38 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,20 @@ impl Database {

async fn handle(&self, request: Request) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = self.to_rpc_request(request);
let request = self.to_rpc_request(request, None);
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}

#[inline]
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
fn to_rpc_request(&self, request: Request, trace_id: Option<u64>) -> GreptimeRequest {
GreptimeRequest {
header: Some(RequestHeader {
catalog: self.catalog.clone(),
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
trace_id: None,
trace_id,
span_id: None,
}),
request: Some(request),
Expand All @@ -161,17 +161,27 @@ impl Database {

pub async fn sql(&self, sql: &str) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_SQL);
self.do_get(Request::Query(QueryRequest {
query: Some(Query::Sql(sql.to_string())),
}))
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::Sql(sql.to_string())),
}),
None,
)
.await
}

pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
pub async fn logical_plan(
&self,
logical_plan: Vec<u8>,
trace_id: Option<u64>,
) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_LOGICAL_PLAN);
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}))
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}),
trace_id,
)
.await
}

Expand All @@ -183,69 +193,90 @@ impl Database {
step: &str,
) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_PROMQL_RANGE_QUERY);
self.do_get(Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(PromRangeQuery {
query: promql.to_string(),
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
})),
}))
self.do_get(
Request::Query(QueryRequest {
query: Some(Query::PromRangeQuery(PromRangeQuery {
query: promql.to_string(),
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
})),
}),
None,
)
.await
}

pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_CREATE_TABLE);
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}))
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}),
None,
)
.await
}

pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_ALTER);
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}))
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}),
None,
)
.await
}

pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_DROP_TABLE);
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}))
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}),
None,
)
.await
}

pub async fn flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_FLUSH_TABLE);
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(expr)),
}))
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(expr)),
}),
None,
)
.await
}

pub async fn compact_table(&self, expr: CompactTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_COMPACT_TABLE);
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CompactTable(expr)),
}))
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CompactTable(expr)),
}),
None,
)
.await
}

pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_TRUNCATE_TABLE);
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
}))
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
}),
None,
)
.await
}

async fn do_get(&self, request: Request) -> Result<Output> {
async fn do_get(&self, request: Request, trace_id: Option<u64>) -> Result<Output> {
// FIXME(paomian): should be added some labels for metrics
let _timer = timer!(metrics::METRIC_GRPC_DO_GET);
let request = self.to_rpc_request(request);
let request = self.to_rpc_request(request, trace_id);
let request = Ticket {
ticket: request.encode_to_vec().into(),
};
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl Repl {
.encode(plan)
.context(SubstraitEncodeLogicalPlanSnafu)?;

self.database.logical_plan(plan.to_vec()).await
self.database.logical_plan(plan.to_vec(), None).await
} else {
self.database.sql(&sql).await
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ datatypes = { path = "../../datatypes" }
libc = "0.2"
num = "0.4"
num-traits = "0.2"
once_cell = "1.10"
once_cell.workspace = true
paste = "1.0"
snafu.workspace = true
statrs = "0.16"
Expand Down
2 changes: 1 addition & 1 deletion src/common/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async-trait.workspace = true
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
metrics.workspace = true
once_cell = "1.12"
once_cell.workspace = true
paste.workspace = true
snafu.workspace = true
tokio.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion src/common/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ console-subscriber = { version = "0.1", optional = true }
metrics-exporter-prometheus = { version = "0.11", default-features = false }
metrics-util = "0.14"
metrics.workspace = true
once_cell = "1.10"
once_cell.workspace = true
opentelemetry = { version = "0.17", default-features = false, features = [
"trace",
"rt-tokio",
Expand All @@ -24,6 +24,8 @@ opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
parking_lot = { version = "0.12", features = [
"deadlock_detection",
], optional = true }
rand.workspace = true
rs-snowflake = "0.6"
serde.workspace = true
tokio.workspace = true
tracing = "0.1"
Expand Down
37 changes: 37 additions & 0 deletions src/common/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,44 @@ mod macros;
pub mod metric;
mod panic_hook;

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

pub use logging::{init_default_ut_logging, init_global_logging};
pub use metric::init_default_metrics_recorder;
use once_cell::sync::OnceCell;
pub use panic_hook::set_panic_hook;
use parking_lot::Mutex;
use rand::random;
use snowflake::SnowflakeIdBucket;
pub use {common_error, tracing, tracing_appender, tracing_futures, tracing_subscriber};

/// Numeric node id of this process. Set at most once: either by `init_node_id`, or
/// defaulted to `0` by `gen_trace_id` if that runs first.
static NODE_ID: OnceCell<u64> = OnceCell::new();
/// Mutex-guarded snowflake id bucket, created lazily on the first `gen_trace_id` call.
static TRACE_BUCKET: OnceCell<Mutex<SnowflakeIdBucket>> = OnceCell::new();

/// Generates a new trace id from the process-wide snowflake id bucket.
///
/// The bucket is created lazily on the first call, using the node id stored in `NODE_ID`
/// (normally set by [`init_node_id`]). If no node id has been stored by then, `0` is
/// stored and used, and a later [`init_node_id`] call will only log a warning. Call
/// [`init_node_id`] before the first trace id is generated.
pub fn gen_trace_id() -> u64 {
    // NOTE(review): rs-snowflake appears to pack the node id into a 5-bit field
    // (`node_id << 12`, below `machine_id << 17`) — confirm against the crate version in
    // use. `NODE_ID` holds an arbitrary 64-bit hash or random value, so it is reduced to
    // that width here; otherwise the stray bits (or the sign bit after the `i32` cast)
    // would be OR-ed over the machine-id and timestamp bits of every generated id.
    const NODE_ID_MASK: u64 = 0x1F;

    let mut bucket = TRACE_BUCKET
        .get_or_init(|| {
            // if node_id is not initialized, how about random one?
            let node_id = NODE_ID.get_or_init(|| 0);
            info!("initializing bucket with node_id: {}", node_id);
            // Lossless cast: the masked value is at most 31.
            let snowflake_node_id = (*node_id & NODE_ID_MASK) as i32;
            Mutex::new(SnowflakeIdBucket::new(1, snowflake_node_id))
        })
        .lock();
    bucket.get_id() as u64
}

pub fn init_node_id(node_id: Option<String>) {
let node_id = node_id.map(|id| calculate_hash(&id)).unwrap_or(random());
match NODE_ID.set(node_id) {
Ok(_) => {}
Err(_) => warn!("node_id is already initialized"),
}
}

/// Hashes `t` with a freshly constructed [`DefaultHasher`] and returns the 64-bit digest.
///
/// The same input yields the same output within one build of the program. The standard
/// library does not specify `DefaultHasher`'s algorithm, so the value should not be
/// relied upon across Rust releases.
fn calculate_hash<T: Hash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    Hash::hash(t, &mut hasher);
    Hasher::finish(&hasher)
}
2 changes: 1 addition & 1 deletion src/common/test-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ edition.workspace = true
license.workspace = true

[dependencies]
once_cell = "1.16"
once_cell.workspace = true
rand.workspace = true
tempfile.workspace = true
2 changes: 2 additions & 0 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::service_config::{
#[serde(default)]
pub struct FrontendOptions {
pub mode: Mode,
pub node_id: Option<String>,
pub heartbeat: HeartbeatOptions,
pub http_options: Option<HttpOptions>,
pub grpc_options: Option<GrpcOptions>,
Expand All @@ -46,6 +47,7 @@ impl Default for FrontendOptions {
fn default() -> Self {
Self {
mode: Mode::Standalone,
node_id: None,
heartbeat: HeartbeatOptions::default(),
http_options: Some(HttpOptions::default()),
grpc_options: Some(GrpcOptions::default()),
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ impl Instance {
Arc::new(handlers_executor),
));

common_telemetry::init_node_id(opts.node_id.clone());

Ok(Instance {
catalog_manager,
script_executor,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl DatanodeInstance {

let result = self
.db
.logical_plan(substrait_plan.to_vec())
.logical_plan(substrait_plan.to_vec(), None)
.await
.context(error::RequestDatanodeSnafu)?;
let Output::RecordBatches(record_batches) = result else { unreachable!() };
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ h2 = "0.3"
http-body = "0.4"
lazy_static.workspace = true
metrics.workspace = true
once_cell = "1.17"
once_cell.workspace = true
parking_lot = "0.12"
prost.workspace = true
rand.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ greptime-proto.workspace = true
humantime = "2.1"
metrics.workspace = true
object-store = { path = "../object-store" }
once_cell = "1.10"
once_cell.workspace = true
partition = { path = "../partition" }
promql = { path = "../promql" }
promql-parser = "0.1.1"
Expand Down
8 changes: 5 additions & 3 deletions src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl DatafusionQueryEngine {
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut ctx = QueryEngineContext::new(self.state.session_state());
let mut ctx = QueryEngineContext::new(self.state.session_state(), query_ctx.clone());

// `create_physical_plan` will optimize logical plan internally
let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
Expand Down Expand Up @@ -363,10 +363,12 @@ impl QueryExecutor for DatafusionQueryEngine {
plan: &Arc<dyn PhysicalPlan>,
) -> Result<SendableRecordBatchStream> {
let _timer = timer!(metrics::METRIC_EXEC_PLAN_ELAPSED);
let task_ctx = ctx.build_task_ctx();

match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => Ok(plan
.execute(0, ctx.state().task_ctx())
.execute(0, task_ctx)
.context(error::ExecutePhysicalPlanSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu))?,
Expand All @@ -377,7 +379,7 @@ impl QueryExecutor for DatafusionQueryEngine {
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
let df_stream = plan
.execute(0, ctx.state().task_ctx())
.execute(0, task_ctx)
.context(error::DatafusionSnafu {
msg: "Failed to execute DataFusion merge exec",
})
Expand Down
Loading

0 comments on commit 632cb26

Please sign in to comment.