From 632cb2643085d0210504268bf3c34adddd1abc5b Mon Sep 17 00:00:00 2001
From: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Date: Mon, 24 Jul 2023 15:35:06 +0800
Subject: [PATCH] feat: `trace_id` in query context (#2014)

* chore: unify once_cell version
* chore: update cargo lock
* chore: add gen_trace_id
* chore: add trace_id to query_ctx
* chore: add debug log
* Revert "chore: add debug log"

This reverts commit f52ab3bb300f1d73117cd6ebbd8e0162829b1aba.

* chore: add frontend node id option
* chore: add query ctx to query engine ctx
* chore: set trace_id to logical_plan api
* chore: add trace_id in grpc entrance
* chore: generate trace_id while creating query_ctx
* chore: fix typo
* chore: extract trace_id from grpc header
* chore: extract trace_id from grpc header
* chore: fix clippy
* chore: add `QueryContextBuilder`
* chore: change node_id in fe to string
---
 Cargo.lock                            |   8 ++
 Cargo.toml                            |   1 +
 src/client/examples/logical.rs        |   2 +-
 src/client/src/database.rs            | 107 +++++++++++++++++---------
 src/cmd/src/cli/repl.rs               |   2 +-
 src/common/function/Cargo.toml        |   2 +-
 src/common/runtime/Cargo.toml         |   2 +-
 src/common/telemetry/Cargo.toml       |   4 +-
 src/common/telemetry/src/lib.rs       |  37 +++++++++
 src/common/test-util/Cargo.toml       |   2 +-
 src/frontend/src/frontend.rs          |   2 +
 src/frontend/src/instance.rs          |   2 +
 src/frontend/src/table/scan.rs        |   2 +-
 src/meta-srv/Cargo.toml               |   2 +-
 src/query/Cargo.toml                  |   2 +-
 src/query/src/datafusion.rs           |   8 +-
 src/query/src/dist_plan/merge_scan.rs |  11 ++-
 src/query/src/query_engine/context.rs |  28 ++++++-
 src/script/Cargo.toml                 |   2 +-
 src/servers/Cargo.toml                |   2 +-
 src/servers/src/grpc/handler.rs       |  50 +++++++-----
 src/session/src/context.rs            |  88 +++++++++++++++++----
 src/session/src/lib.rs                |  14 ++--
 src/sql/Cargo.toml                    |   2 +-
 tests-integration/Cargo.toml          |   2 +-
 25 files changed, 283 insertions(+), 101 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 03abb02d633d..c6674f060a53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1954,6 +1954,8 @@ dependencies = [
  "opentelemetry 0.17.0",
  "opentelemetry-jaeger",
  "parking_lot 0.12.1",
+ "rand",
+ "rs-snowflake",
  "serde",
  "tokio",
  "tracing",
@@ -7768,6 +7770,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "rs-snowflake"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e60ef3b82994702bbe4e134d98aadca4b49ed04440148985678d415c68127666"
+
 [[package]]
 name = "rsa"
 version = "0.6.1"
diff --git a/Cargo.toml b/Cargo.toml
index ac6e751b982e..dab2da2303fd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,6 +77,7 @@ futures-util = "0.3"
 greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "356694a72f12ad9e15008d4245a0b4fe48f982ad" }
 itertools = "0.10"
 lazy_static = "1.4"
+once_cell = "1.18"
 opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
 parquet = "40.0"
 paste = "1.0"
diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs
index d4b74fa7cf33..911be170ffd3 100644
--- a/src/client/examples/logical.rs
+++ b/src/client/examples/logical.rs
@@ -73,7 +73,7 @@ async fn run() {
     let logical = mock_logical_plan();
     event!(Level::INFO, "plan size: {:#?}", logical.len());
 
-    let result = db.logical_plan(logical).await.unwrap();
+    let result = db.logical_plan(logical, None).await.unwrap();
 
     event!(Level::INFO, "result: {:#?}", result);
 }
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index 97ab29fff665..80452a0cfd6c 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -139,20 +139,20 @@ impl Database {
 
     async fn handle(&self, request: Request) -> Result<u32> {
         let mut client = self.client.make_database_client()?.inner;
-        let request = self.to_rpc_request(request);
+        let request = self.to_rpc_request(request, None);
         let response = client.handle(request).await?.into_inner();
         from_grpc_response(response)
     }
 
     #[inline]
-    fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
+    fn to_rpc_request(&self, request: Request, trace_id: Option<u64>) -> GreptimeRequest {
         GreptimeRequest {
             header: Some(RequestHeader {
                 catalog: self.catalog.clone(),
                 schema: self.schema.clone(),
                 authorization: self.ctx.auth_header.clone(),
                 dbname: self.dbname.clone(),
-                trace_id: None,
+                trace_id,
                 span_id: None,
             }),
             request: Some(request),
@@ -161,17 +161,27 @@ impl Database {
 
     pub async fn sql(&self, sql: &str) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_SQL);
-        self.do_get(Request::Query(QueryRequest {
-            query: Some(Query::Sql(sql.to_string())),
-        }))
+        self.do_get(
+            Request::Query(QueryRequest {
+                query: Some(Query::Sql(sql.to_string())),
+            }),
+            None,
+        )
         .await
     }
 
-    pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
+    pub async fn logical_plan(
+        &self,
+        logical_plan: Vec<u8>,
+        trace_id: Option<u64>,
+    ) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_LOGICAL_PLAN);
-        self.do_get(Request::Query(QueryRequest {
-            query: Some(Query::LogicalPlan(logical_plan)),
-        }))
+        self.do_get(
+            Request::Query(QueryRequest {
+                query: Some(Query::LogicalPlan(logical_plan)),
+            }),
+            trace_id,
+        )
         .await
     }
 
@@ -183,69 +193,90 @@ impl Database {
         step: &str,
     ) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_PROMQL_RANGE_QUERY);
-        self.do_get(Request::Query(QueryRequest {
-            query: Some(Query::PromRangeQuery(PromRangeQuery {
-                query: promql.to_string(),
-                start: start.to_string(),
-                end: end.to_string(),
-                step: step.to_string(),
-            })),
-        }))
+        self.do_get(
+            Request::Query(QueryRequest {
+                query: Some(Query::PromRangeQuery(PromRangeQuery {
+                    query: promql.to_string(),
+                    start: start.to_string(),
+                    end: end.to_string(),
+                    step: step.to_string(),
+                })),
+            }),
+            None,
+        )
         .await
     }
 
     pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_CREATE_TABLE);
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::CreateTable(expr)),
-        }))
+        self.do_get(
+            Request::Ddl(DdlRequest {
+                expr: Some(DdlExpr::CreateTable(expr)),
+            }),
+            None,
+        )
         .await
     }
 
     pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_ALTER);
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::Alter(expr)),
-        }))
+        self.do_get(
+            Request::Ddl(DdlRequest {
+                expr: Some(DdlExpr::Alter(expr)),
+            }),
+            None,
+        )
         .await
     }
 
     pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_DROP_TABLE);
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::DropTable(expr)),
-        }))
+        self.do_get(
+            Request::Ddl(DdlRequest {
+                expr: Some(DdlExpr::DropTable(expr)),
+            }),
+            None,
+        )
         .await
     }
 
     pub async fn flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_FLUSH_TABLE);
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::FlushTable(expr)),
-        }))
+        self.do_get(
+            Request::Ddl(DdlRequest {
+                expr: Some(DdlExpr::FlushTable(expr)),
+            }),
+            None,
+        )
         .await
     }
 
     pub async fn compact_table(&self, expr: CompactTableExpr) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_COMPACT_TABLE);
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::CompactTable(expr)),
-        }))
+        self.do_get(
+            Request::Ddl(DdlRequest {
+                expr: Some(DdlExpr::CompactTable(expr)),
+            }),
+            None,
+        )
         .await
     }
 
     pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result<Output> {
         let _timer = timer!(metrics::METRIC_GRPC_TRUNCATE_TABLE);
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::TruncateTable(expr)),
-        }))
+        self.do_get(
+            Request::Ddl(DdlRequest {
+                expr: Some(DdlExpr::TruncateTable(expr)),
+            }),
+            None,
+        )
         .await
     }
 
-    async fn do_get(&self, request: Request) -> Result<Output> {
+    async fn do_get(&self, request: Request, trace_id: Option<u64>) -> Result<Output> {
         // FIXME(paomian): should be added some labels for metrics
         let _timer = timer!(metrics::METRIC_GRPC_DO_GET);
-        let request = self.to_rpc_request(request);
+        let request = self.to_rpc_request(request, trace_id);
         let request = Ticket {
             ticket: request.encode_to_vec().into(),
         };
diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index 17fb7944e1dd..470cbc8181bc 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -183,7 +183,7 @@ impl Repl {
                 .encode(plan)
                 .context(SubstraitEncodeLogicalPlanSnafu)?;
 
-            self.database.logical_plan(plan.to_vec()).await
+            self.database.logical_plan(plan.to_vec(), None).await
         } else {
             self.database.sql(&sql).await
         }
diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml
index f169ddb01dee..43cbc4818137 100644
--- a/src/common/function/Cargo.toml
+++ b/src/common/function/Cargo.toml
@@ -16,7 +16,7 @@ datatypes = { path = "../../datatypes" }
 libc = "0.2"
 num = "0.4"
 num-traits = "0.2"
-once_cell = "1.10"
+once_cell.workspace = true
 paste = "1.0"
 snafu.workspace = true
 statrs = "0.16"
diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml
index 9256da77456d..4f81320f77df 100644
--- a/src/common/runtime/Cargo.toml
+++ b/src/common/runtime/Cargo.toml
@@ -9,7 +9,7 @@ async-trait.workspace = true
 common-error = { path = "../error" }
 common-telemetry = { path = "../telemetry" }
 metrics.workspace = true
-once_cell = "1.12"
+once_cell.workspace = true
 paste.workspace = true
 snafu.workspace = true
 tokio.workspace = true
diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml
index 2009befc50cf..3581da88a92d 100644
--- a/src/common/telemetry/Cargo.toml
+++ b/src/common/telemetry/Cargo.toml
@@ -15,7 +15,7 @@ console-subscriber = { version = "0.1", optional = true }
 metrics-exporter-prometheus = { version = "0.11", default-features = false }
 metrics-util = "0.14"
 metrics.workspace = true
-once_cell = "1.10"
+once_cell.workspace = true
 opentelemetry = { version = "0.17", default-features = false, features = [
     "trace",
     "rt-tokio",
@@ -24,6 +24,8 @@ opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
 parking_lot = { version = "0.12", features = [
     "deadlock_detection",
 ], optional = true }
+rand.workspace = true
+rs-snowflake = "0.6"
 serde.workspace = true
 tokio.workspace = true
 tracing = "0.1"
diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs
index e91c77175145..0c760dd96843 100644
--- a/src/common/telemetry/src/lib.rs
+++ b/src/common/telemetry/src/lib.rs
@@ -17,7 +17,44 @@ mod macros;
 pub mod metric;
 mod panic_hook;
 
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+
 pub use logging::{init_default_ut_logging, init_global_logging};
 pub use metric::init_default_metrics_recorder;
+use once_cell::sync::OnceCell;
 pub use panic_hook::set_panic_hook;
+use parking_lot::Mutex;
+use rand::random;
+use snowflake::SnowflakeIdBucket;
 pub use {common_error, tracing, tracing_appender, tracing_futures, tracing_subscriber};
+
+static NODE_ID: OnceCell<u64> = OnceCell::new();
+static TRACE_BUCKET: OnceCell<Mutex<SnowflakeIdBucket>> = OnceCell::new();
+
+pub fn gen_trace_id() -> u64 {
+    let mut bucket = TRACE_BUCKET
+        .get_or_init(|| {
+            // if node_id has not been initialized, fall back to the default (0)
+            let node_id = NODE_ID.get_or_init(|| 0);
+            info!("initializing bucket with node_id: {}", node_id);
+            let bucket = SnowflakeIdBucket::new(1, (*node_id) as i32);
+            Mutex::new(bucket)
+        })
+        .lock();
+    (*bucket).get_id() as u64
+}
+
+pub fn init_node_id(node_id: Option<String>) {
+    let node_id = node_id.map(|id| calculate_hash(&id)).unwrap_or(random());
+    match NODE_ID.set(node_id) {
+        Ok(_) => {}
+        Err(_) => warn!("node_id is already initialized"),
+    }
+}
+
+fn calculate_hash<T: Hash>(t: &T) -> u64 {
+    let mut s = DefaultHasher::new();
+    t.hash(&mut s);
+    s.finish()
+}
diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml
index 774a9ccb1e2b..60e854740643 100644
--- a/src/common/test-util/Cargo.toml
+++ b/src/common/test-util/Cargo.toml
@@ -5,6 +5,6 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
-once_cell = "1.16"
+once_cell.workspace = true
 rand.workspace = true
 tempfile.workspace = true
diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs
index 91a17a01ca13..caebeb665417 100644
--- a/src/frontend/src/frontend.rs
+++ b/src/frontend/src/frontend.rs
@@ -28,6 +28,7 @@ use crate::service_config::{
 #[serde(default)]
 pub struct FrontendOptions {
     pub mode: Mode,
+    pub node_id: Option<String>,
     pub heartbeat: HeartbeatOptions,
     pub http_options: Option<HttpOptions>,
     pub grpc_options: Option<GrpcOptions>,
@@ -46,6 +47,7 @@ impl Default for FrontendOptions {
     fn default() -> Self {
         Self {
             mode: Mode::Standalone,
+            node_id: None,
             heartbeat: HeartbeatOptions::default(),
             http_options: Some(HttpOptions::default()),
             grpc_options: Some(GrpcOptions::default()),
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 6c68a62dfc92..4bd51c51a97b 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -206,6 +206,8 @@ impl Instance {
             Arc::new(handlers_executor),
         ));
 
+        common_telemetry::init_node_id(opts.node_id.clone());
+
         Ok(Instance {
             catalog_manager,
             script_executor,
diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs
index 7d8080bf5ff9..ac3141c4b050 100644
--- a/src/frontend/src/table/scan.rs
+++ b/src/frontend/src/table/scan.rs
@@ -60,7 +60,7 @@ impl DatanodeInstance {
 
         let result = self
             .db
-            .logical_plan(substrait_plan.to_vec())
+            .logical_plan(substrait_plan.to_vec(), None)
             .await
             .context(error::RequestDatanodeSnafu)?;
         let Output::RecordBatches(record_batches) = result else { unreachable!() };
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index 0cd4f5c95bb8..883840e6b761 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -33,7 +33,7 @@ h2 = "0.3"
 http-body = "0.4"
 lazy_static.workspace = true
 metrics.workspace = true
-once_cell = "1.17"
+once_cell.workspace = true
 parking_lot = "0.12"
 prost.workspace = true
 rand.workspace = true
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index a330f21f0c05..3a4b3eb623a9 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -36,7 +36,7 @@ greptime-proto.workspace = true
 humantime = "2.1"
 metrics.workspace = true
 object-store = { path = "../object-store" }
-once_cell = "1.10"
+once_cell.workspace = true
 partition = { path = "../partition" }
 promql = { path = "../promql" }
 promql-parser = "0.1.1"
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index b406ef3bffcd..1a249bc8fec9 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -82,7 +82,7 @@ impl DatafusionQueryEngine {
         plan: LogicalPlan,
         query_ctx: QueryContextRef,
     ) -> Result<Output> {
-        let mut ctx = QueryEngineContext::new(self.state.session_state());
+        let mut ctx = QueryEngineContext::new(self.state.session_state(), query_ctx.clone());
 
         // `create_physical_plan` will optimize logical plan internally
         let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
@@ -363,10 +363,12 @@ impl QueryExecutor for DatafusionQueryEngine {
         plan: &Arc<dyn PhysicalPlan>,
     ) -> Result<SendableRecordBatchStream> {
         let _timer = timer!(metrics::METRIC_EXEC_PLAN_ELAPSED);
+        let task_ctx = ctx.build_task_ctx();
+
         match plan.output_partitioning().partition_count() {
             0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
             1 => Ok(plan
-                .execute(0, ctx.state().task_ctx())
+                .execute(0, task_ctx)
                 .context(error::ExecutePhysicalPlanSnafu)
                 .map_err(BoxedError::new)
                 .context(QueryExecutionSnafu))?,
@@ -377,7 +379,7 @@ impl QueryExecutor for DatafusionQueryEngine {
                 // CoalescePartitionsExec must produce a single partition
                 assert_eq!(1, plan.output_partitioning().partition_count());
                 let df_stream = plan
-                    .execute(0, ctx.state().task_ctx())
+                    .execute(0, task_ctx)
                     .context(error::DatafusionSnafu {
                         msg: "Failed to execute DataFusion merge exec",
                     })
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index c55e4deb83da..1954d14a3f96 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -132,18 +132,19 @@ impl MergeScanExec {
         }
     }
 
-    pub fn to_stream(&self) -> Result<SendableRecordBatchStream> {
+    pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
         let substrait_plan = self.substrait_plan.to_vec();
         let peers = self.peers.clone();
         let clients = self.clients.clone();
         let table = self.table.clone();
+        let trace_id = context.task_id().and_then(|id| id.parse().ok());
 
         let stream = try_stream! {
             for peer in peers {
                 let client = clients.get_client(&peer).await;
                 let database = Database::new(&table.catalog_name, &table.schema_name, client);
                 let output: Output = database
-                    .logical_plan(substrait_plan.clone())
+                    .logical_plan(substrait_plan.clone(), trace_id)
                     .await
                     .context(RemoteRequestSnafu)
                     .map_err(BoxedError::new)
@@ -220,9 +221,11 @@ impl ExecutionPlan for MergeScanExec {
     fn execute(
         &self,
         _partition: usize,
-        _context: Arc<TaskContext>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(DfRecordBatchStreamAdapter::new(self.to_stream()?)))
+        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
+            self.to_stream(context)?,
+        )))
     }
 
     fn statistics(&self) -> Statistics {
diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs
index 0e5750b8df07..d9f3bc5d30d1 100644
--- a/src/query/src/query_engine/context.rs
+++ b/src/query/src/query_engine/context.rs
@@ -12,20 +12,42 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use datafusion::execution::context::SessionState;
+use std::sync::Arc;
+
+use datafusion::execution::context::{SessionState, TaskContext};
+use session::context::QueryContextRef;
 
 #[derive(Debug)]
 pub struct QueryEngineContext {
     state: SessionState,
+    query_ctx: QueryContextRef,
 }
 
 impl QueryEngineContext {
-    pub fn new(state: SessionState) -> Self {
-        Self { state }
+    pub fn new(state: SessionState, query_ctx: QueryContextRef) -> Self {
+        Self { state, query_ctx }
     }
 
     #[inline]
     pub fn state(&self) -> &SessionState {
         &self.state
     }
+
+    #[inline]
+    pub fn query_ctx(&self) -> QueryContextRef {
+        self.query_ctx.clone()
+    }
+
+    pub fn build_task_ctx(&self) -> Arc<TaskContext> {
+        let task_id = self.query_ctx.trace_id().to_string();
+        let state = &self.state;
+        Arc::new(TaskContext::new(
+            Some(task_id),
+            state.session_id().to_string(),
+            state.config().clone(),
+            state.scalar_functions().clone(),
+            state.aggregate_functions().clone(),
+            state.runtime_env().clone(),
+        ))
+    }
 }
diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml
index 2cd7954e719a..b870378dce0c 100644
--- a/src/script/Cargo.toml
+++ b/src/script/Cargo.toml
@@ -44,7 +44,7 @@ datafusion-physical-expr = { workspace = true, optional = true }
 datatypes = { path = "../datatypes" }
 futures.workspace = true
 futures-util.workspace = true
-once_cell = "1.17.0"
+once_cell.workspace = true
 paste = { workspace = true, optional = true }
 query = { path = "../query" }
 # TODO(discord9): This is a forked and tweaked version of RustPython, please update it to newest original RustPython After RustPython support GC
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 7c6d6d848015..a4f41eba5f39 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -53,7 +53,7 @@ metrics.workspace = true
 metrics-process = { version = "<1.0.10", optional = true }
 mime_guess = "2.0"
 num_cpus = "1.13"
-once_cell = "1.16"
+once_cell.workspace = true
 openmetrics-parser = "0.4"
 opensrv-mysql = "0.4"
 opentelemetry-proto.workspace = true
diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs
index 1d4e80c1571c..c4baa911c7f2 100644
--- a/src/servers/src/grpc/handler.rs
+++ b/src/servers/src/grpc/handler.rs
@@ -18,13 +18,14 @@ use std::time::Instant;
 use api::helper::request_type;
 use api::v1::auth_header::AuthScheme;
 use api::v1::{Basic, GreptimeRequest, RequestHeader};
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_query::Output;
 use common_runtime::Runtime;
 use common_telemetry::logging;
 use metrics::{histogram, increment_counter};
-use session::context::{QueryContext, QueryContextRef};
+use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::{OptionExt, ResultExt};
 
 use crate::auth::{Identity, Password, UserProviderRef};
@@ -70,7 +71,6 @@ impl GreptimeRequestHandler {
 
         let query_ctx = create_query_context(header);
         let _ = self.auth(header, &query_ctx).await?;
-
         let handler = self.handler.clone();
         let request_type = request_type(&query);
         let db = query_ctx.get_db_string();
@@ -145,25 +145,35 @@ impl GreptimeRequestHandler {
     }
 }
 
 pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef {
-    let ctx = QueryContext::arc();
-    if let Some(header) = header {
-        // We provide dbname field in newer versions of protos/sdks
-        // parse dbname from header in priority
-        if !header.dbname.is_empty() {
-            let (catalog, schema) =
-                crate::parse_catalog_and_schema_from_client_database_name(&header.dbname);
-            ctx.set_current_catalog(catalog);
-            ctx.set_current_schema(schema);
-        } else {
-            if !header.catalog.is_empty() {
-                ctx.set_current_catalog(&header.catalog);
-            }
-            if !header.schema.is_empty() {
-                ctx.set_current_schema(&header.schema);
+    let (catalog, schema) = header
+        .map(|header| {
+            // We provide the dbname field in newer versions of protos/sdks;
+            // parse the database name from it in priority
+            if !header.dbname.is_empty() {
+                crate::parse_catalog_and_schema_from_client_database_name(&header.dbname)
+            } else {
+                (
+                    if !header.catalog.is_empty() {
+                        &header.catalog
+                    } else {
+                        DEFAULT_CATALOG_NAME
+                    },
+                    if !header.schema.is_empty() {
+                        &header.schema
+                    } else {
+                        DEFAULT_SCHEMA_NAME
+                    },
+                )
             }
-        }
-    };
-    ctx
+        })
+        .unwrap_or((DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME));
+
+    QueryContextBuilder::new()
+        .catalog(catalog.to_string())
+        .schema(schema.to_string())
+        .try_trace_id(header.and_then(|h: &RequestHeader| h.trace_id))
+        .build()
+        .to_arc()
 }
 
 /// Histogram timer for handling gRPC request.
diff --git a/src/session/src/context.rs b/src/session/src/context.rs
index ade230f889be..909db64d4b37 100644
--- a/src/session/src/context.rs
+++ b/src/session/src/context.rs
@@ -32,6 +32,7 @@ pub struct QueryContext {
     current_schema: ArcSwap<String>,
     time_zone: ArcSwap<Option<TimeZone>>,
     sql_dialect: Box<dyn Dialect>,
+    trace_id: u64,
 }
 
 impl Default for QueryContext {
@@ -56,30 +57,25 @@ impl QueryContext {
         Arc::new(QueryContext::new())
     }
 
+    pub fn to_arc(self) -> QueryContextRef {
+        Arc::new(self)
+    }
+
     pub fn new() -> Self {
         Self {
             current_catalog: ArcSwap::new(Arc::new(DEFAULT_CATALOG_NAME.to_string())),
             current_schema: ArcSwap::new(Arc::new(DEFAULT_SCHEMA_NAME.to_string())),
             time_zone: ArcSwap::new(Arc::new(None)),
             sql_dialect: Box::new(GreptimeDbDialect {}),
+            trace_id: common_telemetry::gen_trace_id(),
         }
     }
 
     pub fn with(catalog: &str, schema: &str) -> Self {
-        Self::with_sql_dialect(catalog, schema, Box::new(GreptimeDbDialect {}))
-    }
-
-    pub fn with_sql_dialect(
-        catalog: &str,
-        schema: &str,
-        sql_dialect: Box<dyn Dialect>,
-    ) -> Self {
-        Self {
-            current_catalog: ArcSwap::new(Arc::new(catalog.to_string())),
-            current_schema: ArcSwap::new(Arc::new(schema.to_string())),
-            time_zone: ArcSwap::new(Arc::new(None)),
-            sql_dialect,
-        }
+        QueryContextBuilder::new()
+            .catalog(catalog.to_string())
+            .schema(schema.to_string())
+            .build()
     }
 
     #[inline]
@@ -132,6 +128,70 @@ impl QueryContext {
     pub fn set_time_zone(&self, tz: Option<TimeZone>) {
         let _ = self.time_zone.swap(Arc::new(tz));
     }
+
+    #[inline]
+    pub fn trace_id(&self) -> u64 {
+        self.trace_id
+    }
+}
+
+#[derive(Default)]
+pub struct QueryContextBuilder {
+    catalog: Option<String>,
+    schema: Option<String>,
+    time_zone: Option<TimeZone>,
+    sql_dialect: Option<Box<dyn Dialect>>,
+    trace_id: Option<u64>,
+}
+
+impl QueryContextBuilder {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn catalog(mut self, catalog: String) -> Self {
+        self.catalog = Some(catalog);
+        self
+    }
+
+    pub fn schema(mut self, schema: String) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    pub fn time_zone(mut self, tz: TimeZone) -> Self {
+        self.time_zone = Some(tz);
+        self
+    }
+
+    pub fn sql_dialect(mut self, sql_dialect: Box<dyn Dialect>) -> Self {
+        self.sql_dialect = Some(sql_dialect);
+        self
+    }
+
+    pub fn trace_id(mut self, trace_id: u64) -> Self {
+        self.trace_id = Some(trace_id);
+        self
+    }
+
+    pub fn try_trace_id(mut self, trace_id: Option<u64>) -> Self {
+        self.trace_id = trace_id;
+        self
+    }
+
+    pub fn build(self) -> QueryContext {
+        QueryContext {
+            current_catalog: ArcSwap::new(Arc::new(
+                self.catalog.unwrap_or(DEFAULT_CATALOG_NAME.to_string()),
+            )),
+            current_schema: ArcSwap::new(Arc::new(
+                self.schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()),
+            )),
+            time_zone: ArcSwap::new(Arc::new(self.time_zone)),
+            sql_dialect: self.sql_dialect.unwrap_or(Box::new(GreptimeDbDialect {})),
+            trace_id: self.trace_id.unwrap_or(common_telemetry::gen_trace_id()),
+        }
+    }
 }
 
 pub const DEFAULT_USERNAME: &str = "greptime";
diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs
index 34ea190e4d05..11591980bdbc 100644
--- a/src/session/src/lib.rs
+++ b/src/session/src/lib.rs
@@ -20,8 +20,9 @@ use std::sync::Arc;
 use arc_swap::ArcSwap;
 use common_catalog::build_db_string;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use context::QueryContextBuilder;
 
-use crate::context::{Channel, ConnInfo, QueryContext, QueryContextRef, UserInfo};
+use crate::context::{Channel, ConnInfo, QueryContextRef, UserInfo};
 
 /// Session for persistent connection such as MySQL, PostgreSQL etc.
 #[derive(Debug)]
@@ -46,11 +47,12 @@ impl Session {
 
     #[inline]
     pub fn new_query_context(&self) -> QueryContextRef {
-        Arc::new(QueryContext::with_sql_dialect(
-            self.catalog.load().as_ref(),
-            self.schema.load().as_ref(),
-            self.conn_info.channel.dialect(),
-        ))
+        QueryContextBuilder::new()
+            .catalog(self.catalog.load().to_string())
+            .schema(self.schema.load().to_string())
+            .sql_dialect(self.conn_info.channel.dialect())
+            .build()
+            .to_arc()
     }
 
     #[inline]
diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml
index 078e2bba0e32..594409d295ef 100644
--- a/src/sql/Cargo.toml
+++ b/src/sql/Cargo.toml
@@ -17,6 +17,6 @@ datatypes = { path = "../datatypes" }
 hex = "0.4"
 itertools.workspace = true
 mito = { path = "../mito" }
-once_cell = "1.10"
+once_cell.workspace = true
 snafu = { version = "0.7", features = ["backtraces"] }
 sqlparser.workspace = true
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 9eb044a09432..acf06e38cc77 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -34,7 +34,7 @@ meta-client = { path = "../src/meta-client" }
 meta-srv = { path = "../src/meta-srv" }
 mito = { path = "../src/mito", features = ["test"] }
 object-store = { path = "../src/object-store" }
-once_cell = "1.16"
+once_cell.workspace = true
 query = { path = "../src/query" }
 rand.workspace = true
 rstest = "0.17"
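Usage sketch of the `QueryContextBuilder` API added above, as the gRPC handler in src/servers/src/grpc/handler.rs now uses it; the catalog/schema strings and the literal trace_id are illustrative, not part of the patch. When no trace_id is supplied, `build()` falls back to `common_telemetry::gen_trace_id()`.

    use session::context::QueryContextBuilder;

    fn example() {
        // A trace_id extracted from a gRPC RequestHeader, if the client sent one.
        let header_trace_id: Option<u64> = Some(42);

        let ctx = QueryContextBuilder::new()
            .catalog("greptime".to_string())
            .schema("public".to_string())
            // `try_trace_id` keeps the caller's trace_id; `build()` generates
            // one via common_telemetry::gen_trace_id() when it is None.
            .try_trace_id(header_trace_id)
            .build()
            .to_arc();

        assert_eq!(ctx.trace_id(), 42);
    }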
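End to end, the trace_id travels: gRPC `RequestHeader` -> `create_query_context` -> `QueryContext` -> `QueryEngineContext::build_task_ctx`, which smuggles it through DataFusion as the `TaskContext` task_id string -> `MergeScanExec::to_stream`, which parses it back and passes it to `Database::logical_plan` for the datanode request. A minimal sketch of that task_id round-trip (values illustrative):

    // trace_id -> task_id string (build_task_ctx) -> trace_id (to_stream).
    let trace_id: u64 = 42;
    let task_id: Option<String> = Some(trace_id.to_string());
    let recovered: Option<u64> = task_id.and_then(|id| id.parse().ok());
    assert_eq!(recovered, Some(trace_id));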